package torrent

import (
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"net"
	"sync"
	"testing"

	"github.com/frankban/quicktest"
	qt "github.com/frankban/quicktest"
	"github.com/stretchr/testify/require"
	"golang.org/x/time/rate"

	"github.com/anacrolix/torrent/metainfo"
	pp "github.com/anacrolix/torrent/peer_protocol"
	"github.com/anacrolix/torrent/storage"
)

// Ensure that no race exists between sending a bitfield and a subsequent
// Have that would potentially alter it.
func TestSendBitfieldThenHave(t *testing.T) {
	var cl Client
	cl.init(TestingConfig(t))
	cl.initLogger()
	c := cl.newConnection(nil, false, nil, "io.Pipe", "")
	c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
	if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil {
		t.Log(err)
	}
	r, w := io.Pipe()
	// c.r = r
	c.w = w
	c.startMessageWriter()
	c.locker().Lock()
	c.t._completedPieces.Add(1)
	c.postBitfield( /*[]bool{false, true, false}*/ )
	c.locker().Unlock()
	c.locker().Lock()
	c.have(2)
	c.locker().Unlock()
	b := make([]byte, 15)
	n, err := io.ReadFull(r, b)
	c.locker().Lock()
	// This will cause connection.writer to terminate.
	c.closed.Set()
	c.locker().Unlock()
	require.NoError(t, err)
	require.EqualValues(t, 15, n)
	// Here we see that the bitfield doesn't have piece 2 set, as that should
	// arrive in the following Have message.
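	// Expected wire bytes: a length-2 bitfield message (ID 5) whose single
	// payload byte 0x40 has only piece 1 set, followed by a length-5 have
	// message (ID 4) carrying piece index 2.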
	require.EqualValues(t, "\x00\x00\x00\x02\x05@\x00\x00\x00\x05\x04\x00\x00\x00\x02", string(b))
}

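// torrentStorage is a minimal piece storage implementation for the benchmark
// below. writeSem is used as a semaphore rather than a mutex: the benchmark
// loop holds it locked, and WriteAt unlocks it to signal that a chunk has
// reached storage.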
type torrentStorage struct {
	writeSem sync.Mutex
}

func (me *torrentStorage) Close() error { return nil }

func (me *torrentStorage) Piece(mp metainfo.Piece) storage.PieceImpl {
	return me
}

func (me *torrentStorage) Completion() storage.Completion {
	return storage.Completion{}
}

func (me *torrentStorage) MarkComplete() error {
	return nil
}

func (me *torrentStorage) MarkNotComplete() error {
	return nil
}

func (me *torrentStorage) ReadAt([]byte, int64) (int, error) {
	panic("shouldn't be called")
}

func (me *torrentStorage) WriteAt(b []byte, _ int64) (int, error) {
	if len(b) != defaultChunkSize {
		panic(len(b))
	}
	me.writeSem.Unlock()
	return len(b), nil
}

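// BenchmarkConnectionMainReadLoop measures mainReadLoop throughput: a writer
// goroutine feeds pre-marshalled Piece messages for a single 1 MiB piece into
// one end of a net.Pipe, and the read loop on the other end must accept each
// chunk as useful and hand it to storage before the next one is sent.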
func BenchmarkConnectionMainReadLoop(b *testing.B) {
	c := quicktest.New(b)
	var cl Client
	cl.init(&ClientConfig{
		DownloadRateLimiter: unlimited,
	})
	cl.initLogger()
	ts := &torrentStorage{}
	t := cl.newTorrent(metainfo.Hash{}, nil)
	t.initialPieceCheckDisabled = true
	require.NoError(b, t.setInfo(&metainfo.Info{
		Pieces:      make([]byte, 20),
		Length:      1 << 20,
		PieceLength: 1 << 20,
	}))
	t.storage = &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}}
	t.onSetInfo()
	t._pendingPieces.Add(0)
	r, w := net.Pipe()
	cn := cl.newConnection(r, true, r.RemoteAddr(), r.RemoteAddr().Network(), regularNetConnPeerConnConnString(r))
	cn.setTorrent(t)
	mrlErrChan := make(chan error)
	msg := pp.Message{
		Type:  pp.Piece,
		Piece: make([]byte, defaultChunkSize),
	}
	go func() {
		cl.lock()
		err := cn.mainReadLoop()
		if err != nil {
			mrlErrChan <- err
		}
		close(mrlErrChan)
	}()
	wb := msg.MustMarshalBinary()
	b.SetBytes(int64(len(msg.Piece)))
	go func() {
		ts.writeSem.Lock()
		for i := 0; i < b.N; i += 1 {
			cl.lock()
			// The chunk must be written to storage every time, to ensure the
			// writeSem is unlocked.
			t.pendAllChunkSpecs(0)
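			// validReceiveChunks must also be reset each iteration so the
			// incoming chunk matches an expected request and is counted as
			// useful rather than unexpected.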
			cn.validReceiveChunks = map[RequestIndex]int{
				t.requestIndexFromRequest(newRequestFromMessage(&msg)): 1,
			}
			cl.unlock()
			n, err := w.Write(wb)
			require.NoError(b, err)
			require.EqualValues(b, len(wb), n)
			ts.writeSem.Lock()
		}
		if err := w.Close(); err != nil {
			panic(err)
		}
	}()
	mrlErr := <-mrlErrChan
	if mrlErr != nil && !errors.Is(mrlErr, io.EOF) {
		c.Fatal(mrlErr)
	}
	c.Assert(cn._stats.ChunksReadUseful.Int64(), quicktest.Equals, int64(b.N))
}

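// Check the PEX flags advertised for a connection: outgoing connections add
// PexOutgoingConn, peers that prefer encryption add PexPrefersEncryption, and
// peers reached over UDP (uTP) add PexSupportsUtp.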
func TestConnPexPeerFlags(t *testing.T) {
	var (
		tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
		udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
	)
	testcases := []struct {
		conn *PeerConn
		f    pp.PexPeerFlags
	}{
		{&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: false}}, 0},
		{&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: true}}, pp.PexPrefersEncryption},
		{&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: false}}, pp.PexOutgoingConn},
		{&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: true}}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
		{&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}}, pp.PexSupportsUtp},
		{&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn | pp.PexSupportsUtp},
		{&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn},
		{&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}}, 0},
	}
	for i, tc := range testcases {
		f := tc.conn.pexPeerFlags()
		require.EqualValues(t, tc.f, f, i)
	}
}

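// Check the PEX events generated for a connection. An outgoing connection's
// remote address is already dialable and is advertised as-is; for an incoming
// connection the peer's declared listen port replaces the remote port.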
func TestConnPexEvent(t *testing.T) {
	var (
		udpAddr     = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
		tcpAddr     = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
		dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
		dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747}
	)
	testcases := []struct {
		t pexEventType
		c *PeerConn
		e pexEvent
	}{
		{
			pexAdd,
			&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}},
			pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp, nil},
		},
		{
			pexDrop,
			&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true, PeerListenPort: dialTcpAddr.Port}},
			pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn, nil},
		},
		{
			pexAdd,
			&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), PeerListenPort: dialTcpAddr.Port}},
			pexEvent{pexAdd, dialTcpAddr, 0, nil},
		},
		{
			pexDrop,
			&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), PeerListenPort: dialUdpAddr.Port}},
			pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp, nil},
		},
	}
	for i, tc := range testcases {
		e := tc.c.pexEvent(tc.t)
		require.EqualValues(t, tc.e, e, i)
	}
}

func TestHaveAllThenBitfield(t *testing.T) {
	c := qt.New(t)
	cl := newTestingClient(t)
	tt := cl.newTorrentForTesting()
	// cl.newConnection()
	pc := PeerConn{
		Peer: Peer{t: tt},
	}
	pc.initRequestState()
	pc.peerImpl = &pc
	tt.conns[&pc] = struct{}{}
	c.Assert(pc.onPeerSentHaveAll(), qt.IsNil)
	c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}})
	pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false})
	c.Check(pc.peerMinPieces, qt.Equals, 6)
	c.Check(pc.t.connsWithAllPieces, qt.HasLen, 0)
	c.Assert(pc.t.setInfo(&metainfo.Info{
		PieceLength: 0,
		Pieces:      make([]byte, pieceHash.Size()*7),
	}), qt.IsNil)
	pc.t.onSetInfo()
	c.Check(tt.numPieces(), qt.Equals, 7)
	c.Check(tt.pieceAvailabilityRuns(), qt.DeepEquals, []pieceAvailabilityRun{
		// The last element of the bitfield is irrelevant, as the Torrent actually only has 7
		// pieces.
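		// Runs are {count, availability}: pieces 0-1 unseen, piece 2 available,
		// piece 3 unseen, pieces 4-5 available, piece 6 unseen.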
		{2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0},
	})
}

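// Sanity-check the wire message sizes used when applying request state under
// write buffer constraints: an interested message is 5 bytes (4-byte length
// prefix plus 1-byte ID), a request is 17 bytes (prefix, ID, and three 4-byte
// integers), and there must be room for a reasonable number of outstanding
// requests.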
func TestApplyRequestStateWriteBufferConstraints(t *testing.T) {
	c := qt.New(t)
	c.Check(interestedMsgLen, qt.Equals, 5)
	c.Check(requestMsgLen, qt.Equals, 17)
	c.Check(maxLocalToRemoteRequests >= 8, qt.IsTrue)
	c.Logf("max local to remote requests: %v", maxLocalToRemoteRequests)
}

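// peerConnForPreferredNetworkDirection builds a minimal PeerConn whose
// direction (outgoing or incoming), transport (uTP or TCP), address family
// (IPv6 or IPv4), and peer IDs are controlled, for exercising
// hasPreferredNetworkOver below.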
func peerConnForPreferredNetworkDirection(localPeerId, remotePeerId int, outgoing, utp, ipv6 bool) *PeerConn {
	pc := PeerConn{}
	pc.outgoing = outgoing
	if utp {
		pc.Network = "udp"
	}
	if ipv6 {
		pc.RemoteAddr = &net.TCPAddr{IP: net.ParseIP(fmt.Sprintf("::420"))}
	} else {
		pc.RemoteAddr = &net.TCPAddr{IP: net.IPv4(1, 2, 3, 4)}
	}
	binary.BigEndian.PutUint64(pc.PeerID[:], uint64(remotePeerId))
	cl := Client{}
	binary.BigEndian.PutUint64(cl.peerID[:], uint64(localPeerId))
	pc.t = &Torrent{cl: &cl}
	return &pc
}

func TestPreferredNetworkDirection(t *testing.T) {
	pc := peerConnForPreferredNetworkDirection
	c := qt.New(t)
	// Prefer outgoing to higher peer ID
	c.Assert(pc(1, 2, true, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), qt.IsTrue)
	c.Assert(pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, true, false, false)), qt.IsFalse)
	c.Assert(pc(2, 1, false, false, false).hasPreferredNetworkOver(pc(2, 1, true, false, false)), qt.IsTrue)
	// Don't prefer uTP
	c.Assert(pc(1, 2, false, true, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), qt.IsFalse)
	// Prefer IPv6
	c.Assert(pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, true)), qt.IsFalse)
	// No difference
	c.Assert(pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), qt.IsFalse)
}

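// Check that incoming peer request chunk lengths don't exceed what the upload
// rate limiter can ever satisfy: with an effectively unlimited limiter even a
// 2 MiB request is queued, but once the limiter's burst covers only a single
// chunk, the oversized request is not queued and a single 17-byte message
// (matching the size of a fast-extension reject) is written instead.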
func TestReceiveLargeRequest(t *testing.T) {
	c := qt.New(t)
	cl := newTestingClient(t)
	pc := cl.newConnection(nil, false, nil, "test", "")
	tor := cl.newTorrentForTesting()
	tor.info = &metainfo.Info{PieceLength: 3 << 20}
	pc.setTorrent(tor)
	tor._completedPieces.Add(0)
	pc.PeerExtensionBytes.SetBit(pp.ExtensionBitFast, true)
	pc.choking = false
	pc.initMessageWriter()
	req := Request{}
	req.Length = defaultChunkSize
	c.Assert(pc.fastEnabled(), qt.IsTrue)
	c.Check(pc.onReadRequest(req, false), qt.IsNil)
	c.Check(pc.peerRequests, qt.HasLen, 1)
	req.Length = 2 << 20
	c.Check(pc.onReadRequest(req, false), qt.IsNil)
	c.Check(pc.peerRequests, qt.HasLen, 2)
	pc.peerRequests = nil
	pc.t.cl.config.UploadRateLimiter = rate.NewLimiter(1, defaultChunkSize)
	req.Length = defaultChunkSize
	c.Check(pc.onReadRequest(req, false), qt.IsNil)
	c.Check(pc.peerRequests, qt.HasLen, 1)
	req.Length = 2 << 20
	c.Check(pc.onReadRequest(req, false), qt.IsNil)
	c.Check(pc.messageWriter.writeBuffer.Len(), qt.Equals, 17)
}