7 "golang.org/x/time/rate"
13 "github.com/frankban/quicktest"
14 qt "github.com/frankban/quicktest"
15 "github.com/stretchr/testify/require"
17 "github.com/anacrolix/torrent/metainfo"
18 pp "github.com/anacrolix/torrent/peer_protocol"
19 "github.com/anacrolix/torrent/storage"
22 // Ensure that no race exists between sending a bitfield, and a subsequent
23 // Have that would potentially alter it.
24 func TestSendBitfieldThenHave(t *testing.T) {
26 cl.init(TestingConfig(t))
// The connection is created without an underlying net.Conn (nil is passed).
28 c := cl.newConnection(nil, newConnectionOpts{network: "io.Pipe"})
29 c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
// Pieces is 3*metainfo.HashSize bytes long; presumably that yields a torrent
// with three pieces (one hash per piece) — TODO confirm.
30 if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil {
36 c.startMessageWriter()
// Only piece index 1 is recorded as completed before the bitfield is posted,
// matching the commented-out []bool{false, true, false} below.
38 c.t._completedPieces.Add(1)
39 c.postBitfield( /*[]bool{false, true, false}*/ )
45 n, err := io.ReadFull(r, b)
47 // This will cause connection.writer to terminate.
50 require.NoError(t, err)
// 15 bytes = 6 for the first message plus 9 for the second (see the expected
// string below).
51 require.EqualValues(t, 15, n)
52 // Here we see that the bitfield doesn't have piece 2 set, as that should
53 // arrive in the following Have message.
// Expected wire bytes: a length-2 message with ID 5 and payload '@' (0x40,
// only the second-highest bit set, i.e. piece 1), followed by a length-5
// message with ID 4 carrying the big-endian index 2. IDs 5 and 4 are the
// bitfield and have messages in the BitTorrent wire protocol.
54 require.EqualValues(t, "\x00\x00\x00\x02\x05@\x00\x00\x00\x05\x04\x00\x00\x00\x02", string(b))
// torrentStorage is a stub storage backend for BenchmarkConnectionMainReadLoop,
// which plugs its Piece and Close methods into a storage.TorrentImpl.
57 type torrentStorage struct {
// Close is a no-op that always reports success.
61 func (me *torrentStorage) Close() error { return nil }
// Piece supplies the storage.PieceImpl used for the given piece.
// NOTE(review): presumably returns the receiver itself, since torrentStorage
// carries the completion and ReadAt/WriteAt methods below — confirm.
63 func (me *torrentStorage) Piece(mp metainfo.Piece) storage.PieceImpl {
// Completion reports the zero-value storage.Completion.
67 func (me *torrentStorage) Completion() storage.Completion {
68 return storage.Completion{}
// MarkComplete belongs to the piece-level method set this stub exposes.
71 func (me *torrentStorage) MarkComplete() error {
// MarkNotComplete belongs to the piece-level method set this stub exposes.
75 func (me *torrentStorage) MarkNotComplete() error {
// ReadAt must never be reached by the benchmark; it panics if it is.
79 func (me *torrentStorage) ReadAt([]byte, int64) (int, error) {
80 panic("shouldn't be called")
// WriteAt receives chunk data; the write offset is ignored. It first checks
// that the buffer is exactly defaultChunkSize bytes long, which is the size
// of the Piece payload the benchmark sends.
83 func (me *torrentStorage) WriteAt(b []byte, _ int64) (int, error) {
84 if len(b) != defaultChunkSize {
// BenchmarkConnectionMainReadLoop measures how quickly mainReadLoop consumes
// Piece messages: the same pre-marshalled chunk message is written to the
// connection b.N times and the count of useful chunks read is checked at the
// end.
91 func BenchmarkConnectionMainReadLoop(b *testing.B) {
94 cl.init(&ClientConfig{
// No download rate limiting, so the limiter does not throttle the benchmark.
95 DownloadRateLimiter: unlimited,
98 ts := &torrentStorage{}
99 t := cl.newTorrent(metainfo.Hash{}, nil)
100 t.initialPieceCheckDisabled = true
101 require.NoError(b, t.setInfo(&metainfo.Info{
// 20 bytes of piece hashes; presumably a single piece — TODO confirm.
102 Pieces: make([]byte, 20),
104 PieceLength: 1 << 20,
// Route storage calls to the torrentStorage stub.
106 t.storage = &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}}
// Piece 0 is marked pending.
108 t._pendingPieces.Add(0)
110 cn := cl.newConnection(r, newConnectionOpts{
112 remoteAddr: r.RemoteAddr(),
113 network: r.RemoteAddr().Network(),
114 connString: regularNetConnPeerConnConnString(r),
// Carries mainReadLoop's result; it is received after the write loop below.
117 mrlErrChan := make(chan error)
120 Piece: make([]byte, defaultChunkSize),
124 err := cn.mainReadLoop()
// The message is marshalled once and the same bytes are reused every iteration.
130 wb := msg.MustMarshalBinary()
// Throughput is reported in terms of chunk payload bytes per iteration.
131 b.SetBytes(int64(len(msg.Piece)))
134 for i := 0; i < b.N; i += 1 {
136 // The chunk must be written to storage every time, to ensure the
137 // writeSem is unlocked.
138 t.pendAllChunkSpecs(0)
// Reset the expected-chunk map so it holds exactly one entry, for the request
// matching msg, before the message is written.
139 cn.validReceiveChunks = map[RequestIndex]int{
140 t.requestIndexFromRequest(newRequestFromMessage(&msg)): 1,
143 n, err := w.Write(wb)
144 require.NoError(b, err)
145 require.EqualValues(b, len(wb), n)
148 if err := w.Close(); err != nil {
// Wait for the read loop to finish; io.EOF is not treated as a failure here.
152 mrlErr := <-mrlErrChan
153 if mrlErr != nil && !errors.Is(mrlErr, io.EOF) {
// Every written chunk must have been counted as useful.
156 c.Assert(cn._stats.ChunksReadUseful.Int64(), quicktest.Equals, int64(b.N))
// TestConnPexPeerFlags is a table-driven check that PeerConn.pexPeerFlags
// returns the expected pp.Pex* flag set for each combination of direction,
// encryption preference and network.
159 func TestConnPexPeerFlags(t *testing.T) {
161 tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
162 udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
164 testcases := []struct {
// Direction and encryption preference map independently onto
// PexOutgoingConn and PexPrefersEncryption.
168 {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: false}}, 0},
169 {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: true}}, pp.PexPrefersEncryption},
170 {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: false}}, pp.PexOutgoingConn},
171 {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: true}}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
// A UDP remote address yields PexSupportsUtp; a TCP one does not.
172 {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}}, pp.PexSupportsUtp},
173 {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn | pp.PexSupportsUtp},
174 {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn},
175 {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}}, 0},
// The case index i is passed as the failure message to identify the row.
177 for i, tc := range testcases {
178 f := tc.conn.pexPeerFlags()
179 require.EqualValues(t, tc.f, f, i)
// TestConnPexEvent is a table-driven check of the pexEvent that
// PeerConn.pexEvent builds for a given event type and connection.
183 func TestConnPexEvent(t *testing.T) {
// Remote addresses use port 4848; the "dial" addresses use port 4747, which
// is the value given as PeerListenPort in the cases below.
185 udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
186 tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
187 dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
188 dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747}
190 testcases := []struct {
// No listen port set: the event carries the remote address as-is.
197 &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}},
198 pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp, nil},
// Outgoing connection: the remote address is kept even though
// PeerListenPort is set.
202 &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true, PeerListenPort: dialTcpAddr.Port}},
203 pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn, nil},
// Non-outgoing connection with a listen port: the expected address uses the
// listen port (4747) instead of the remote port.
207 &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), PeerListenPort: dialTcpAddr.Port}},
208 pexEvent{pexAdd, dialTcpAddr, 0, nil},
// Same as above over UDP, which additionally sets PexSupportsUtp.
212 &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), PeerListenPort: dialUdpAddr.Port}},
213 pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp, nil},
// tc.t is the argument handed to pexEvent (presumably the pexAdd/pexDrop
// event type — confirm against the struct definition); i labels failures.
216 for i, tc := range testcases {
217 e := tc.c.pexEvent(tc.t)
218 require.EqualValues(t, tc.e, e, i)
// TestHaveAllThenBitfield checks the bookkeeping when a peer first sends
// HaveAll and then a bitfield, with the torrent info only set afterwards.
222 func TestHaveAllThenBitfield(t *testing.T) {
224 cl := newTestingClient(t)
225 tt := cl.newTorrentForTesting()
226 // cl.newConnection()
230 pc.initRequestState()
// Register the connection with the torrent by hand.
232 tt.conns[&pc] = struct{}{}
// After HaveAll the peer is tracked in connsWithAllPieces.
233 c.Assert(pc.onPeerSentHaveAll(), qt.IsNil)
234 c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}})
// The bitfield has 8 entries with indices 2, 4 and 5 set; the highest set
// index is 5, hence the expected peerMinPieces of 6. The peer must also be
// removed from connsWithAllPieces.
235 pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false})
236 c.Check(pc.peerMinPieces, qt.Equals, 6)
237 c.Check(pc.t.connsWithAllPieces, qt.HasLen, 0)
238 c.Assert(pc.t.setInfo(&metainfo.Info{
// Seven piece hashes, so the torrent ends up with 7 pieces (asserted below).
240 Pieces: make([]byte, pieceHash.Size()*7),
243 c.Check(tt.numPieces(), qt.Equals, 7)
244 c.Check(tt.pieceAvailabilityRuns(), qt.DeepEquals, []pieceAvailabilityRun{
245 // The last element of the bitfield is irrelevant, as the Torrent actually only has 7
// Each pair reads as {run length, availability}: pieces 0-1 -> 0, piece 2 -> 1,
// piece 3 -> 0, pieces 4-5 -> 1, piece 6 -> 0; the run lengths sum to 7.
247 {2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0},
// TestApplyRequestStateWriteBufferConstraints pins the message-size constants
// and checks that maxLocalToRemoteRequests is at least 8.
251 func TestApplyRequestStateWriteBufferConstraints(t *testing.T) {
253 c.Check(interestedMsgLen, qt.Equals, 5)
254 c.Check(requestMsgLen, qt.Equals, 17)
255 c.Check(maxLocalToRemoteRequests >= 8, qt.IsTrue)
// Logged so the actual value appears in the test output.
256 c.Logf("max local to remote requests: %v", maxLocalToRemoteRequests)
// peerConnForPreferredNetworkDirection builds a minimal *PeerConn for
// TestPreferredNetworkDirection: it sets the connection direction, a remote
// address, the remote peer ID and a Torrent whose client carries the local
// peer ID.
259 func peerConnForPreferredNetworkDirection(localPeerId, remotePeerId int, outgoing, utp, ipv6 bool) *PeerConn {
261 pc.outgoing = outgoing
// Remote address is either the IPv6 address ::420 or the IPv4 address
// 1.2.3.4 (presumably selected by the ipv6 argument — confirm).
// NOTE(review): fmt.Sprintf is called with a constant string and no
// arguments; the literal could be passed to net.ParseIP directly.
266 pc.RemoteAddr = &net.TCPAddr{IP: net.ParseIP(fmt.Sprintf("::420"))}
268 pc.RemoteAddr = &net.TCPAddr{IP: net.IPv4(1, 2, 3, 4)}
// The integer IDs are written big-endian into the first 8 bytes of the
// respective peer ID arrays.
270 binary.BigEndian.PutUint64(pc.PeerID[:], uint64(remotePeerId))
272 binary.BigEndian.PutUint64(cl.peerID[:], uint64(localPeerId))
273 pc.t = &Torrent{cl: &cl}
// TestPreferredNetworkDirection checks hasPreferredNetworkOver between pairs
// of connections built by peerConnForPreferredNetworkDirection, whose
// arguments are (localPeerId, remotePeerId, outgoing, utp, ipv6).
277 func TestPreferredNetworkDirection(t *testing.T) {
278 pc := peerConnForPreferredNetworkDirection
280 // Prefer outgoing to higher peer ID
281 c.Assert(pc(1, 2, true, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), qt.IsTrue)
282 c.Assert(pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, true, false, false)), qt.IsFalse)
// With the IDs swapped (local 2, remote 1) the incoming connection wins.
283 c.Assert(pc(2, 1, false, false, false).hasPreferredNetworkOver(pc(2, 1, true, false, false)), qt.IsTrue)
// A uTP connection is not preferred over an otherwise identical non-uTP one.
285 c.Assert(pc(1, 2, false, true, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), qt.IsFalse)
// An IPv4 connection is not preferred over an otherwise identical IPv6 one.
287 c.Assert(pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, true)), qt.IsFalse)
// Identical connections: neither is preferred over the other.
289 c.Assert(pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), qt.IsFalse)
// TestReceiveLargeRequest drives onReadRequest with several requests and
// checks how many end up queued in peerRequests, first with the default
// upload limiter and then with one whose burst is defaultChunkSize.
292 func TestReceiveLargeRequest(t *testing.T) {
294 cl := newTestingClient(t)
// No underlying net.Conn is supplied (nil).
295 pc := cl.newConnection(nil, newConnectionOpts{network: "test"})
296 tor := cl.newTorrentForTesting()
297 tor.info = &metainfo.Info{PieceLength: 3 << 20}
// Piece 0 is recorded as completed.
299 tor._completedPieces.Add(0)
// Advertise the fast extension on the peer's behalf; asserted via
// fastEnabled below.
300 pc.PeerExtensionBytes.SetBit(pp.ExtensionBitFast, true)
302 pc.initMessageWriter()
304 req.Length = defaultChunkSize
305 c.Assert(pc.fastEnabled(), qt.IsTrue)
// A chunk-sized request is accepted and queued.
306 c.Check(pc.onReadRequest(req, false), qt.IsNil)
307 c.Check(pc.peerRequests, qt.HasLen, 1)
// NOTE(review): req is presumably modified between these calls — confirm.
// The second request is queued as well.
309 c.Check(pc.onReadRequest(req, false), qt.IsNil)
310 c.Check(pc.peerRequests, qt.HasLen, 2)
// Start over with an upload limiter of rate 1 and burst defaultChunkSize.
311 pc.peerRequests = nil
312 pc.t.cl.config.UploadRateLimiter = rate.NewLimiter(1, defaultChunkSize)
313 req.Length = defaultChunkSize
314 c.Check(pc.onReadRequest(req, false), qt.IsNil)
315 c.Check(pc.peerRequests, qt.HasLen, 1)
// The next request returns no error, yet 17 bytes are expected in the write
// buffer afterwards. 17 is also the value checked for requestMsgLen
// elsewhere in this file; presumably this is a reject message — TODO confirm.
317 c.Check(pc.onReadRequest(req, false), qt.IsNil)
318 c.Check(pc.messageWriter.writeBuffer.Len(), qt.Equals, 17)