]> Sergey Matveev's repositories - btrtrc.git/blob - peerconn_test.go
Add tests for preferred network direction
[btrtrc.git] / peerconn_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "net"
9         "sync"
10         "testing"
11
12         "github.com/frankban/quicktest"
13         qt "github.com/frankban/quicktest"
14         "github.com/stretchr/testify/require"
15
16         "github.com/anacrolix/torrent/metainfo"
17         pp "github.com/anacrolix/torrent/peer_protocol"
18         "github.com/anacrolix/torrent/storage"
19 )
20
// Ensure that no race exists between sending a bitfield, and a subsequent
// Have that would potentially alter it.
func TestSendBitfieldThenHave(t *testing.T) {
	var cl Client
	cl.init(TestingConfig(t))
	cl.initLogger()
	// No underlying net.Conn: only the write side is exercised, through the
	// io.Pipe hooked up below.
	c := cl.newConnection(nil, false, nil, "io.Pipe", "")
	c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
	// Three pieces, so the bitfield payload fits one byte with distinct bits.
	// setInfo errors are only logged; the minimal info is enough here.
	if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil {
		t.Log(err)
	}
	r, w := io.Pipe()
	// c.r = r
	c.w = w
	c.startWriter()
	// Mark piece 1 complete and queue the bitfield under the client lock, so
	// the writer goroutine cannot observe a half-updated state.
	c.locker().Lock()
	c.t._completedPieces.Add(1)
	c.postBitfield( /*[]bool{false, true, false}*/ )
	c.locker().Unlock()
	// Piece 2 is completed after the bitfield was posted: it must be sent as
	// a separate Have message, not folded into the bitfield.
	c.locker().Lock()
	c.have(2)
	c.locker().Unlock()
	// Expect exactly 15 bytes: a 6-byte bitfield message (4-byte length,
	// type, 1 payload byte) followed by a 9-byte Have message.
	b := make([]byte, 15)
	n, err := io.ReadFull(r, b)
	c.locker().Lock()
	// This will cause connection.writer to terminate.
	c.closed.Set()
	c.locker().Unlock()
	require.NoError(t, err)
	require.EqualValues(t, 15, n)
	// Here we see that the bitfield doesn't have piece 2 set (payload is
	// 0x40, only piece 1), as that should arrive in the following Have
	// message.
	require.EqualValues(t, "\x00\x00\x00\x02\x05@\x00\x00\x00\x05\x04\x00\x00\x00\x02", string(b))
}
55
// torrentStorage is a minimal piece-storage stub used by
// BenchmarkConnectionMainReadLoop. writeSem acts as a binary semaphore: the
// benchmark Locks it before sending each chunk, and WriteAt Unlocks it to
// signal that the chunk reached storage.
type torrentStorage struct {
	writeSem sync.Mutex
}

func (me *torrentStorage) Close() error { return nil }

// Piece returns the receiver itself: every piece shares this single impl.
func (me *torrentStorage) Piece(mp metainfo.Piece) storage.PieceImpl {
	return me
}

// Completion reports a zero-value (unknown) completion state.
func (me *torrentStorage) Completion() storage.Completion {
	return storage.Completion{}
}

func (me *torrentStorage) MarkComplete() error {
	return nil
}

func (me *torrentStorage) MarkNotComplete() error {
	return nil
}

// ReadAt is never expected in the benchmark's receive-only flow.
func (me *torrentStorage) ReadAt([]byte, int64) (int, error) {
	panic("shouldn't be called")
}

// WriteAt asserts each received chunk is exactly one default-sized chunk,
// then releases writeSem so the benchmark loop can send the next message.
func (me *torrentStorage) WriteAt(b []byte, _ int64) (int, error) {
	if len(b) != defaultChunkSize {
		panic(len(b))
	}
	me.writeSem.Unlock()
	return len(b), nil
}
89
// BenchmarkConnectionMainReadLoop measures the connection read loop's
// throughput for Piece messages, using torrentStorage's writeSem to
// lock-step the sender with chunk arrival at storage.
func BenchmarkConnectionMainReadLoop(b *testing.B) {
	c := quicktest.New(b)
	var cl Client
	cl.init(&ClientConfig{
		DownloadRateLimiter: unlimited,
	})
	cl.initLogger()
	ts := &torrentStorage{}
	t := cl.newTorrent(metainfo.Hash{}, nil)
	t.initialPieceCheckDisabled = true
	// A single 1 MiB piece spanning the whole torrent.
	require.NoError(b, t.setInfo(&metainfo.Info{
		Pieces:      make([]byte, 20),
		Length:      1 << 20,
		PieceLength: 1 << 20,
	}))
	t.storage = &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}}
	t.onSetInfo()
	t._pendingPieces.Add(0)
	r, w := net.Pipe()
	cn := cl.newConnection(r, true, r.RemoteAddr(), r.RemoteAddr().Network(), regularNetConnPeerConnConnString(r))
	cn.setTorrent(t)
	mrlErrChan := make(chan error)
	msg := pp.Message{
		Type:  pp.Piece,
		Piece: make([]byte, defaultChunkSize),
	}
	// Reader: run the connection's main read loop until the writer closes
	// the pipe. Only a non-nil error is forwarded; the channel close marks
	// termination either way.
	go func() {
		cl.lock()
		err := cn.mainReadLoop()
		if err != nil {
			mrlErrChan <- err
		}
		close(mrlErrChan)
	}()
	wb := msg.MustMarshalBinary()
	b.SetBytes(int64(len(msg.Piece)))
	// Writer: send b.N copies of the same Piece message. Each iteration
	// re-pends the chunk and re-arms validReceiveChunks so the read loop
	// treats the duplicate as a fresh, valid chunk.
	go func() {
		ts.writeSem.Lock()
		for i := 0; i < b.N; i += 1 {
			cl.lock()
			// The chunk must be written to storage every time, to ensure the
			// writeSem is unlocked.
			t.pendAllChunkSpecs(0)
			cn.validReceiveChunks = map[RequestIndex]int{
				t.requestIndexFromRequest(newRequestFromMessage(&msg)): 1,
			}
			cl.unlock()
			n, err := w.Write(wb)
			require.NoError(b, err)
			require.EqualValues(b, len(wb), n)
			// Blocks until torrentStorage.WriteAt releases the semaphore.
			ts.writeSem.Lock()
		}
		if err := w.Close(); err != nil {
			panic(err)
		}
	}()
	mrlErr := <-mrlErrChan
	// EOF is the expected termination once the writer closes its pipe end.
	if mrlErr != nil && !errors.Is(mrlErr, io.EOF) {
		c.Fatal(mrlErr)
	}
	c.Assert(cn._stats.ChunksReadUseful.Int64(), quicktest.Equals, int64(b.N))
}
152
153 func TestConnPexPeerFlags(t *testing.T) {
154         var (
155                 tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
156                 udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
157         )
158         testcases := []struct {
159                 conn *PeerConn
160                 f    pp.PexPeerFlags
161         }{
162                 {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: false}}, 0},
163                 {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: true}}, pp.PexPrefersEncryption},
164                 {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: false}}, pp.PexOutgoingConn},
165                 {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: true}}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
166                 {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}}, pp.PexSupportsUtp},
167                 {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn | pp.PexSupportsUtp},
168                 {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn},
169                 {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}}, 0},
170         }
171         for i, tc := range testcases {
172                 f := tc.conn.pexPeerFlags()
173                 require.EqualValues(t, tc.f, f, i)
174         }
175 }
176
177 func TestConnPexEvent(t *testing.T) {
178         var (
179                 udpAddr     = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
180                 tcpAddr     = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
181                 dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
182                 dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747}
183         )
184         testcases := []struct {
185                 t pexEventType
186                 c *PeerConn
187                 e pexEvent
188         }{
189                 {
190                         pexAdd,
191                         &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}},
192                         pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp, nil},
193                 },
194                 {
195                         pexDrop,
196                         &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true, PeerListenPort: dialTcpAddr.Port}},
197                         pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn, nil},
198                 },
199                 {
200                         pexAdd,
201                         &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), PeerListenPort: dialTcpAddr.Port}},
202                         pexEvent{pexAdd, dialTcpAddr, 0, nil},
203                 },
204                 {
205                         pexDrop,
206                         &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), PeerListenPort: dialUdpAddr.Port}},
207                         pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp, nil},
208                 },
209         }
210         for i, tc := range testcases {
211                 e := tc.c.pexEvent(tc.t)
212                 require.EqualValues(t, tc.e, e, i)
213         }
214 }
215
// Tests the transition from a peer that claimed all pieces (HaveAll) to one
// described by a later bitfield, both before and after torrent info arrives.
func TestHaveAllThenBitfield(t *testing.T) {
	c := qt.New(t)
	cl := newTestingClient(t)
	tt := cl.newTorrentForTesting()
	// cl.newConnection()
	pc := PeerConn{
		Peer: Peer{t: tt},
	}
	pc.peerImpl = &pc
	tt.conns[&pc] = struct{}{}
	// HaveAll registers the conn in the torrent's all-pieces set.
	c.Assert(pc.onPeerSentHaveAll(), qt.IsNil)
	c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}})
	// A subsequent bitfield replaces the have-all state: the highest set bit
	// is index 5, so the peer is known to have at least 6 pieces, and the
	// conn drops out of connsWithAllPieces.
	pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false})
	c.Check(pc.peerMinPieces, qt.Equals, 6)
	c.Check(pc.t.connsWithAllPieces, qt.HasLen, 0)
	// Now supply info for a 7-piece torrent; the 8-element bitfield has one
	// trailing pad entry.
	c.Assert(pc.t.setInfo(&metainfo.Info{
		PieceLength: 0,
		Pieces:      make([]byte, pieceHash.Size()*7),
	}), qt.IsNil)
	pc.t.onSetInfo()
	c.Check(tt.numPieces(), qt.Equals, 7)
	c.Check(tt.pieceAvailabilityRuns(), qt.DeepEquals, []pieceAvailabilityRun{
		// The last element of the bitfield is irrelevant, as the Torrent actually only has 7
		// pieces.
		{2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0},
	})
}
243
// Sanity-checks the wire-message size constants that the write-buffer
// request-state logic depends on, and that the request concurrency bound is
// at least a usable minimum.
func TestApplyRequestStateWriteBufferConstraints(t *testing.T) {
	c := qt.New(t)
	// 4-byte length prefix + 1-byte message type.
	c.Check(interestedMsgLen, qt.Equals, 5)
	// Length prefix + type + index, begin, length (4 bytes each).
	c.Check(requestMsgLen, qt.Equals, 17)
	c.Check(maxLocalToRemoteRequests >= 8, qt.IsTrue)
	c.Logf("max local to remote requests: %v", maxLocalToRemoteRequests)
}
251
252 func peerConnForPreferredNetworkDirection(localPeerId, remotePeerId int, outgoing, utp, ipv6 bool) *PeerConn {
253         pc := PeerConn{}
254         pc.outgoing = outgoing
255         if utp {
256                 pc.Network = "udp"
257         }
258         if ipv6 {
259                 pc.RemoteAddr = &net.TCPAddr{IP: net.ParseIP(fmt.Sprintf("::420"))}
260         } else {
261                 pc.RemoteAddr = &net.TCPAddr{IP: net.IPv4(1, 2, 3, 4)}
262         }
263         binary.BigEndian.PutUint64(pc.PeerID[:], uint64(remotePeerId))
264         cl := Client{}
265         binary.BigEndian.PutUint64(cl.peerID[:], uint64(localPeerId))
266         pc.t = &Torrent{cl: &cl}
267         return &pc
268 }
269
270 func TestPreferredNetworkDirection(t *testing.T) {
271         pc := peerConnForPreferredNetworkDirection
272         c := qt.New(t)
273         // Prefer outgoing to higher peer ID
274         c.Assert(pc(1, 2, true, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), qt.IsTrue)
275         c.Assert(pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, true, false, false)), qt.IsFalse)
276         c.Assert(pc(2, 1, false, false, false).hasPreferredNetworkOver(pc(2, 1, true, false, false)), qt.IsTrue)
277         // Don't prefer uTP
278         c.Assert(pc(1, 2, false, true, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), qt.IsFalse)
279         // Prefer IPv6
280         c.Assert(pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, true)), qt.IsFalse)
281         // No difference
282         c.Assert(pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), qt.IsFalse)
283 }