13 g "github.com/anacrolix/generics"
14 "github.com/frankban/quicktest"
15 qt "github.com/frankban/quicktest"
16 "github.com/stretchr/testify/require"
17 "golang.org/x/time/rate"
19 "github.com/anacrolix/torrent/metainfo"
20 pp "github.com/anacrolix/torrent/peer_protocol"
21 "github.com/anacrolix/torrent/storage"
// Ensure that no race exists between sending a bitfield, and a subsequent
// Have that would potentially alter it.
func TestSendBitfieldThenHave(t *testing.T) {
	cl.init(TestingConfig(t))
	c := cl.newConnection(nil, newConnectionOpts{network: "io.Pipe"})
	c.setTorrent(cl.newTorrentForTesting())
	err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)})
	qtc.Assert(err, qt.IsNil)
	c.startMessageWriter()
	c.t._completedPieces.Add(1)
	c.postBitfield( /*[]bool{false, true, false}*/ )
	n, err := io.ReadFull(r, b)
	// This will cause connection.writer to terminate.
	require.NoError(t, err)
	require.EqualValues(t, 15, n)
	// Here we see that the bitfield doesn't have piece 2 set, as that should
	// arrive in the following Have message.
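	// The expected bytes decode as two length-prefixed messages: a bitfield
	// (length 2, id 5, payload 0x40, i.e. only piece 1 set) followed by a
	// Have (length 5, id 4) for piece index 2.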
	require.EqualValues(t, "\x00\x00\x00\x02\x05@\x00\x00\x00\x05\x04\x00\x00\x00\x02", string(b))
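
// torrentStorage is a stub that implements both the torrent-level and
// piece-level storage methods for the benchmark below: it accepts chunk
// writes, counting them down via allChunksWritten, and never touches disk.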
type torrentStorage struct {
	allChunksWritten sync.WaitGroup

func (me *torrentStorage) Close() error { return nil }

func (me *torrentStorage) Piece(mp metainfo.Piece) storage.PieceImpl {

func (me *torrentStorage) Completion() storage.Completion {
	return storage.Completion{}

func (me *torrentStorage) MarkComplete() error {

func (me *torrentStorage) MarkNotComplete() error {

func (me *torrentStorage) ReadAt([]byte, int64) (int, error) {
	panic("shouldn't be called")

func (me *torrentStorage) WriteAt(b []byte, _ int64) (int, error) {
	if len(b) != defaultChunkSize {
	me.allChunksWritten.Done()
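
// BenchmarkConnectionMainReadLoop measures the connection read path: it feeds
// pre-marshalled Piece messages into a PeerConn over a pipe and waits for the
// stub storage to receive every chunk on each iteration.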
func BenchmarkConnectionMainReadLoop(b *testing.B) {
	cl.init(&ClientConfig{
		DownloadRateLimiter: unlimited,
	ts := &torrentStorage{}
	t := cl.newTorrentForTesting()
	t.initialPieceCheckDisabled = true
	require.NoError(b, t.setInfo(&metainfo.Info{
		Pieces:      make([]byte, 20),
		PieceLength: 1 << 20,
	t.storage = &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}}
	t._pendingPieces.Add(0)
	c.Logf("pipe reader remote addr: %v", r.RemoteAddr())
	cn := cl.newConnection(r, newConnectionOpts{
		// TODO: This is a hack to give the pipe a bannable remote address.
		remoteAddr: netip.AddrPortFrom(netip.AddrFrom4([4]byte{1, 2, 3, 4}), 1234),
		network:    r.RemoteAddr().Network(),
		connString: regularNetConnPeerConnConnString(r),
	c.Assert(cn.bannableAddr.Ok, qt.IsTrue)
	requestIndexBegin := t.pieceRequestIndexOffset(0)
	requestIndexEnd := t.pieceRequestIndexOffset(1)
	eachRequestIndex := func(f func(ri RequestIndex)) {
		for ri := requestIndexBegin; ri < requestIndexEnd; ri++ {
	const chunkSize = defaultChunkSize
	numRequests := requestIndexEnd - requestIndexBegin
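	// Pre-marshal one Piece message per chunk in piece 0 so the benchmark
	// loop below measures only the connection's read path, not message
	// encoding.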
	msgBufs := make([][]byte, 0, numRequests)
	eachRequestIndex(func(ri RequestIndex) {
		msgBufs = append(msgBufs, pp.Message{
			Type:  pp.Piece,
			Piece: make([]byte, chunkSize),
			Begin: pp.Integer(chunkSize) * pp.Integer(ri),
		}.MustMarshalBinary())
	// errgroup can't handle this pattern...
	allErrors := make(chan error, 2)
	var wg sync.WaitGroup
		err := cn.mainReadLoop()
		if errors.Is(err, io.EOF) {
	b.SetBytes(chunkSize * int64(numRequests))
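	// Each iteration below re-pends piece 0's chunks, replays the
	// pre-marshalled Piece messages into the write end of the pipe, and waits
	// for the stub storage to see every chunk before moving on.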
	for i := 0; i < b.N; i += 1 {
		// The chunk must be written to storage every time, to ensure the
		// writeSem is unlocked.
		t.pendAllChunkSpecs(0)
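		// Received chunks are only accepted as useful if they appear in
		// validReceiveChunks, so mark every request index in the piece before
		// replaying the messages.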
		g.MakeMapIfNil(&cn.validReceiveChunks)
		eachRequestIndex(func(ri RequestIndex) {
			cn.validReceiveChunks[ri] = 1
		ts.allChunksWritten.Add(int(numRequests))
		for _, wb := range msgBufs {
			n, err := w.Write(wb)
			require.NoError(b, err)
			require.EqualValues(b, len(wb), n)
		// This is unlocked by a successful write to storage. So this unblocks
		// when that is done for every chunk.
		ts.allChunksWritten.Wait()
	if err := w.Close(); err != nil {
	for err = range allErrors {
	c.Assert(err, qt.IsNil)
	c.Assert(cn._stats.ChunksReadUseful.Int64(), quicktest.Equals, int64(b.N)*int64(numRequests))
	c.Assert(t.smartBanCache.HasBlocks(), qt.IsTrue)
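
// TestConnPexPeerFlags checks that pexPeerFlags derives the PEX flag bits
// (prefers-encryption, outgoing, uTP support) from the connection's state and
// transport.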
func TestConnPexPeerFlags(t *testing.T) {
		tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
		udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
	testcases := []struct {
		{&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: false}}, 0},
		{&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: true}}, pp.PexPrefersEncryption},
		{&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: false}}, pp.PexOutgoingConn},
		{&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: true}}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
		{&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}}, pp.PexSupportsUtp},
		{&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn | pp.PexSupportsUtp},
		{&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn},
		{&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}}, 0},
	for i, tc := range testcases {
		f := tc.conn.pexPeerFlags()
		require.EqualValues(t, tc.f, f, i)
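
// TestConnPexEvent checks the address and flags reported in PEX add/drop
// events: outgoing connections report the remote address as-is, while
// incoming ones substitute the peer's advertised listen port when available.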
func TestConnPexEvent(t *testing.T) {
		udpAddr     = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
		tcpAddr     = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
		dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
		dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747}
	testcases := []struct {
			&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}},
			pexEvent{pexAdd, udpAddr.AddrPort(), pp.PexSupportsUtp, nil},
				Peer:           Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true},
				PeerListenPort: dialTcpAddr.Port,
			pexEvent{pexDrop, tcpAddr.AddrPort(), pp.PexOutgoingConn, nil},
				Peer:           Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()},
				PeerListenPort: dialTcpAddr.Port,
			pexEvent{pexAdd, dialTcpAddr.AddrPort(), 0, nil},
				Peer:           Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()},
				PeerListenPort: dialUdpAddr.Port,
			pexEvent{pexDrop, dialUdpAddr.AddrPort(), pp.PexSupportsUtp, nil},
	for i, tc := range testcases {
		c.Run(fmt.Sprintf("%v", i), func(c *qt.C) {
			e, err := tc.c.pexEvent(tc.t)
			c.Assert(err, qt.IsNil)
			c.Check(e, qt.Equals, tc.e)
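
// TestHaveAllThenBitfield checks that a peer that first claims to have all
// pieces is tracked in connsWithAllPieces, and that a later bitfield replaces
// that claim and feeds the per-piece availability accounting.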
func TestHaveAllThenBitfield(t *testing.T) {
	cl := newTestingClient(t)
	tt := cl.newTorrentForTesting()
	// cl.newConnection()
	pc.initRequestState()
	tt.conns[&pc] = struct{}{}
	c.Assert(pc.onPeerSentHaveAll(), qt.IsNil)
	c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}})
	pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false})
	c.Check(pc.peerMinPieces, qt.Equals, 6)
	c.Check(pc.t.connsWithAllPieces, qt.HasLen, 0)
	c.Assert(pc.t.setInfo(&metainfo.Info{
		Pieces: make([]byte, pieceHash.Size()*7),
	c.Check(tt.numPieces(), qt.Equals, 7)
	c.Check(tt.pieceAvailabilityRuns(), qt.DeepEquals, []pieceAvailabilityRun{
		// The last element of the bitfield is irrelevant, as the Torrent
		// actually only has 7 pieces.
		{2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0},
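
// TestApplyRequestStateWriteBufferConstraints pins down the wire sizes the
// request pipelining math depends on: an interested message is 5 bytes
// (4-byte length prefix plus 1-byte id) and a request is 17 bytes (prefix,
// id, and three 4-byte integers for index, begin, and length).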
func TestApplyRequestStateWriteBufferConstraints(t *testing.T) {
	c.Check(interestedMsgLen, qt.Equals, 5)
	c.Check(requestMsgLen, qt.Equals, 17)
	c.Check(maxLocalToRemoteRequests >= 8, qt.IsTrue)
	c.Logf("max local to remote requests: %v", maxLocalToRemoteRequests)
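
// peerConnForPreferredNetworkDirection builds a minimal PeerConn whose peer
// IDs, direction, transport, and address family are set from the arguments,
// for exercising hasPreferredNetworkOver below.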
func peerConnForPreferredNetworkDirection(
	localPeerId, remotePeerId int,
	outgoing, utp, ipv6 bool,
	pc.outgoing = outgoing
		pc.RemoteAddr = &net.TCPAddr{IP: net.ParseIP("::420")}
		pc.RemoteAddr = &net.TCPAddr{IP: net.IPv4(1, 2, 3, 4)}
	binary.BigEndian.PutUint64(pc.PeerID[:], uint64(remotePeerId))
	binary.BigEndian.PutUint64(cl.peerID[:], uint64(localPeerId))
	pc.t = &Torrent{cl: &cl}
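
// TestPreferredNetworkDirection exercises hasPreferredNetworkOver's
// tie-breaking between otherwise equivalent connections: direction relative
// to peer ID ordering, uTP, and IPv6.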
func TestPreferredNetworkDirection(t *testing.T) {
	pc := peerConnForPreferredNetworkDirection
	// Prefer outgoing to lower peer ID
		pc(1, 2, true, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)),
		pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, true, false, false)),
		pc(2, 1, false, false, false).hasPreferredNetworkOver(pc(2, 1, true, false, false)),
		pc(1, 2, false, true, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)),
		pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, true)),
		pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)),
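
// TestReceiveLargeRequest checks how incoming piece requests are handled when
// the fast extension is enabled, including requests larger than the default
// chunk size and the interaction with the upload rate limiter's burst size.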
func TestReceiveLargeRequest(t *testing.T) {
	cl := newTestingClient(t)
	pc := cl.newConnection(nil, newConnectionOpts{network: "test"})
	tor := cl.newTorrentForTesting()
	tor.info = &metainfo.Info{PieceLength: 3 << 20}
	tor._completedPieces.Add(0)
	pc.PeerExtensionBytes.SetBit(pp.ExtensionBitFast, true)
	pc.initMessageWriter()
	req.Length = defaultChunkSize
	c.Assert(pc.fastEnabled(), qt.IsTrue)
	c.Check(pc.onReadRequest(req, false), qt.IsNil)
	c.Check(pc.peerRequests, qt.HasLen, 1)
	c.Check(pc.onReadRequest(req, false), qt.IsNil)
	c.Check(pc.peerRequests, qt.HasLen, 2)
	pc.peerRequests = nil
	pc.t.cl.config.UploadRateLimiter = rate.NewLimiter(1, defaultChunkSize)
	req.Length = defaultChunkSize
	c.Check(pc.onReadRequest(req, false), qt.IsNil)
	c.Check(pc.peerRequests, qt.HasLen, 1)
	c.Check(pc.onReadRequest(req, false), qt.IsNil)
	c.Check(pc.messageWriter.writeBuffer.Len(), qt.Equals, 17)
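
// TestChunkOverflowsPiece checks the guard against a chunk whose begin+length
// would exceed the piece bounds, including cases where the sum itself
// overflows pp.Integer.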
func TestChunkOverflowsPiece(t *testing.T) {
	check := func(begin, length, limit pp.Integer, expected bool) {
		c.Check(chunkOverflowsPiece(ChunkSpec{begin, length}, limit), qt.Equals, expected)
	check(2, pp.IntegerMax, 1, true)
	check(2, pp.IntegerMax, 3, true)
	check(2, pp.IntegerMax, pp.IntegerMax, true)
	check(2, pp.IntegerMax-2, pp.IntegerMax, false)