Sergey Matveev's repositories - btrtrc.git/blob - peerconn_test.go
Merge pull request #935 from anacrolix/dependabot/go_modules/tests/issue-930/golang...
[btrtrc.git] / peerconn_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "net"
9         "net/netip"
10         "sync"
11         "testing"
12
13         g "github.com/anacrolix/generics"
14         "github.com/frankban/quicktest"
15         qt "github.com/frankban/quicktest"
16         "github.com/stretchr/testify/require"
17         "golang.org/x/time/rate"
18
19         "github.com/anacrolix/torrent/metainfo"
20         pp "github.com/anacrolix/torrent/peer_protocol"
21         "github.com/anacrolix/torrent/storage"
22 )
23
24 // Ensure that no race exists between sending a bitfield, and a subsequent
25 // Have that would potentially alter it.
26 func TestSendBitfieldThenHave(t *testing.T) {
27         var cl Client
28         cl.init(TestingConfig(t))
29         cl.initLogger()
30         qtc := qt.New(t)
31         c := cl.newConnection(nil, newConnectionOpts{network: "io.Pipe"})
32         c.setTorrent(cl.newTorrentForTesting())
33         err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)})
34         qtc.Assert(err, qt.IsNil)
35         r, w := io.Pipe()
36         // c.r = r
37         c.w = w
38         c.startMessageWriter()
39         c.locker().Lock()
40         c.t._completedPieces.Add(1)
41         c.postBitfield( /*[]bool{false, true, false}*/ )
42         c.locker().Unlock()
43         c.locker().Lock()
44         c.have(2)
45         c.locker().Unlock()
46         b := make([]byte, 15)
47         n, err := io.ReadFull(r, b)
48         c.locker().Lock()
49         // This will cause connection.writer to terminate.
50         c.closed.Set()
51         c.locker().Unlock()
52         require.NoError(t, err)
53         require.EqualValues(t, 15, n)
54         // Here we see that the bitfield doesn't have piece 2 set, as that should
55         // arrive in the following Have message.
56         require.EqualValues(t, "\x00\x00\x00\x02\x05@\x00\x00\x00\x05\x04\x00\x00\x00\x02", string(b))
57 }
58
59 type torrentStorage struct {
60         allChunksWritten sync.WaitGroup
61 }
62
63 func (me *torrentStorage) Close() error { return nil }
64
65 func (me *torrentStorage) Piece(mp metainfo.Piece) storage.PieceImpl {
66         return me
67 }
68
69 func (me *torrentStorage) Completion() storage.Completion {
70         return storage.Completion{}
71 }
72
73 func (me *torrentStorage) MarkComplete() error {
74         return nil
75 }
76
77 func (me *torrentStorage) MarkNotComplete() error {
78         return nil
79 }
80
81 func (me *torrentStorage) ReadAt([]byte, int64) (int, error) {
82         panic("shouldn't be called")
83 }
84
85 func (me *torrentStorage) WriteAt(b []byte, _ int64) (int, error) {
86         if len(b) != defaultChunkSize {
87                 panic(len(b))
88         }
89         me.allChunksWritten.Done()
90         return len(b), nil
91 }
92
93 func BenchmarkConnectionMainReadLoop(b *testing.B) {
94         c := quicktest.New(b)
95         var cl Client
96         cl.init(&ClientConfig{
97                 DownloadRateLimiter: unlimited,
98         })
99         cl.initLogger()
100         ts := &torrentStorage{}
101         t := cl.newTorrentForTesting()
102         t.initialPieceCheckDisabled = true
103         require.NoError(b, t.setInfo(&metainfo.Info{
104                 Pieces:      make([]byte, 20),
105                 Length:      1 << 20,
106                 PieceLength: 1 << 20,
107         }))
108         t.storage = &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}}
109         t.onSetInfo()
110         t._pendingPieces.Add(0)
111         r, w := net.Pipe()
112         c.Logf("pipe reader remote addr: %v", r.RemoteAddr())
113         cn := cl.newConnection(r, newConnectionOpts{
114                 outgoing: true,
115                 // TODO: This is a hack to give the pipe a bannable remote address.
116                 remoteAddr: netip.AddrPortFrom(netip.AddrFrom4([4]byte{1, 2, 3, 4}), 1234),
117                 network:    r.RemoteAddr().Network(),
118                 connString: regularNetConnPeerConnConnString(r),
119         })
120         c.Assert(cn.bannableAddr.Ok, qt.IsTrue)
121         cn.setTorrent(t)
122         requestIndexBegin := t.pieceRequestIndexOffset(0)
123         requestIndexEnd := t.pieceRequestIndexOffset(1)
124         eachRequestIndex := func(f func(ri RequestIndex)) {
125                 for ri := requestIndexBegin; ri < requestIndexEnd; ri++ {
126                         f(ri)
127                 }
128         }
129         const chunkSize = defaultChunkSize
130         numRequests := requestIndexEnd - requestIndexBegin
131         msgBufs := make([][]byte, 0, numRequests)
132         eachRequestIndex(func(ri RequestIndex) {
133                 msgBufs = append(msgBufs, pp.Message{
134                         Type:  pp.Piece,
135                         Piece: make([]byte, chunkSize),
136                         Begin: pp.Integer(chunkSize) * pp.Integer(ri),
137                 }.MustMarshalBinary())
138         })
139         // errgroup can't handle this pattern...
140         allErrors := make(chan error, 2)
141         var wg sync.WaitGroup
142         wg.Add(1)
143         go func() {
144                 defer wg.Done()
145                 cl.lock()
146                 err := cn.mainReadLoop()
147                 if errors.Is(err, io.EOF) {
148                         err = nil
149                 }
150                 allErrors <- err
151         }()
152         b.SetBytes(chunkSize * int64(numRequests))
153         wg.Add(1)
154         go func() {
155                 defer wg.Done()
156                 for i := 0; i < b.N; i += 1 {
157                         cl.lock()
158                         // The chunk must be written to storage everytime, to ensure the
159                         // writeSem is unlocked.
160                         t.pendAllChunkSpecs(0)
161                         g.MakeMapIfNil(&cn.validReceiveChunks)
162                         eachRequestIndex(func(ri RequestIndex) {
163                                 cn.validReceiveChunks[ri] = 1
164                         })
165                         cl.unlock()
166                         ts.allChunksWritten.Add(int(numRequests))
167                         for _, wb := range msgBufs {
168                                 n, err := w.Write(wb)
169                                 require.NoError(b, err)
170                                 require.EqualValues(b, len(wb), n)
171                         }
172                         // This is unlocked by a successful write to storage. So this unblocks when that is
173                         // done.
174                         ts.allChunksWritten.Wait()
175                 }
176                 if err := w.Close(); err != nil {
177                         panic(err)
178                 }
179         }()
180         go func() {
181                 wg.Wait()
182                 close(allErrors)
183         }()
184         var err error
185         for err = range allErrors {
186                 if err != nil {
187                         break
188                 }
189         }
190         c.Assert(err, qt.IsNil)
191         c.Assert(cn._stats.ChunksReadUseful.Int64(), quicktest.Equals, int64(b.N)*int64(numRequests))
192         c.Assert(t.smartBanCache.HasBlocks(), qt.IsTrue)
193 }
194
195 func TestConnPexPeerFlags(t *testing.T) {
196         var (
197                 tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
198                 udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
199         )
200         testcases := []struct {
201                 conn *PeerConn
202                 f    pp.PexPeerFlags
203         }{
204                 {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: false}}, 0},
205                 {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: true}}, pp.PexPrefersEncryption},
206                 {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: false}}, pp.PexOutgoingConn},
207                 {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: true}}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
208                 {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}}, pp.PexSupportsUtp},
209                 {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn | pp.PexSupportsUtp},
210                 {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn},
211                 {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}}, 0},
212         }
213         for i, tc := range testcases {
214                 f := tc.conn.pexPeerFlags()
215                 require.EqualValues(t, tc.f, f, i)
216         }
217 }
218
219 func TestConnPexEvent(t *testing.T) {
220         c := qt.New(t)
221         var (
222                 udpAddr     = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
223                 tcpAddr     = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
224                 dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
225                 dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747}
226         )
227         testcases := []struct {
228                 t pexEventType
229                 c *PeerConn
230                 e pexEvent
231         }{
232                 {
233                         pexAdd,
234                         &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}},
235                         pexEvent{pexAdd, udpAddr.AddrPort(), pp.PexSupportsUtp, nil},
236                 },
237                 {
238                         pexDrop,
239                         &PeerConn{
240                                 Peer:           Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true},
241                                 PeerListenPort: dialTcpAddr.Port,
242                         },
243                         pexEvent{pexDrop, tcpAddr.AddrPort(), pp.PexOutgoingConn, nil},
244                 },
245                 {
246                         pexAdd,
247                         &PeerConn{
248                                 Peer:           Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()},
249                                 PeerListenPort: dialTcpAddr.Port,
250                         },
251                         pexEvent{pexAdd, dialTcpAddr.AddrPort(), 0, nil},
252                 },
253                 {
254                         pexDrop,
255                         &PeerConn{
256                                 Peer:           Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()},
257                                 PeerListenPort: dialUdpAddr.Port,
258                         },
259                         pexEvent{pexDrop, dialUdpAddr.AddrPort(), pp.PexSupportsUtp, nil},
260                 },
261         }
262         for i, tc := range testcases {
263                 c.Run(fmt.Sprintf("%v", i), func(c *qt.C) {
264                         e, err := tc.c.pexEvent(tc.t)
265                         c.Assert(err, qt.IsNil)
266                         c.Check(e, qt.Equals, tc.e)
267                 })
268         }
269 }
270
271 func TestHaveAllThenBitfield(t *testing.T) {
272         c := qt.New(t)
273         cl := newTestingClient(t)
274         tt := cl.newTorrentForTesting()
275         // cl.newConnection()
276         pc := PeerConn{
277                 Peer: Peer{t: tt},
278         }
279         pc.initRequestState()
280         pc.peerImpl = &pc
281         tt.conns[&pc] = struct{}{}
282         c.Assert(pc.onPeerSentHaveAll(), qt.IsNil)
283         c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}})
284         pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false})
285         c.Check(pc.peerMinPieces, qt.Equals, 6)
286         c.Check(pc.t.connsWithAllPieces, qt.HasLen, 0)
287         c.Assert(pc.t.setInfo(&metainfo.Info{
288                 PieceLength: 0,
289                 Pieces:      make([]byte, pieceHash.Size()*7),
290         }), qt.IsNil)
291         pc.t.onSetInfo()
292         c.Check(tt.numPieces(), qt.Equals, 7)
293         c.Check(tt.pieceAvailabilityRuns(), qt.DeepEquals, []pieceAvailabilityRun{
294                 // The last element of the bitfield is irrelevant, as the Torrent actually only has 7
295                 // pieces.
296                 {2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0},
297         })
298 }
299
300 func TestApplyRequestStateWriteBufferConstraints(t *testing.T) {
301         c := qt.New(t)
302         c.Check(interestedMsgLen, qt.Equals, 5)
303         c.Check(requestMsgLen, qt.Equals, 17)
304         c.Check(maxLocalToRemoteRequests >= 8, qt.IsTrue)
305         c.Logf("max local to remote requests: %v", maxLocalToRemoteRequests)
306 }
307
308 func peerConnForPreferredNetworkDirection(
309         localPeerId, remotePeerId int,
310         outgoing, utp, ipv6 bool,
311 ) *PeerConn {
312         pc := PeerConn{}
313         pc.outgoing = outgoing
314         if utp {
315                 pc.Network = "udp"
316         }
317         if ipv6 {
318                 pc.RemoteAddr = &net.TCPAddr{IP: net.ParseIP("::420")}
319         } else {
320                 pc.RemoteAddr = &net.TCPAddr{IP: net.IPv4(1, 2, 3, 4)}
321         }
322         binary.BigEndian.PutUint64(pc.PeerID[:], uint64(remotePeerId))
323         cl := Client{}
324         binary.BigEndian.PutUint64(cl.peerID[:], uint64(localPeerId))
325         pc.t = &Torrent{cl: &cl}
326         return &pc
327 }
328
329 func TestPreferredNetworkDirection(t *testing.T) {
330         pc := peerConnForPreferredNetworkDirection
331         c := qt.New(t)
332
333         // Prefer outgoing to lower peer ID
334
335         c.Check(
336                 pc(1, 2, true, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)),
337                 qt.IsFalse,
338         )
339         c.Check(
340                 pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, true, false, false)),
341                 qt.IsTrue,
342         )
343         c.Check(
344                 pc(2, 1, false, false, false).hasPreferredNetworkOver(pc(2, 1, true, false, false)),
345                 qt.IsFalse,
346         )
347
348         // Don't prefer uTP
349         c.Check(
350                 pc(1, 2, false, true, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)),
351                 qt.IsFalse,
352         )
353         // Prefer IPv6
354         c.Check(
355                 pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, true)),
356                 qt.IsFalse,
357         )
358         // No difference
359         c.Check(
360                 pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)),
361                 qt.IsFalse,
362         )
363 }
364
365 func TestReceiveLargeRequest(t *testing.T) {
366         c := qt.New(t)
367         cl := newTestingClient(t)
368         pc := cl.newConnection(nil, newConnectionOpts{network: "test"})
369         tor := cl.newTorrentForTesting()
370         tor.info = &metainfo.Info{PieceLength: 3 << 20}
371         pc.setTorrent(tor)
372         tor._completedPieces.Add(0)
373         pc.PeerExtensionBytes.SetBit(pp.ExtensionBitFast, true)
374         pc.choking = false
375         pc.initMessageWriter()
376         req := Request{}
377         req.Length = defaultChunkSize
378         c.Assert(pc.fastEnabled(), qt.IsTrue)
379         c.Check(pc.onReadRequest(req, false), qt.IsNil)
380         c.Check(pc.peerRequests, qt.HasLen, 1)
381         req.Length = 2 << 20
382         c.Check(pc.onReadRequest(req, false), qt.IsNil)
383         c.Check(pc.peerRequests, qt.HasLen, 2)
384         pc.peerRequests = nil
385         pc.t.cl.config.UploadRateLimiter = rate.NewLimiter(1, defaultChunkSize)
386         req.Length = defaultChunkSize
387         c.Check(pc.onReadRequest(req, false), qt.IsNil)
388         c.Check(pc.peerRequests, qt.HasLen, 1)
389         req.Length = 2 << 20
390         c.Check(pc.onReadRequest(req, false), qt.IsNil)
391         c.Check(pc.messageWriter.writeBuffer.Len(), qt.Equals, 17)
392 }
393
394 func TestChunkOverflowsPiece(t *testing.T) {
395         c := qt.New(t)
396         check := func(begin, length, limit pp.Integer, expected bool) {
397                 c.Check(chunkOverflowsPiece(ChunkSpec{begin, length}, limit), qt.Equals, expected)
398         }
399         check(2, 3, 1, true)
400         check(2, pp.IntegerMax, 1, true)
401         check(2, pp.IntegerMax, 3, true)
402         check(2, pp.IntegerMax, pp.IntegerMax, true)
403         check(2, pp.IntegerMax-2, pp.IntegerMax, false)
404 }