7 "golang.org/x/time/rate"
13 "github.com/frankban/quicktest"
14 qt "github.com/frankban/quicktest"
15 "github.com/stretchr/testify/require"
17 "github.com/anacrolix/torrent/metainfo"
18 pp "github.com/anacrolix/torrent/peer_protocol"
19 "github.com/anacrolix/torrent/storage"
22 // Ensure that no race exists between sending a bitfield, and a subsequent
23 // Have that would potentially alter it.
24 func TestSendBitfieldThenHave(t *testing.T) {
26 cl.init(TestingConfig(t))
// The connection is created with a nil conn and nil remote address; "io.Pipe"
// is passed in the network-name position.
28 c := cl.newConnection(nil, false, nil, "io.Pipe", "")
29 c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
// Pieces holds three hashes' worth of bytes, i.e. a three-piece torrent.
30 if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil {
36 c.startMessageWriter()
// Piece index 1 is marked complete before the bitfield is posted.
38 c.t._completedPieces.Add(1)
39 c.postBitfield( /*[]bool{false, true, false}*/ )
45 n, err := io.ReadFull(r, b)
47 // This will cause connection.writer to terminate.
50 require.NoError(t, err)
// 15 bytes = 6-byte bitfield message followed by a 9-byte have message.
51 require.EqualValues(t, 15, n)
52 // Here we see that the bitfield doesn't have piece 2 set, as that should
53 // arrive in the following Have message.
// Expected wire bytes: length 2, id 5 (bitfield), payload '@' (0x40, the bit
// for piece 1); then length 5, id 4 (have), piece index 2.
54 require.EqualValues(t, "\x00\x00\x00\x02\x05@\x00\x00\x00\x05\x04\x00\x00\x00\x02", string(b))
// torrentStorage is the storage stand-in used by
// BenchmarkConnectionMainReadLoop, which wires its Piece and Close methods
// into a storage.TorrentImpl.
57 type torrentStorage struct {
// Close does nothing and always returns nil.
61 func (me *torrentStorage) Close() error { return nil }
// Piece returns the storage.PieceImpl for mp. The benchmark installs this
// method as the Piece callback of storage.TorrentImpl.
63 func (me *torrentStorage) Piece(mp metainfo.Piece) storage.PieceImpl {
// Completion always reports the zero-value storage.Completion.
67 func (me *torrentStorage) Completion() storage.Completion {
68 return storage.Completion{}
// MarkComplete is presumably part of satisfying storage.PieceImpl (the type
// returned by Piece) — TODO confirm against the interface definition.
71 func (me *torrentStorage) MarkComplete() error {
// MarkNotComplete is presumably part of satisfying storage.PieceImpl (the type
// returned by Piece) — TODO confirm against the interface definition.
75 func (me *torrentStorage) MarkNotComplete() error {
// ReadAt must never be reached when this stub is in use; it panics if called.
79 func (me *torrentStorage) ReadAt([]byte, int64) (int, error) {
80 panic("shouldn't be called")
// WriteAt receives chunk writes. The offset argument is ignored (blank
// parameter); only the length of b is inspected.
83 func (me *torrentStorage) WriteAt(b []byte, _ int64) (int, error) {
// Writes are expected to be exactly one chunk (defaultChunkSize) long.
84 if len(b) != defaultChunkSize {
// BenchmarkConnectionMainReadLoop feeds one chunk-sized Piece message per
// iteration to a connection running mainReadLoop and checks that every chunk
// was counted as useful.
91 func BenchmarkConnectionMainReadLoop(b *testing.B) {
94 cl.init(&ClientConfig{
95 DownloadRateLimiter: unlimited,
98 ts := &torrentStorage{}
99 t := cl.newTorrent(metainfo.Hash{}, nil)
100 t.initialPieceCheckDisabled = true
// A single 20-byte piece hash with a 1 MiB piece length.
101 require.NoError(b, t.setInfo(&metainfo.Info{
102 Pieces: make([]byte, 20),
104 PieceLength: 1 << 20,
// Route piece storage and close calls to the torrentStorage stub.
106 t.storage = &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}}
108 t._pendingPieces.Add(0)
110 cn := cl.newConnection(r, true, r.RemoteAddr(), r.RemoteAddr().Network(), regularNetConnPeerConnConnString(r))
// Presumably carries mainReadLoop's result back to the benchmark goroutine;
// it is received from after the write side is closed.
112 mrlErrChan := make(chan error)
115 Piece: make([]byte, defaultChunkSize),
119 err := cn.mainReadLoop()
// The message is marshalled once and the same bytes are written every iteration.
125 wb := msg.MustMarshalBinary()
// Report throughput in terms of chunk payload bytes per iteration.
126 b.SetBytes(int64(len(msg.Piece)))
129 for i := 0; i < b.N; i += 1 {
131 // The chunk must be written to storage every time to ensure the
132 // writeSem is unlocked.
133 t.pendAllChunkSpecs(0)
// Re-register the chunk as expected exactly once for this iteration.
134 cn.validReceiveChunks = map[RequestIndex]int{
135 t.requestIndexFromRequest(newRequestFromMessage(&msg)): 1,
138 n, err := w.Write(wb)
139 require.NoError(b, err)
140 require.EqualValues(b, len(wb), n)
143 if err := w.Close(); err != nil {
147 mrlErr := <-mrlErrChan
// io.EOF is the expected outcome once the write side has been closed.
148 if mrlErr != nil && !errors.Is(mrlErr, io.EOF) {
// Every iteration's chunk must have been recorded as a useful chunk read.
151 c.Assert(cn._stats.ChunksReadUseful.Int64(), quicktest.Equals, int64(b.N))
// TestConnPexPeerFlags is a table-driven check of PeerConn.pexPeerFlags: each
// case pairs a connection with the PEX flag set it should report.
154 func TestConnPexPeerFlags(t *testing.T) {
156 tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
157 udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
159 testcases := []struct {
// Direction and encryption preference map to PexOutgoingConn and
// PexPrefersEncryption respectively.
163 {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: false}}, 0},
164 {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: true}}, pp.PexPrefersEncryption},
165 {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: false}}, pp.PexOutgoingConn},
166 {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: true}}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
// A UDP network yields PexSupportsUtp; a TCP network contributes no flag.
167 {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}}, pp.PexSupportsUtp},
168 {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn | pp.PexSupportsUtp},
169 {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn},
170 {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}}, 0},
172 for i, tc := range testcases {
173 f := tc.conn.pexPeerFlags()
// The case index is passed as the failure message argument.
174 require.EqualValues(t, tc.f, f, i)
// TestConnPexEvent is a table-driven check of PeerConn.pexEvent: each case
// pairs a connection with the pexEvent it should produce.
178 func TestConnPexEvent(t *testing.T) {
// Port 4848 is used for the connection's remote address, port 4747 for the
// peer's advertised listen port.
180 udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
181 tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
182 dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
183 dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747}
185 testcases := []struct {
// UDP connection without a listen port: the event carries the remote address.
192 &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}},
193 pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp, nil},
// Outgoing TCP connection: the remote address is used even though a listen
// port is set.
197 &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true, PeerListenPort: dialTcpAddr.Port}},
198 pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn, nil},
// Incoming TCP connection with a listen port: the event address uses that port.
202 &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), PeerListenPort: dialTcpAddr.Port}},
203 pexEvent{pexAdd, dialTcpAddr, 0, nil},
// Incoming UDP connection with a listen port: same substitution, plus the uTP flag.
207 &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), PeerListenPort: dialUdpAddr.Port}},
208 pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp, nil},
211 for i, tc := range testcases {
212 e := tc.c.pexEvent(tc.t)
// The case index is passed as the failure message argument.
213 require.EqualValues(t, tc.e, e, i)
// TestHaveAllThenBitfield checks peer piece-availability bookkeeping when a
// peer sends HaveAll, then a bitfield, and the torrent info is only set
// afterwards.
217 func TestHaveAllThenBitfield(t *testing.T) {
219 cl := newTestingClient(t)
220 tt := cl.newTorrentForTesting()
221 // cl.newConnection()
225 pc.initRequestState()
227 tt.conns[&pc] = struct{}{}
// After HaveAll the peer is tracked as having every piece.
228 c.Assert(pc.onPeerSentHaveAll(), qt.IsNil)
229 c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}})
// An 8-entry bitfield with pieces 2, 4 and 5 set replaces the HaveAll state.
230 pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false})
// 6 is consistent with "highest set index (5) + 1".
231 c.Check(pc.peerMinPieces, qt.Equals, 6)
232 c.Check(pc.t.connsWithAllPieces, qt.HasLen, 0)
// The info arrives after the bitfield and describes 7 pieces.
233 c.Assert(pc.t.setInfo(&metainfo.Info{
235 Pieces: make([]byte, pieceHash.Size()*7),
238 c.Check(tt.numPieces(), qt.Equals, 7)
// The expected runs read as {length, availability} pairs and sum to 7 pieces,
// matching the first seven bitfield entries.
239 c.Check(tt.pieceAvailabilityRuns(), qt.DeepEquals, []pieceAvailabilityRun{
240 // The last element of the bitfield is irrelevant, as the Torrent actually only has 7
242 {2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0},
// TestApplyRequestStateWriteBufferConstraints pins the message-size constants
// and the minimum request limit, and logs the actual limit.
246 func TestApplyRequestStateWriteBufferConstraints(t *testing.T) {
// 5 = 4-byte length prefix + 1-byte message id.
248 c.Check(interestedMsgLen, qt.Equals, 5)
// 17 = 4-byte length prefix + 1-byte id + three 4-byte fields (index, begin, length).
249 c.Check(requestMsgLen, qt.Equals, 17)
250 c.Check(maxLocalToRemoteRequests >= 8, qt.IsTrue)
251 c.Logf("max local to remote requests: %v", maxLocalToRemoteRequests)
// peerConnForPreferredNetworkDirection builds a PeerConn fixture for
// TestPreferredNetworkDirection with the given direction, address family and
// local/remote peer IDs.
254 func peerConnForPreferredNetworkDirection(localPeerId, remotePeerId int, outgoing, utp, ipv6 bool) *PeerConn {
256 pc.outgoing = outgoing
// The remote address is either an IPv6 or an IPv4 TCPAddr, presumably selected
// by the ipv6 argument.
// NOTE(review): fmt.Sprintf is called with no format arguments; the plain
// string literal would be equivalent.
261 pc.RemoteAddr = &net.TCPAddr{IP: net.ParseIP(fmt.Sprintf("::420"))}
263 pc.RemoteAddr = &net.TCPAddr{IP: net.IPv4(1, 2, 3, 4)}
// Peer IDs are written big-endian into the first 8 bytes of each ID.
265 binary.BigEndian.PutUint64(pc.PeerID[:], uint64(remotePeerId))
267 binary.BigEndian.PutUint64(cl.peerID[:], uint64(localPeerId))
// The connection reaches the local client (and its peer ID) through its Torrent.
268 pc.t = &Torrent{cl: &cl}
// TestPreferredNetworkDirection checks hasPreferredNetworkOver for pairs of
// fixtures that differ in exactly one property. The fixture arguments are
// (localPeerId, remotePeerId, outgoing, utp, ipv6).
272 func TestPreferredNetworkDirection(t *testing.T) {
273 pc := peerConnForPreferredNetworkDirection
275 // Prefer outgoing to higher peer ID
276 c.Assert(pc(1, 2, true, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), qt.IsTrue)
277 c.Assert(pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, true, false, false)), qt.IsFalse)
// With the IDs swapped (local 2, remote 1) the incoming connection wins instead.
278 c.Assert(pc(2, 1, false, false, false).hasPreferredNetworkOver(pc(2, 1, true, false, false)), qt.IsTrue)
// A uTP connection is not preferred over an otherwise identical non-uTP one.
280 c.Assert(pc(1, 2, false, true, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), qt.IsFalse)
// An IPv4 connection is not preferred over an otherwise identical IPv6 one.
282 c.Assert(pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, true)), qt.IsFalse)
// Identical connections: neither is preferred.
284 c.Assert(pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), qt.IsFalse)
// TestReceiveLargeRequest exercises onReadRequest with the Fast extension
// enabled, first with the default upload limiter and then with a limiter whose
// burst is a single chunk.
287 func TestReceiveLargeRequest(t *testing.T) {
289 cl := newTestingClient(t)
290 pc := cl.newConnection(nil, false, nil, "test", "")
291 tor := cl.newTorrentForTesting()
292 tor.info = &metainfo.Info{PieceLength: 3 << 20}
// Piece 0 is marked complete.
294 tor._completedPieces.Add(0)
// Advertise the Fast extension on behalf of the peer.
295 pc.PeerExtensionBytes.SetBit(pp.ExtensionBitFast, true)
297 pc.initMessageWriter()
299 req.Length = defaultChunkSize
300 c.Assert(pc.fastEnabled(), qt.IsTrue)
// A chunk-sized request is accepted and queued.
301 c.Check(pc.onReadRequest(req, false), qt.IsNil)
302 c.Check(pc.peerRequests, qt.HasLen, 1)
// A second request is accepted and queued as well.
304 c.Check(pc.onReadRequest(req, false), qt.IsNil)
305 c.Check(pc.peerRequests, qt.HasLen, 2)
// Start over with an upload limiter whose burst equals one chunk.
306 pc.peerRequests = nil
307 pc.t.cl.config.UploadRateLimiter = rate.NewLimiter(1, defaultChunkSize)
308 req.Length = defaultChunkSize
309 c.Check(pc.onReadRequest(req, false), qt.IsNil)
310 c.Check(pc.peerRequests, qt.HasLen, 1)
// The final request returns no error but leaves 17 bytes in the write buffer,
// presumably a reject message (same size as a request) — TODO confirm.
312 c.Check(pc.onReadRequest(req, false), qt.IsNil)
313 c.Check(pc.messageWriter.writeBuffer.Len(), qt.Equals, 17)