]> Sergey Matveev's repositories - btrtrc.git/blob - peerconn_test.go
Drop support for go 1.20
[btrtrc.git] / peerconn_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "net"
9         "sync"
10         "testing"
11
12         "github.com/frankban/quicktest"
13         qt "github.com/frankban/quicktest"
14         "github.com/stretchr/testify/require"
15         "golang.org/x/time/rate"
16
17         "github.com/anacrolix/torrent/metainfo"
18         pp "github.com/anacrolix/torrent/peer_protocol"
19         "github.com/anacrolix/torrent/storage"
20 )
21
22 // Ensure that no race exists between sending a bitfield, and a subsequent
23 // Have that would potentially alter it.
24 func TestSendBitfieldThenHave(t *testing.T) {
25         var cl Client
26         cl.init(TestingConfig(t))
27         cl.initLogger()
28         c := cl.newConnection(nil, newConnectionOpts{network: "io.Pipe"})
29         c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
30         if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil {
31                 t.Log(err)
32         }
33         r, w := io.Pipe()
34         // c.r = r
35         c.w = w
36         c.startMessageWriter()
37         c.locker().Lock()
38         c.t._completedPieces.Add(1)
39         c.postBitfield( /*[]bool{false, true, false}*/ )
40         c.locker().Unlock()
41         c.locker().Lock()
42         c.have(2)
43         c.locker().Unlock()
44         b := make([]byte, 15)
45         n, err := io.ReadFull(r, b)
46         c.locker().Lock()
47         // This will cause connection.writer to terminate.
48         c.closed.Set()
49         c.locker().Unlock()
50         require.NoError(t, err)
51         require.EqualValues(t, 15, n)
52         // Here we see that the bitfield doesn't have piece 2 set, as that should
53         // arrive in the following Have message.
54         require.EqualValues(t, "\x00\x00\x00\x02\x05@\x00\x00\x00\x05\x04\x00\x00\x00\x02", string(b))
55 }
56
57 type torrentStorage struct {
58         writeSem sync.Mutex
59 }
60
61 func (me *torrentStorage) Close() error { return nil }
62
63 func (me *torrentStorage) Piece(mp metainfo.Piece) storage.PieceImpl {
64         return me
65 }
66
67 func (me *torrentStorage) Completion() storage.Completion {
68         return storage.Completion{}
69 }
70
71 func (me *torrentStorage) MarkComplete() error {
72         return nil
73 }
74
75 func (me *torrentStorage) MarkNotComplete() error {
76         return nil
77 }
78
79 func (me *torrentStorage) ReadAt([]byte, int64) (int, error) {
80         panic("shouldn't be called")
81 }
82
83 func (me *torrentStorage) WriteAt(b []byte, _ int64) (int, error) {
84         if len(b) != defaultChunkSize {
85                 panic(len(b))
86         }
87         me.writeSem.Unlock()
88         return len(b), nil
89 }
90
91 func BenchmarkConnectionMainReadLoop(b *testing.B) {
92         c := quicktest.New(b)
93         var cl Client
94         cl.init(&ClientConfig{
95                 DownloadRateLimiter: unlimited,
96         })
97         cl.initLogger()
98         ts := &torrentStorage{}
99         t := cl.newTorrent(metainfo.Hash{}, nil)
100         t.initialPieceCheckDisabled = true
101         require.NoError(b, t.setInfo(&metainfo.Info{
102                 Pieces:      make([]byte, 20),
103                 Length:      1 << 20,
104                 PieceLength: 1 << 20,
105         }))
106         t.storage = &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}}
107         t.onSetInfo()
108         t._pendingPieces.Add(0)
109         r, w := net.Pipe()
110         cn := cl.newConnection(r, newConnectionOpts{
111                 outgoing:   true,
112                 remoteAddr: r.RemoteAddr(),
113                 network:    r.RemoteAddr().Network(),
114                 connString: regularNetConnPeerConnConnString(r),
115         })
116         cn.setTorrent(t)
117         mrlErrChan := make(chan error)
118         msg := pp.Message{
119                 Type:  pp.Piece,
120                 Piece: make([]byte, defaultChunkSize),
121         }
122         go func() {
123                 cl.lock()
124                 err := cn.mainReadLoop()
125                 if err != nil {
126                         mrlErrChan <- err
127                 }
128                 close(mrlErrChan)
129         }()
130         wb := msg.MustMarshalBinary()
131         b.SetBytes(int64(len(msg.Piece)))
132         go func() {
133                 ts.writeSem.Lock()
134                 for i := 0; i < b.N; i += 1 {
135                         cl.lock()
136                         // The chunk must be written to storage everytime, to ensure the
137                         // writeSem is unlocked.
138                         t.pendAllChunkSpecs(0)
139                         cn.validReceiveChunks = map[RequestIndex]int{
140                                 t.requestIndexFromRequest(newRequestFromMessage(&msg)): 1,
141                         }
142                         cl.unlock()
143                         n, err := w.Write(wb)
144                         require.NoError(b, err)
145                         require.EqualValues(b, len(wb), n)
146                         ts.writeSem.Lock()
147                 }
148                 if err := w.Close(); err != nil {
149                         panic(err)
150                 }
151         }()
152         mrlErr := <-mrlErrChan
153         if mrlErr != nil && !errors.Is(mrlErr, io.EOF) {
154                 c.Fatal(mrlErr)
155         }
156         c.Assert(cn._stats.ChunksReadUseful.Int64(), quicktest.Equals, int64(b.N))
157 }
158
159 func TestConnPexPeerFlags(t *testing.T) {
160         var (
161                 tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
162                 udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
163         )
164         testcases := []struct {
165                 conn *PeerConn
166                 f    pp.PexPeerFlags
167         }{
168                 {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: false}}, 0},
169                 {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: true}}, pp.PexPrefersEncryption},
170                 {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: false}}, pp.PexOutgoingConn},
171                 {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: true}}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
172                 {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}}, pp.PexSupportsUtp},
173                 {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn | pp.PexSupportsUtp},
174                 {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn},
175                 {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}}, 0},
176         }
177         for i, tc := range testcases {
178                 f := tc.conn.pexPeerFlags()
179                 require.EqualValues(t, tc.f, f, i)
180         }
181 }
182
183 func TestConnPexEvent(t *testing.T) {
184         c := qt.New(t)
185         var (
186                 udpAddr     = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
187                 tcpAddr     = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
188                 dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
189                 dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747}
190         )
191         testcases := []struct {
192                 t pexEventType
193                 c *PeerConn
194                 e pexEvent
195         }{
196                 {
197                         pexAdd,
198                         &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}},
199                         pexEvent{pexAdd, udpAddr.AddrPort(), pp.PexSupportsUtp, nil},
200                 },
201                 {
202                         pexDrop,
203                         &PeerConn{
204                                 Peer:           Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true},
205                                 PeerListenPort: dialTcpAddr.Port,
206                         },
207                         pexEvent{pexDrop, tcpAddr.AddrPort(), pp.PexOutgoingConn, nil},
208                 },
209                 {
210                         pexAdd,
211                         &PeerConn{
212                                 Peer:           Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()},
213                                 PeerListenPort: dialTcpAddr.Port,
214                         },
215                         pexEvent{pexAdd, dialTcpAddr.AddrPort(), 0, nil},
216                 },
217                 {
218                         pexDrop,
219                         &PeerConn{
220                                 Peer:           Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()},
221                                 PeerListenPort: dialUdpAddr.Port,
222                         },
223                         pexEvent{pexDrop, dialUdpAddr.AddrPort(), pp.PexSupportsUtp, nil},
224                 },
225         }
226         for i, tc := range testcases {
227                 c.Run(fmt.Sprintf("%v", i), func(c *qt.C) {
228                         e, err := tc.c.pexEvent(tc.t)
229                         c.Assert(err, qt.IsNil)
230                         c.Check(e, qt.Equals, tc.e)
231                 })
232         }
233 }
234
235 func TestHaveAllThenBitfield(t *testing.T) {
236         c := qt.New(t)
237         cl := newTestingClient(t)
238         tt := cl.newTorrentForTesting()
239         // cl.newConnection()
240         pc := PeerConn{
241                 Peer: Peer{t: tt},
242         }
243         pc.initRequestState()
244         pc.peerImpl = &pc
245         tt.conns[&pc] = struct{}{}
246         c.Assert(pc.onPeerSentHaveAll(), qt.IsNil)
247         c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}})
248         pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false})
249         c.Check(pc.peerMinPieces, qt.Equals, 6)
250         c.Check(pc.t.connsWithAllPieces, qt.HasLen, 0)
251         c.Assert(pc.t.setInfo(&metainfo.Info{
252                 PieceLength: 0,
253                 Pieces:      make([]byte, pieceHash.Size()*7),
254         }), qt.IsNil)
255         pc.t.onSetInfo()
256         c.Check(tt.numPieces(), qt.Equals, 7)
257         c.Check(tt.pieceAvailabilityRuns(), qt.DeepEquals, []pieceAvailabilityRun{
258                 // The last element of the bitfield is irrelevant, as the Torrent actually only has 7
259                 // pieces.
260                 {2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0},
261         })
262 }
263
264 func TestApplyRequestStateWriteBufferConstraints(t *testing.T) {
265         c := qt.New(t)
266         c.Check(interestedMsgLen, qt.Equals, 5)
267         c.Check(requestMsgLen, qt.Equals, 17)
268         c.Check(maxLocalToRemoteRequests >= 8, qt.IsTrue)
269         c.Logf("max local to remote requests: %v", maxLocalToRemoteRequests)
270 }
271
272 func peerConnForPreferredNetworkDirection(
273         localPeerId, remotePeerId int,
274         outgoing, utp, ipv6 bool,
275 ) *PeerConn {
276         pc := PeerConn{}
277         pc.outgoing = outgoing
278         if utp {
279                 pc.Network = "udp"
280         }
281         if ipv6 {
282                 pc.RemoteAddr = &net.TCPAddr{IP: net.ParseIP("::420")}
283         } else {
284                 pc.RemoteAddr = &net.TCPAddr{IP: net.IPv4(1, 2, 3, 4)}
285         }
286         binary.BigEndian.PutUint64(pc.PeerID[:], uint64(remotePeerId))
287         cl := Client{}
288         binary.BigEndian.PutUint64(cl.peerID[:], uint64(localPeerId))
289         pc.t = &Torrent{cl: &cl}
290         return &pc
291 }
292
293 func TestPreferredNetworkDirection(t *testing.T) {
294         pc := peerConnForPreferredNetworkDirection
295         c := qt.New(t)
296
297         // Prefer outgoing to lower peer ID
298
299         c.Check(
300                 pc(1, 2, true, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)),
301                 qt.IsFalse,
302         )
303         c.Check(
304                 pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, true, false, false)),
305                 qt.IsTrue,
306         )
307         c.Check(
308                 pc(2, 1, false, false, false).hasPreferredNetworkOver(pc(2, 1, true, false, false)),
309                 qt.IsFalse,
310         )
311
312         // Don't prefer uTP
313         c.Check(
314                 pc(1, 2, false, true, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)),
315                 qt.IsFalse,
316         )
317         // Prefer IPv6
318         c.Check(
319                 pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, true)),
320                 qt.IsFalse,
321         )
322         // No difference
323         c.Check(
324                 pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)),
325                 qt.IsFalse,
326         )
327 }
328
329 func TestReceiveLargeRequest(t *testing.T) {
330         c := qt.New(t)
331         cl := newTestingClient(t)
332         pc := cl.newConnection(nil, newConnectionOpts{network: "test"})
333         tor := cl.newTorrentForTesting()
334         tor.info = &metainfo.Info{PieceLength: 3 << 20}
335         pc.setTorrent(tor)
336         tor._completedPieces.Add(0)
337         pc.PeerExtensionBytes.SetBit(pp.ExtensionBitFast, true)
338         pc.choking = false
339         pc.initMessageWriter()
340         req := Request{}
341         req.Length = defaultChunkSize
342         c.Assert(pc.fastEnabled(), qt.IsTrue)
343         c.Check(pc.onReadRequest(req, false), qt.IsNil)
344         c.Check(pc.peerRequests, qt.HasLen, 1)
345         req.Length = 2 << 20
346         c.Check(pc.onReadRequest(req, false), qt.IsNil)
347         c.Check(pc.peerRequests, qt.HasLen, 2)
348         pc.peerRequests = nil
349         pc.t.cl.config.UploadRateLimiter = rate.NewLimiter(1, defaultChunkSize)
350         req.Length = defaultChunkSize
351         c.Check(pc.onReadRequest(req, false), qt.IsNil)
352         c.Check(pc.peerRequests, qt.HasLen, 1)
353         req.Length = 2 << 20
354         c.Check(pc.onReadRequest(req, false), qt.IsNil)
355         c.Check(pc.messageWriter.writeBuffer.Len(), qt.Equals, 17)
356 }
357
358 func TestChunkOverflowsPiece(t *testing.T) {
359         c := qt.New(t)
360         check := func(begin, length, limit pp.Integer, expected bool) {
361                 c.Check(chunkOverflowsPiece(ChunkSpec{begin, length}, limit), qt.Equals, expected)
362         }
363         check(2, 3, 1, true)
364         check(2, pp.IntegerMax, 1, true)
365         check(2, pp.IntegerMax, 3, true)
366         check(2, pp.IntegerMax, pp.IntegerMax, true)
367         check(2, pp.IntegerMax-2, pp.IntegerMax, false)
368 }