package torrent

import (
        "encoding/binary"
        "errors"
        "io"
        "net"
        "sync"
        "testing"

        "golang.org/x/time/rate"

        qt "github.com/frankban/quicktest"
        "github.com/stretchr/testify/require"

        "github.com/anacrolix/torrent/metainfo"
        pp "github.com/anacrolix/torrent/peer_protocol"
        "github.com/anacrolix/torrent/storage"
)

// Ensure that no race exists between sending a bitfield and a subsequent
// Have that would potentially alter it.
func TestSendBitfieldThenHave(t *testing.T) {
        var cl Client
        cl.init(TestingConfig(t))
        cl.initLogger()
        c := cl.newConnection(nil, newConnectionOpts{network: "io.Pipe"})
        c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
        if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil {
                t.Log(err)
        }
        r, w := io.Pipe()
        // c.r = r
        c.w = w
        c.startMessageWriter()
        c.locker().Lock()
        c.t._completedPieces.Add(1)
        c.postBitfield( /*[]bool{false, true, false}*/ )
        c.locker().Unlock()
        c.locker().Lock()
        c.have(2)
        c.locker().Unlock()
        b := make([]byte, 15)
        n, err := io.ReadFull(r, b)
        c.locker().Lock()
        // This will cause connection.writer to terminate.
        c.closed.Set()
        c.locker().Unlock()
        require.NoError(t, err)
        require.EqualValues(t, 15, n)
        // Here we see that the bitfield doesn't have piece 2 set, as that should
        // arrive in the following Have message.
        require.EqualValues(t, "\x00\x00\x00\x02\x05@\x00\x00\x00\x05\x04\x00\x00\x00\x02", string(b))
}

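// torrentStorage is a stub storage implementation that acts as its own piece. Reads are never
// expected, and each chunk-sized write releases writeSem so the benchmark below can pace itself.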
type torrentStorage struct {
        writeSem sync.Mutex
}

func (me *torrentStorage) Close() error { return nil }

func (me *torrentStorage) Piece(mp metainfo.Piece) storage.PieceImpl {
        return me
}

func (me *torrentStorage) Completion() storage.Completion {
        return storage.Completion{}
}

func (me *torrentStorage) MarkComplete() error {
        return nil
}

func (me *torrentStorage) MarkNotComplete() error {
        return nil
}

func (me *torrentStorage) ReadAt([]byte, int64) (int, error) {
        panic("shouldn't be called")
}

func (me *torrentStorage) WriteAt(b []byte, _ int64) (int, error) {
        if len(b) != defaultChunkSize {
                panic(len(b))
        }
        me.writeSem.Unlock()
        return len(b), nil
}

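// BenchmarkConnectionMainReadLoop measures how quickly mainReadLoop can ingest Piece messages
// pushed through a net.Pipe, using torrentStorage's writeSem to wait for each chunk to reach
// storage before sending the next one.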
func BenchmarkConnectionMainReadLoop(b *testing.B) {
        c := qt.New(b)
        var cl Client
        cl.init(&ClientConfig{
                DownloadRateLimiter: unlimited,
        })
        cl.initLogger()
        ts := &torrentStorage{}
        t := cl.newTorrent(metainfo.Hash{}, nil)
        t.initialPieceCheckDisabled = true
        require.NoError(b, t.setInfo(&metainfo.Info{
                Pieces:      make([]byte, 20),
                Length:      1 << 20,
                PieceLength: 1 << 20,
        }))
        t.storage = &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}}
        t.onSetInfo()
        t._pendingPieces.Add(0)
        r, w := net.Pipe()
        cn := cl.newConnection(r, newConnectionOpts{
                outgoing:   true,
                remoteAddr: r.RemoteAddr(),
                network:    r.RemoteAddr().Network(),
                connString: regularNetConnPeerConnConnString(r),
        })
        cn.setTorrent(t)
        mrlErrChan := make(chan error)
        msg := pp.Message{
                Type:  pp.Piece,
                Piece: make([]byte, defaultChunkSize),
        }
        go func() {
                cl.lock()
                err := cn.mainReadLoop()
                if err != nil {
                        mrlErrChan <- err
                }
                close(mrlErrChan)
        }()
        wb := msg.MustMarshalBinary()
        b.SetBytes(int64(len(msg.Piece)))
        go func() {
                ts.writeSem.Lock()
                for i := 0; i < b.N; i += 1 {
                        cl.lock()
                        // The chunk must be written to storage every time to ensure the
                        // writeSem is unlocked.
                        t.pendAllChunkSpecs(0)
                        cn.validReceiveChunks = map[RequestIndex]int{
                                t.requestIndexFromRequest(newRequestFromMessage(&msg)): 1,
                        }
                        cl.unlock()
                        n, err := w.Write(wb)
                        require.NoError(b, err)
                        require.EqualValues(b, len(wb), n)
                        ts.writeSem.Lock()
                }
                if err := w.Close(); err != nil {
                        panic(err)
                }
        }()
        mrlErr := <-mrlErrChan
        if mrlErr != nil && !errors.Is(mrlErr, io.EOF) {
                c.Fatal(mrlErr)
        }
        c.Assert(cn._stats.ChunksReadUseful.Int64(), qt.Equals, int64(b.N))
}

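// TestConnPexPeerFlags checks that pexPeerFlags derives the expected PEX flag bits from a
// connection's direction, encryption preference, and transport network.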
func TestConnPexPeerFlags(t *testing.T) {
        var (
                tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
                udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
        )
        testcases := []struct {
                conn *PeerConn
                f    pp.PexPeerFlags
        }{
                {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: false}}, 0},
                {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: true}}, pp.PexPrefersEncryption},
                {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: false}}, pp.PexOutgoingConn},
                {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: true}}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
                {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}}, pp.PexSupportsUtp},
                {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn | pp.PexSupportsUtp},
                {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn},
                {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}}, 0},
        }
        for i, tc := range testcases {
                f := tc.conn.pexPeerFlags()
                require.EqualValues(t, tc.f, f, i)
        }
}

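// TestConnPexEvent checks the addresses and flags that pexEvent derives for add/drop events,
// including substituting the peer's advertised listen port for incoming connections.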
func TestConnPexEvent(t *testing.T) {
        var (
                udpAddr     = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
                tcpAddr     = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
                dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
                dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747}
        )
        testcases := []struct {
                t pexEventType
                c *PeerConn
                e pexEvent
        }{
                {
                        pexAdd,
                        &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}},
                        pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp, nil},
                },
                {
                        pexDrop,
                        &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}, PeerListenPort: dialTcpAddr.Port},
                        pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn, nil},
                },
                {
                        pexAdd,
                        &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}, PeerListenPort: dialTcpAddr.Port},
                        pexEvent{pexAdd, dialTcpAddr, 0, nil},
                },
                {
                        pexDrop,
                        &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}, PeerListenPort: dialUdpAddr.Port},
                        pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp, nil},
                },
        }
        for i, tc := range testcases {
                e := tc.c.pexEvent(tc.t)
                require.EqualValues(t, tc.e, e, i)
        }
}

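// TestHaveAllThenBitfield checks that a HaveAll followed by a bitfield updates peerMinPieces,
// drops the peer from connsWithAllPieces, and yields the expected piece availability runs once
// the torrent's info arrives.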
func TestHaveAllThenBitfield(t *testing.T) {
        c := qt.New(t)
        cl := newTestingClient(t)
        tt := cl.newTorrentForTesting()
        // cl.newConnection()
        pc := PeerConn{
                Peer: Peer{t: tt},
        }
        pc.initRequestState()
        pc.peerImpl = &pc
        tt.conns[&pc] = struct{}{}
        c.Assert(pc.onPeerSentHaveAll(), qt.IsNil)
        c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}})
        pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false})
        c.Check(pc.peerMinPieces, qt.Equals, 6)
        c.Check(pc.t.connsWithAllPieces, qt.HasLen, 0)
        c.Assert(pc.t.setInfo(&metainfo.Info{
                PieceLength: 0,
                Pieces:      make([]byte, pieceHash.Size()*7),
        }), qt.IsNil)
        pc.t.onSetInfo()
        c.Check(tt.numPieces(), qt.Equals, 7)
        c.Check(tt.pieceAvailabilityRuns(), qt.DeepEquals, []pieceAvailabilityRun{
                // The last element of the bitfield is irrelevant, as the Torrent actually only has 7
                // pieces.
                {2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0},
        })
}

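// TestApplyRequestStateWriteBufferConstraints sanity-checks the message-size constants that bound
// how many outgoing requests fit in the write buffer.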
func TestApplyRequestStateWriteBufferConstraints(t *testing.T) {
        c := qt.New(t)
        c.Check(interestedMsgLen, qt.Equals, 5)
        c.Check(requestMsgLen, qt.Equals, 17)
        c.Check(maxLocalToRemoteRequests >= 8, qt.IsTrue)
        c.Logf("max local to remote requests: %v", maxLocalToRemoteRequests)
}

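// peerConnForPreferredNetworkDirection builds a minimal PeerConn (and owning Client) with the
// fields that hasPreferredNetworkOver compares: peer IDs, direction, transport, and address family.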
func peerConnForPreferredNetworkDirection(
        localPeerId, remotePeerId int,
        outgoing, utp, ipv6 bool,
) *PeerConn {
        pc := PeerConn{}
        pc.outgoing = outgoing
        if utp {
                pc.Network = "udp"
        }
        if ipv6 {
                pc.RemoteAddr = &net.TCPAddr{IP: net.ParseIP("::420")}
        } else {
                pc.RemoteAddr = &net.TCPAddr{IP: net.IPv4(1, 2, 3, 4)}
        }
        binary.BigEndian.PutUint64(pc.PeerID[:], uint64(remotePeerId))
        cl := Client{}
        binary.BigEndian.PutUint64(cl.peerID[:], uint64(localPeerId))
        pc.t = &Torrent{cl: &cl}
        return &pc
}

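// TestPreferredNetworkDirection exercises hasPreferredNetworkOver for the tie-breaking rules
// spelled out in the comments below: connection direction versus peer ID, uTP, and IPv6.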
func TestPreferredNetworkDirection(t *testing.T) {
        pc := peerConnForPreferredNetworkDirection
        c := qt.New(t)

        // Prefer outgoing to lower peer ID

        c.Check(
                pc(1, 2, true, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)),
                qt.IsFalse,
        )
        c.Check(
                pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, true, false, false)),
                qt.IsTrue,
        )
        c.Check(
                pc(2, 1, false, false, false).hasPreferredNetworkOver(pc(2, 1, true, false, false)),
                qt.IsFalse,
        )

        // Don't prefer uTP
        c.Check(
                pc(1, 2, false, true, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)),
                qt.IsFalse,
        )
        // Prefer IPv6
        c.Check(
                pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, true)),
                qt.IsFalse,
        )
        // No difference
        c.Check(
                pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)),
                qt.IsFalse,
        )
}

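// TestReceiveLargeRequest checks that onReadRequest accepts requests larger than defaultChunkSize,
// and that once the upload rate limiter's burst cannot cover a large request, a response
// (presumably a reject) is queued to the write buffer instead of a peer request entry.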
func TestReceiveLargeRequest(t *testing.T) {
        c := qt.New(t)
        cl := newTestingClient(t)
        pc := cl.newConnection(nil, newConnectionOpts{network: "test"})
        tor := cl.newTorrentForTesting()
        tor.info = &metainfo.Info{PieceLength: 3 << 20}
        pc.setTorrent(tor)
        tor._completedPieces.Add(0)
        pc.PeerExtensionBytes.SetBit(pp.ExtensionBitFast, true)
        pc.choking = false
        pc.initMessageWriter()
        req := Request{}
        req.Length = defaultChunkSize
        c.Assert(pc.fastEnabled(), qt.IsTrue)
        c.Check(pc.onReadRequest(req, false), qt.IsNil)
        c.Check(pc.peerRequests, qt.HasLen, 1)
        req.Length = 2 << 20
        c.Check(pc.onReadRequest(req, false), qt.IsNil)
        c.Check(pc.peerRequests, qt.HasLen, 2)
        pc.peerRequests = nil
        pc.t.cl.config.UploadRateLimiter = rate.NewLimiter(1, defaultChunkSize)
        req.Length = defaultChunkSize
        c.Check(pc.onReadRequest(req, false), qt.IsNil)
        c.Check(pc.peerRequests, qt.HasLen, 1)
        req.Length = 2 << 20
        c.Check(pc.onReadRequest(req, false), qt.IsNil)
        c.Check(pc.messageWriter.writeBuffer.Len(), qt.Equals, 17)
}

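// TestChunkOverflowsPiece checks chunkOverflowsPiece against boundary cases, including lengths near
// pp.IntegerMax that would wrap around the end of the piece.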
func TestChunkOverflowsPiece(t *testing.T) {
        c := qt.New(t)
        check := func(begin, length, limit pp.Integer, expected bool) {
                c.Check(chunkOverflowsPiece(ChunkSpec{begin, length}, limit), qt.Equals, expected)
        }
        check(2, 3, 1, true)
        check(2, pp.IntegerMax, 1, true)
        check(2, pp.IntegerMax, 3, true)
        check(2, pp.IntegerMax, pp.IntegerMax, true)
        check(2, pp.IntegerMax-2, pp.IntegerMax, false)
}