package torrent import ( "encoding/binary" "errors" "fmt" "io" "net" "sync" "testing" "github.com/frankban/quicktest" qt "github.com/frankban/quicktest" "github.com/stretchr/testify/require" "golang.org/x/time/rate" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/torrent/storage" ) // Ensure that no race exists between sending a bitfield, and a subsequent // Have that would potentially alter it. func TestSendBitfieldThenHave(t *testing.T) { var cl Client cl.init(TestingConfig(t)) cl.initLogger() c := cl.newConnection(nil, newConnectionOpts{network: "io.Pipe"}) c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil)) if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil { t.Log(err) } r, w := io.Pipe() // c.r = r c.w = w c.startMessageWriter() c.locker().Lock() c.t._completedPieces.Add(1) c.postBitfield( /*[]bool{false, true, false}*/ ) c.locker().Unlock() c.locker().Lock() c.have(2) c.locker().Unlock() b := make([]byte, 15) n, err := io.ReadFull(r, b) c.locker().Lock() // This will cause connection.writer to terminate. c.closed.Set() c.locker().Unlock() require.NoError(t, err) require.EqualValues(t, 15, n) // Here we see that the bitfield doesn't have piece 2 set, as that should // arrive in the following Have message. require.EqualValues(t, "\x00\x00\x00\x02\x05@\x00\x00\x00\x05\x04\x00\x00\x00\x02", string(b)) } type torrentStorage struct { writeSem sync.Mutex } func (me *torrentStorage) Close() error { return nil } func (me *torrentStorage) Piece(mp metainfo.Piece) storage.PieceImpl { return me } func (me *torrentStorage) Completion() storage.Completion { return storage.Completion{} } func (me *torrentStorage) MarkComplete() error { return nil } func (me *torrentStorage) MarkNotComplete() error { return nil } func (me *torrentStorage) ReadAt([]byte, int64) (int, error) { panic("shouldn't be called") } func (me *torrentStorage) WriteAt(b []byte, _ int64) (int, error) { if len(b) != defaultChunkSize { panic(len(b)) } me.writeSem.Unlock() return len(b), nil } func BenchmarkConnectionMainReadLoop(b *testing.B) { c := quicktest.New(b) var cl Client cl.init(&ClientConfig{ DownloadRateLimiter: unlimited, }) cl.initLogger() ts := &torrentStorage{} t := cl.newTorrent(metainfo.Hash{}, nil) t.initialPieceCheckDisabled = true require.NoError(b, t.setInfo(&metainfo.Info{ Pieces: make([]byte, 20), Length: 1 << 20, PieceLength: 1 << 20, })) t.storage = &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}} t.onSetInfo() t._pendingPieces.Add(0) r, w := net.Pipe() cn := cl.newConnection(r, newConnectionOpts{ outgoing: true, remoteAddr: r.RemoteAddr(), network: r.RemoteAddr().Network(), connString: regularNetConnPeerConnConnString(r), }) cn.setTorrent(t) mrlErrChan := make(chan error) msg := pp.Message{ Type: pp.Piece, Piece: make([]byte, defaultChunkSize), } go func() { cl.lock() err := cn.mainReadLoop() if err != nil { mrlErrChan <- err } close(mrlErrChan) }() wb := msg.MustMarshalBinary() b.SetBytes(int64(len(msg.Piece))) go func() { ts.writeSem.Lock() for i := 0; i < b.N; i += 1 { cl.lock() // The chunk must be written to storage everytime, to ensure the // writeSem is unlocked. t.pendAllChunkSpecs(0) cn.validReceiveChunks = map[RequestIndex]int{ t.requestIndexFromRequest(newRequestFromMessage(&msg)): 1, } cl.unlock() n, err := w.Write(wb) require.NoError(b, err) require.EqualValues(b, len(wb), n) ts.writeSem.Lock() } if err := w.Close(); err != nil { panic(err) } }() mrlErr := <-mrlErrChan if mrlErr != nil && !errors.Is(mrlErr, io.EOF) { c.Fatal(mrlErr) } c.Assert(cn._stats.ChunksReadUseful.Int64(), quicktest.Equals, int64(b.N)) } func TestConnPexPeerFlags(t *testing.T) { var ( tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848} udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848} ) testcases := []struct { conn *PeerConn f pp.PexPeerFlags }{ {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: false}}, 0}, {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: true}}, pp.PexPrefersEncryption}, {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: false}}, pp.PexOutgoingConn}, {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: true}}, pp.PexOutgoingConn | pp.PexPrefersEncryption}, {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}}, pp.PexSupportsUtp}, {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn | pp.PexSupportsUtp}, {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn}, {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}}, 0}, } for i, tc := range testcases { f := tc.conn.pexPeerFlags() require.EqualValues(t, tc.f, f, i) } } func TestConnPexEvent(t *testing.T) { c := qt.New(t) var ( udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848} tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848} dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747} dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747} ) testcases := []struct { t pexEventType c *PeerConn e pexEvent }{ { pexAdd, &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}}, pexEvent{pexAdd, udpAddr.AddrPort(), pp.PexSupportsUtp, nil}, }, { pexDrop, &PeerConn{ Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}, PeerListenPort: dialTcpAddr.Port, }, pexEvent{pexDrop, tcpAddr.AddrPort(), pp.PexOutgoingConn, nil}, }, { pexAdd, &PeerConn{ Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}, PeerListenPort: dialTcpAddr.Port, }, pexEvent{pexAdd, dialTcpAddr.AddrPort(), 0, nil}, }, { pexDrop, &PeerConn{ Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}, PeerListenPort: dialUdpAddr.Port, }, pexEvent{pexDrop, dialUdpAddr.AddrPort(), pp.PexSupportsUtp, nil}, }, } for i, tc := range testcases { c.Run(fmt.Sprintf("%v", i), func(c *qt.C) { e, err := tc.c.pexEvent(tc.t) c.Assert(err, qt.IsNil) c.Check(e, qt.Equals, tc.e) }) } } func TestHaveAllThenBitfield(t *testing.T) { c := qt.New(t) cl := newTestingClient(t) tt := cl.newTorrentForTesting() // cl.newConnection() pc := PeerConn{ Peer: Peer{t: tt}, } pc.initRequestState() pc.peerImpl = &pc tt.conns[&pc] = struct{}{} c.Assert(pc.onPeerSentHaveAll(), qt.IsNil) c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}}) pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false}) c.Check(pc.peerMinPieces, qt.Equals, 6) c.Check(pc.t.connsWithAllPieces, qt.HasLen, 0) c.Assert(pc.t.setInfo(&metainfo.Info{ PieceLength: 0, Pieces: make([]byte, pieceHash.Size()*7), }), qt.IsNil) pc.t.onSetInfo() c.Check(tt.numPieces(), qt.Equals, 7) c.Check(tt.pieceAvailabilityRuns(), qt.DeepEquals, []pieceAvailabilityRun{ // The last element of the bitfield is irrelevant, as the Torrent actually only has 7 // pieces. {2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0}, }) } func TestApplyRequestStateWriteBufferConstraints(t *testing.T) { c := qt.New(t) c.Check(interestedMsgLen, qt.Equals, 5) c.Check(requestMsgLen, qt.Equals, 17) c.Check(maxLocalToRemoteRequests >= 8, qt.IsTrue) c.Logf("max local to remote requests: %v", maxLocalToRemoteRequests) } func peerConnForPreferredNetworkDirection( localPeerId, remotePeerId int, outgoing, utp, ipv6 bool, ) *PeerConn { pc := PeerConn{} pc.outgoing = outgoing if utp { pc.Network = "udp" } if ipv6 { pc.RemoteAddr = &net.TCPAddr{IP: net.ParseIP("::420")} } else { pc.RemoteAddr = &net.TCPAddr{IP: net.IPv4(1, 2, 3, 4)} } binary.BigEndian.PutUint64(pc.PeerID[:], uint64(remotePeerId)) cl := Client{} binary.BigEndian.PutUint64(cl.peerID[:], uint64(localPeerId)) pc.t = &Torrent{cl: &cl} return &pc } func TestPreferredNetworkDirection(t *testing.T) { pc := peerConnForPreferredNetworkDirection c := qt.New(t) // Prefer outgoing to lower peer ID c.Check( pc(1, 2, true, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), qt.IsFalse, ) c.Check( pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, true, false, false)), qt.IsTrue, ) c.Check( pc(2, 1, false, false, false).hasPreferredNetworkOver(pc(2, 1, true, false, false)), qt.IsFalse, ) // Don't prefer uTP c.Check( pc(1, 2, false, true, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), qt.IsFalse, ) // Prefer IPv6 c.Check( pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, true)), qt.IsFalse, ) // No difference c.Check( pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), qt.IsFalse, ) } func TestReceiveLargeRequest(t *testing.T) { c := qt.New(t) cl := newTestingClient(t) pc := cl.newConnection(nil, newConnectionOpts{network: "test"}) tor := cl.newTorrentForTesting() tor.info = &metainfo.Info{PieceLength: 3 << 20} pc.setTorrent(tor) tor._completedPieces.Add(0) pc.PeerExtensionBytes.SetBit(pp.ExtensionBitFast, true) pc.choking = false pc.initMessageWriter() req := Request{} req.Length = defaultChunkSize c.Assert(pc.fastEnabled(), qt.IsTrue) c.Check(pc.onReadRequest(req, false), qt.IsNil) c.Check(pc.peerRequests, qt.HasLen, 1) req.Length = 2 << 20 c.Check(pc.onReadRequest(req, false), qt.IsNil) c.Check(pc.peerRequests, qt.HasLen, 2) pc.peerRequests = nil pc.t.cl.config.UploadRateLimiter = rate.NewLimiter(1, defaultChunkSize) req.Length = defaultChunkSize c.Check(pc.onReadRequest(req, false), qt.IsNil) c.Check(pc.peerRequests, qt.HasLen, 1) req.Length = 2 << 20 c.Check(pc.onReadRequest(req, false), qt.IsNil) c.Check(pc.messageWriter.writeBuffer.Len(), qt.Equals, 17) } func TestChunkOverflowsPiece(t *testing.T) { c := qt.New(t) check := func(begin, length, limit pp.Integer, expected bool) { c.Check(chunkOverflowsPiece(ChunkSpec{begin, length}, limit), qt.Equals, expected) } check(2, 3, 1, true) check(2, pp.IntegerMax, 1, true) check(2, pp.IntegerMax, 3, true) check(2, pp.IntegerMax, pp.IntegerMax, true) check(2, pp.IntegerMax-2, pp.IntegerMax, false) }