// Source: btrtrc.git / peerconn_test.go (gitweb extraction header removed).
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "net"
9         "sync"
10         "testing"
11
12         "golang.org/x/time/rate"
13
14         "github.com/frankban/quicktest"
15         qt "github.com/frankban/quicktest"
16         "github.com/stretchr/testify/require"
17
18         "github.com/anacrolix/torrent/metainfo"
19         pp "github.com/anacrolix/torrent/peer_protocol"
20         "github.com/anacrolix/torrent/storage"
21 )
22
23 // Ensure that no race exists between sending a bitfield, and a subsequent
24 // Have that would potentially alter it.
25 func TestSendBitfieldThenHave(t *testing.T) {
26         var cl Client
27         cl.init(TestingConfig(t))
28         cl.initLogger()
29         c := cl.newConnection(nil, newConnectionOpts{network: "io.Pipe"})
30         c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
31         if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil {
32                 t.Log(err)
33         }
34         r, w := io.Pipe()
35         // c.r = r
36         c.w = w
37         c.startMessageWriter()
38         c.locker().Lock()
39         c.t._completedPieces.Add(1)
40         c.postBitfield( /*[]bool{false, true, false}*/ )
41         c.locker().Unlock()
42         c.locker().Lock()
43         c.have(2)
44         c.locker().Unlock()
45         b := make([]byte, 15)
46         n, err := io.ReadFull(r, b)
47         c.locker().Lock()
48         // This will cause connection.writer to terminate.
49         c.closed.Set()
50         c.locker().Unlock()
51         require.NoError(t, err)
52         require.EqualValues(t, 15, n)
53         // Here we see that the bitfield doesn't have piece 2 set, as that should
54         // arrive in the following Have message.
55         require.EqualValues(t, "\x00\x00\x00\x02\x05@\x00\x00\x00\x05\x04\x00\x00\x00\x02", string(b))
56 }
57
// torrentStorage is a minimal storage.TorrentImpl/PieceImpl test double for
// BenchmarkConnectionMainReadLoop. The same value serves as both the torrent
// storage and every piece's storage.
type torrentStorage struct {
	// writeSem is a mutex used as a binary semaphore: the benchmark loop Locks
	// it before sending a chunk, and WriteAt Unlocks it to signal that the
	// chunk reached storage.
	writeSem sync.Mutex
}

func (me *torrentStorage) Close() error { return nil }

// Piece returns the same object for every piece; per-piece state is not
// needed in the benchmark.
func (me *torrentStorage) Piece(mp metainfo.Piece) storage.PieceImpl {
	return me
}

// Completion reports the zero value, i.e. not complete and not certain.
func (me *torrentStorage) Completion() storage.Completion {
	return storage.Completion{}
}

func (me *torrentStorage) MarkComplete() error {
	return nil
}

func (me *torrentStorage) MarkNotComplete() error {
	return nil
}

// ReadAt panics: the benchmark only writes chunks, it never reads them back.
func (me *torrentStorage) ReadAt([]byte, int64) (int, error) {
	panic("shouldn't be called")
}

// WriteAt asserts each write is exactly one default-sized chunk, then releases
// writeSem so the benchmark's sender goroutine can proceed with the next chunk.
func (me *torrentStorage) WriteAt(b []byte, _ int64) (int, error) {
	if len(b) != defaultChunkSize {
		panic(len(b))
	}
	me.writeSem.Unlock()
	return len(b), nil
}
91
// BenchmarkConnectionMainReadLoop measures PeerConn.mainReadLoop throughput by
// pumping Piece messages through a net.Pipe into a torrent backed by the
// torrentStorage test double above.
func BenchmarkConnectionMainReadLoop(b *testing.B) {
	c := quicktest.New(b)
	var cl Client
	cl.init(&ClientConfig{
		DownloadRateLimiter: unlimited,
	})
	cl.initLogger()
	ts := &torrentStorage{}
	t := cl.newTorrent(metainfo.Hash{}, nil)
	t.initialPieceCheckDisabled = true
	// Single 1 MiB piece (one 20-byte hash), so every chunk targets piece 0.
	require.NoError(b, t.setInfo(&metainfo.Info{
		Pieces:      make([]byte, 20),
		Length:      1 << 20,
		PieceLength: 1 << 20,
	}))
	t.storage = &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}}
	t.onSetInfo()
	t._pendingPieces.Add(0)
	r, w := net.Pipe()
	cn := cl.newConnection(r, newConnectionOpts{
		outgoing:   true,
		remoteAddr: r.RemoteAddr(),
		network:    r.RemoteAddr().Network(),
		connString: regularNetConnPeerConnConnString(r),
	})
	cn.setTorrent(t)
	mrlErrChan := make(chan error)
	msg := pp.Message{
		Type:  pp.Piece,
		Piece: make([]byte, defaultChunkSize),
	}
	// Reader side: mainReadLoop is entered with the client lock held (it
	// releases/reacquires it internally) and reports its terminal error.
	go func() {
		cl.lock()
		err := cn.mainReadLoop()
		if err != nil {
			mrlErrChan <- err
		}
		close(mrlErrChan)
	}()
	wb := msg.MustMarshalBinary()
	b.SetBytes(int64(len(msg.Piece)))
	// Writer side: for each iteration, re-arm the request state so the chunk
	// is accepted, send the marshalled Piece message, then block on writeSem
	// until torrentStorage.WriteAt releases it.
	go func() {
		ts.writeSem.Lock()
		for i := 0; i < b.N; i += 1 {
			cl.lock()
			// The chunk must be written to storage everytime, to ensure the
			// writeSem is unlocked.
			t.pendAllChunkSpecs(0)
			cn.validReceiveChunks = map[RequestIndex]int{
				t.requestIndexFromRequest(newRequestFromMessage(&msg)): 1,
			}
			cl.unlock()
			n, err := w.Write(wb)
			require.NoError(b, err)
			require.EqualValues(b, len(wb), n)
			ts.writeSem.Lock()
		}
		// Closing the write end makes mainReadLoop return io.EOF.
		if err := w.Close(); err != nil {
			panic(err)
		}
	}()
	mrlErr := <-mrlErrChan
	if mrlErr != nil && !errors.Is(mrlErr, io.EOF) {
		c.Fatal(mrlErr)
	}
	// Every benchmarked chunk must have been counted as useful data.
	c.Assert(cn._stats.ChunksReadUseful.Int64(), quicktest.Equals, int64(b.N))
}
159
160 func TestConnPexPeerFlags(t *testing.T) {
161         var (
162                 tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
163                 udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
164         )
165         testcases := []struct {
166                 conn *PeerConn
167                 f    pp.PexPeerFlags
168         }{
169                 {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: false}}, 0},
170                 {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: true}}, pp.PexPrefersEncryption},
171                 {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: false}}, pp.PexOutgoingConn},
172                 {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: true}}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
173                 {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}}, pp.PexSupportsUtp},
174                 {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn | pp.PexSupportsUtp},
175                 {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn},
176                 {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}}, 0},
177         }
178         for i, tc := range testcases {
179                 f := tc.conn.pexPeerFlags()
180                 require.EqualValues(t, tc.f, f, i)
181         }
182 }
183
184 func TestConnPexEvent(t *testing.T) {
185         var (
186                 udpAddr     = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
187                 tcpAddr     = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
188                 dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
189                 dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747}
190         )
191         testcases := []struct {
192                 t pexEventType
193                 c *PeerConn
194                 e pexEvent
195         }{
196                 {
197                         pexAdd,
198                         &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}},
199                         pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp, nil},
200                 },
201                 {
202                         pexDrop,
203                         &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true, PeerListenPort: dialTcpAddr.Port}},
204                         pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn, nil},
205                 },
206                 {
207                         pexAdd,
208                         &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), PeerListenPort: dialTcpAddr.Port}},
209                         pexEvent{pexAdd, dialTcpAddr, 0, nil},
210                 },
211                 {
212                         pexDrop,
213                         &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), PeerListenPort: dialUdpAddr.Port}},
214                         pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp, nil},
215                 },
216         }
217         for i, tc := range testcases {
218                 e := tc.c.pexEvent(tc.t)
219                 require.EqualValues(t, tc.e, e, i)
220         }
221 }
222
// TestHaveAllThenBitfield checks that a HaveAll followed by a Bitfield
// replaces the "has everything" state with the bitfield's piece availability,
// both before and after torrent info is known.
func TestHaveAllThenBitfield(t *testing.T) {
	c := qt.New(t)
	cl := newTestingClient(t)
	tt := cl.newTorrentForTesting()
	// cl.newConnection()
	pc := PeerConn{
		Peer: Peer{t: tt},
	}
	pc.initRequestState()
	pc.peerImpl = &pc
	tt.conns[&pc] = struct{}{}
	// HaveAll (sent before info is available) registers the conn as having all pieces.
	c.Assert(pc.onPeerSentHaveAll(), qt.IsNil)
	c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}})
	// A subsequent bitfield supersedes HaveAll: the conn no longer counts as
	// having everything, and peerMinPieces follows the highest set bit (index 5 => 6).
	pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false})
	c.Check(pc.peerMinPieces, qt.Equals, 6)
	c.Check(pc.t.connsWithAllPieces, qt.HasLen, 0)
	// Now supply info with 7 piece hashes and confirm availability is derived
	// from the bitfield, not the earlier HaveAll.
	c.Assert(pc.t.setInfo(&metainfo.Info{
		PieceLength: 0,
		Pieces:      make([]byte, pieceHash.Size()*7),
	}), qt.IsNil)
	pc.t.onSetInfo()
	c.Check(tt.numPieces(), qt.Equals, 7)
	c.Check(tt.pieceAvailabilityRuns(), qt.DeepEquals, []pieceAvailabilityRun{
		// The last element of the bitfield is irrelevant, as the Torrent actually only has 7
		// pieces.
		{2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0},
	})
}
251
252 func TestApplyRequestStateWriteBufferConstraints(t *testing.T) {
253         c := qt.New(t)
254         c.Check(interestedMsgLen, qt.Equals, 5)
255         c.Check(requestMsgLen, qt.Equals, 17)
256         c.Check(maxLocalToRemoteRequests >= 8, qt.IsTrue)
257         c.Logf("max local to remote requests: %v", maxLocalToRemoteRequests)
258 }
259
260 func peerConnForPreferredNetworkDirection(localPeerId, remotePeerId int, outgoing, utp, ipv6 bool) *PeerConn {
261         pc := PeerConn{}
262         pc.outgoing = outgoing
263         if utp {
264                 pc.Network = "udp"
265         }
266         if ipv6 {
267                 pc.RemoteAddr = &net.TCPAddr{IP: net.ParseIP(fmt.Sprintf("::420"))}
268         } else {
269                 pc.RemoteAddr = &net.TCPAddr{IP: net.IPv4(1, 2, 3, 4)}
270         }
271         binary.BigEndian.PutUint64(pc.PeerID[:], uint64(remotePeerId))
272         cl := Client{}
273         binary.BigEndian.PutUint64(cl.peerID[:], uint64(localPeerId))
274         pc.t = &Torrent{cl: &cl}
275         return &pc
276 }
277
278 func TestPreferredNetworkDirection(t *testing.T) {
279         pc := peerConnForPreferredNetworkDirection
280         c := qt.New(t)
281         // Prefer outgoing to higher peer ID
282         c.Assert(pc(1, 2, true, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), qt.IsTrue)
283         c.Assert(pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, true, false, false)), qt.IsFalse)
284         c.Assert(pc(2, 1, false, false, false).hasPreferredNetworkOver(pc(2, 1, true, false, false)), qt.IsTrue)
285         // Don't prefer uTP
286         c.Assert(pc(1, 2, false, true, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), qt.IsFalse)
287         // Prefer IPv6
288         c.Assert(pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, true)), qt.IsFalse)
289         // No difference
290         c.Assert(pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), qt.IsFalse)
291 }
292
// TestReceiveLargeRequest checks onReadRequest's handling of oversized
// requests: accepted while the upload rate limiter is unlimited, but rejected
// (with a fast-extension reject queued) once the request exceeds the
// limiter's burst.
func TestReceiveLargeRequest(t *testing.T) {
	c := qt.New(t)
	cl := newTestingClient(t)
	pc := cl.newConnection(nil, newConnectionOpts{network: "test"})
	tor := cl.newTorrentForTesting()
	tor.info = &metainfo.Info{PieceLength: 3 << 20}
	pc.setTorrent(tor)
	// Piece 0 must be complete for requests against it to be serviceable.
	tor._completedPieces.Add(0)
	pc.PeerExtensionBytes.SetBit(pp.ExtensionBitFast, true)
	pc.choking = false
	pc.initMessageWriter()
	req := Request{}
	req.Length = defaultChunkSize
	c.Assert(pc.fastEnabled(), qt.IsTrue)
	// Normal-sized request is queued.
	c.Check(pc.onReadRequest(req, false), qt.IsNil)
	c.Check(pc.peerRequests, qt.HasLen, 1)
	// A 2 MiB request is also accepted while the upload limiter is unlimited.
	req.Length = 2 << 20
	c.Check(pc.onReadRequest(req, false), qt.IsNil)
	c.Check(pc.peerRequests, qt.HasLen, 2)
	pc.peerRequests = nil
	// Constrain the limiter: burst equals one default chunk.
	pc.t.cl.config.UploadRateLimiter = rate.NewLimiter(1, defaultChunkSize)
	req.Length = defaultChunkSize
	c.Check(pc.onReadRequest(req, false), qt.IsNil)
	c.Check(pc.peerRequests, qt.HasLen, 1)
	// Over-burst request: not queued; instead a 17-byte reject message sits in
	// the write buffer (same framing size as a request message).
	req.Length = 2 << 20
	c.Check(pc.onReadRequest(req, false), qt.IsNil)
	c.Check(pc.messageWriter.writeBuffer.Len(), qt.Equals, 17)
}
321
322 func TestChunkOverflowsPiece(t *testing.T) {
323         c := qt.New(t)
324         check := func(begin, length, limit pp.Integer, expected bool) {
325                 c.Check(chunkOverflowsPiece(ChunkSpec{begin, length}, limit), qt.Equals, expected)
326         }
327         check(2, 3, 1, true)
328         check(2, pp.IntegerMax, 1, true)
329         check(2, pp.IntegerMax, 3, true)
330         check(2, pp.IntegerMax, pp.IntegerMax, true)
331         check(2, pp.IntegerMax-2, pp.IntegerMax, false)
332 }