]> Sergey Matveev's repositories - btrtrc.git/blobdiff - peerconn_test.go
Drop support for go 1.20
[btrtrc.git] / peerconn_test.go
index 439f533ab9801e2778a752b031ccd09be5ec2728..e294b6b620590de33886514facd5bfd46cae9bc6 100644 (file)
@@ -1,15 +1,18 @@
 package torrent
 
 import (
+       "encoding/binary"
+       "errors"
+       "fmt"
        "io"
        "net"
        "sync"
        "testing"
-       "time"
 
-       "github.com/anacrolix/missinggo/pubsub"
-       "github.com/bradfitz/iter"
+       "github.com/frankban/quicktest"
+       qt "github.com/frankban/quicktest"
        "github.com/stretchr/testify/require"
+       "golang.org/x/time/rate"
 
        "github.com/anacrolix/torrent/metainfo"
        pp "github.com/anacrolix/torrent/peer_protocol"
@@ -19,19 +22,18 @@ import (
 // Ensure that no race exists between sending a bitfield, and a subsequent
 // Have that would potentially alter it.
 func TestSendBitfieldThenHave(t *testing.T) {
-       cl := Client{
-               config: TestingConfig(),
-       }
+       var cl Client
+       cl.init(TestingConfig(t))
        cl.initLogger()
-       c := cl.newConnection(nil, false, nil, "", "")
+       c := cl.newConnection(nil, newConnectionOpts{network: "io.Pipe"})
        c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
-       c.t.setInfo(&metainfo.Info{
-               Pieces: make([]byte, metainfo.HashSize*3),
-       })
+       if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil {
+               t.Log(err)
+       }
        r, w := io.Pipe()
-       c.r = r
+       // c.r = r
        c.w = w
-       go c.writer(time.Minute)
+       c.startMessageWriter()
        c.locker().Lock()
        c.t._completedPieces.Add(1)
        c.postBitfield( /*[]bool{false, true, false}*/ )
@@ -87,29 +89,32 @@ func (me *torrentStorage) WriteAt(b []byte, _ int64) (int, error) {
 }
 
 func BenchmarkConnectionMainReadLoop(b *testing.B) {
-       cl := &Client{
-               config: &ClientConfig{
-                       DownloadRateLimiter: unlimited,
-               },
-       }
+       c := quicktest.New(b)
+       var cl Client
+       cl.init(&ClientConfig{
+               DownloadRateLimiter: unlimited,
+       })
        cl.initLogger()
        ts := &torrentStorage{}
-       t := &Torrent{
-               cl:                cl,
-               storage:           &storage.Torrent{TorrentImpl: ts},
-               pieceStateChanges: pubsub.NewPubSub(),
-       }
+       t := cl.newTorrent(metainfo.Hash{}, nil)
+       t.initialPieceCheckDisabled = true
        require.NoError(b, t.setInfo(&metainfo.Info{
                Pieces:      make([]byte, 20),
                Length:      1 << 20,
                PieceLength: 1 << 20,
        }))
-       t.setChunkSize(defaultChunkSize)
-       t._pendingPieces.Set(0, PiecePriorityNormal.BitmapPriority())
+       t.storage = &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}}
+       t.onSetInfo()
+       t._pendingPieces.Add(0)
        r, w := net.Pipe()
-       cn := cl.newConnection(r, true, nil, "", "")
+       cn := cl.newConnection(r, newConnectionOpts{
+               outgoing:   true,
+               remoteAddr: r.RemoteAddr(),
+               network:    r.RemoteAddr().Network(),
+               connString: regularNetConnPeerConnConnString(r),
+       })
        cn.setTorrent(t)
-       mrlErr := make(chan error)
+       mrlErrChan := make(chan error)
        msg := pp.Message{
                Type:  pp.Piece,
                Piece: make([]byte, defaultChunkSize),
@@ -118,30 +123,37 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
                cl.lock()
                err := cn.mainReadLoop()
                if err != nil {
-                       mrlErr <- err
+                       mrlErrChan <- err
                }
-               close(mrlErr)
+               close(mrlErrChan)
        }()
        wb := msg.MustMarshalBinary()
        b.SetBytes(int64(len(msg.Piece)))
        go func() {
-               defer w.Close()
                ts.writeSem.Lock()
-               for range iter.N(b.N) {
+               for i := 0; i < b.N; i += 1 {
                        cl.lock()
                        // The chunk must be written to storage everytime, to ensure the
                        // writeSem is unlocked.
-                       t.pieces[0]._dirtyChunks.Clear()
-                       cn.validReceiveChunks = map[request]struct{}{newRequestFromMessage(&msg): {}}
+                       t.pendAllChunkSpecs(0)
+                       cn.validReceiveChunks = map[RequestIndex]int{
+                               t.requestIndexFromRequest(newRequestFromMessage(&msg)): 1,
+                       }
                        cl.unlock()
                        n, err := w.Write(wb)
                        require.NoError(b, err)
                        require.EqualValues(b, len(wb), n)
                        ts.writeSem.Lock()
                }
+               if err := w.Close(); err != nil {
+                       panic(err)
+               }
        }()
-       require.NoError(b, <-mrlErr)
-       require.EqualValues(b, b.N, cn._stats.ChunksReadUseful.Int64())
+       mrlErr := <-mrlErrChan
+       if mrlErr != nil && !errors.Is(mrlErr, io.EOF) {
+               c.Fatal(mrlErr)
+       }
+       c.Assert(cn._stats.ChunksReadUseful.Int64(), quicktest.Equals, int64(b.N))
 }
 
 func TestConnPexPeerFlags(t *testing.T) {
@@ -149,18 +161,18 @@ func TestConnPexPeerFlags(t *testing.T) {
                tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
                udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
        )
-       var testcases = []struct {
+       testcases := []struct {
                conn *PeerConn
                f    pp.PexPeerFlags
        }{
-               {&PeerConn{outgoing: false, PeerPrefersEncryption: false}, 0},
-               {&PeerConn{outgoing: false, PeerPrefersEncryption: true}, pp.PexPrefersEncryption},
-               {&PeerConn{outgoing: true, PeerPrefersEncryption: false}, pp.PexOutgoingConn},
-               {&PeerConn{outgoing: true, PeerPrefersEncryption: true}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
-               {&PeerConn{remoteAddr: udpAddr}, pp.PexSupportsUtp},
-               {&PeerConn{remoteAddr: udpAddr, outgoing: true}, pp.PexOutgoingConn | pp.PexSupportsUtp},
-               {&PeerConn{remoteAddr: tcpAddr, outgoing: true}, pp.PexOutgoingConn},
-               {&PeerConn{remoteAddr: tcpAddr}, 0},
+               {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: false}}, 0},
+               {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: true}}, pp.PexPrefersEncryption},
+               {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: false}}, pp.PexOutgoingConn},
+               {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: true}}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
+               {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}}, pp.PexSupportsUtp},
+               {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn | pp.PexSupportsUtp},
+               {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn},
+               {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}}, 0},
        }
        for i, tc := range testcases {
                f := tc.conn.pexPeerFlags()
@@ -169,40 +181,188 @@ func TestConnPexPeerFlags(t *testing.T) {
 }
 
 func TestConnPexEvent(t *testing.T) {
+       c := qt.New(t)
        var (
                udpAddr     = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
                tcpAddr     = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
                dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
                dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747}
        )
-       var testcases = []struct {
+       testcases := []struct {
                t pexEventType
                c *PeerConn
                e pexEvent
        }{
                {
                        pexAdd,
-                       &PeerConn{remoteAddr: udpAddr},
-                       pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp},
+                       &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}},
+                       pexEvent{pexAdd, udpAddr.AddrPort(), pp.PexSupportsUtp, nil},
                },
                {
                        pexDrop,
-                       &PeerConn{remoteAddr: tcpAddr, outgoing: true, PeerListenPort: dialTcpAddr.Port},
-                       pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn},
+                       &PeerConn{
+                               Peer:           Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true},
+                               PeerListenPort: dialTcpAddr.Port,
+                       },
+                       pexEvent{pexDrop, tcpAddr.AddrPort(), pp.PexOutgoingConn, nil},
                },
                {
                        pexAdd,
-                       &PeerConn{remoteAddr: tcpAddr, PeerListenPort: dialTcpAddr.Port},
-                       pexEvent{pexAdd, dialTcpAddr, 0},
+                       &PeerConn{
+                               Peer:           Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()},
+                               PeerListenPort: dialTcpAddr.Port,
+                       },
+                       pexEvent{pexAdd, dialTcpAddr.AddrPort(), 0, nil},
                },
                {
                        pexDrop,
-                       &PeerConn{remoteAddr: udpAddr, PeerListenPort: dialUdpAddr.Port},
-                       pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp},
+                       &PeerConn{
+                               Peer:           Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()},
+                               PeerListenPort: dialUdpAddr.Port,
+                       },
+                       pexEvent{pexDrop, dialUdpAddr.AddrPort(), pp.PexSupportsUtp, nil},
                },
        }
        for i, tc := range testcases {
-               e := tc.c.pexEvent(tc.t)
-               require.EqualValues(t, tc.e, e, i)
+               c.Run(fmt.Sprintf("%v", i), func(c *qt.C) {
+                       e, err := tc.c.pexEvent(tc.t)
+                       c.Assert(err, qt.IsNil)
+                       c.Check(e, qt.Equals, tc.e)
+               })
+       }
+}
+
+func TestHaveAllThenBitfield(t *testing.T) {
+       c := qt.New(t)
+       cl := newTestingClient(t)
+       tt := cl.newTorrentForTesting()
+       // cl.newConnection()
+       pc := PeerConn{
+               Peer: Peer{t: tt},
+       }
+       pc.initRequestState()
+       pc.peerImpl = &pc
+       tt.conns[&pc] = struct{}{}
+       c.Assert(pc.onPeerSentHaveAll(), qt.IsNil)
+       c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}})
+       pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false})
+       c.Check(pc.peerMinPieces, qt.Equals, 6)
+       c.Check(pc.t.connsWithAllPieces, qt.HasLen, 0)
+       c.Assert(pc.t.setInfo(&metainfo.Info{
+               PieceLength: 0,
+               Pieces:      make([]byte, pieceHash.Size()*7),
+       }), qt.IsNil)
+       pc.t.onSetInfo()
+       c.Check(tt.numPieces(), qt.Equals, 7)
+       c.Check(tt.pieceAvailabilityRuns(), qt.DeepEquals, []pieceAvailabilityRun{
+               // The last element of the bitfield is irrelevant, as the Torrent actually only has 7
+               // pieces.
+               {2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0},
+       })
+}
+
+func TestApplyRequestStateWriteBufferConstraints(t *testing.T) {
+       c := qt.New(t)
+       c.Check(interestedMsgLen, qt.Equals, 5)
+       c.Check(requestMsgLen, qt.Equals, 17)
+       c.Check(maxLocalToRemoteRequests >= 8, qt.IsTrue)
+       c.Logf("max local to remote requests: %v", maxLocalToRemoteRequests)
+}
+
+func peerConnForPreferredNetworkDirection(
+       localPeerId, remotePeerId int,
+       outgoing, utp, ipv6 bool,
+) *PeerConn {
+       pc := PeerConn{}
+       pc.outgoing = outgoing
+       if utp {
+               pc.Network = "udp"
+       }
+       if ipv6 {
+               pc.RemoteAddr = &net.TCPAddr{IP: net.ParseIP("::420")}
+       } else {
+               pc.RemoteAddr = &net.TCPAddr{IP: net.IPv4(1, 2, 3, 4)}
+       }
+       binary.BigEndian.PutUint64(pc.PeerID[:], uint64(remotePeerId))
+       cl := Client{}
+       binary.BigEndian.PutUint64(cl.peerID[:], uint64(localPeerId))
+       pc.t = &Torrent{cl: &cl}
+       return &pc
+}
+
+func TestPreferredNetworkDirection(t *testing.T) {
+       pc := peerConnForPreferredNetworkDirection
+       c := qt.New(t)
+
+       // Prefer outgoing to lower peer ID
+
+       c.Check(
+               pc(1, 2, true, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)),
+               qt.IsFalse,
+       )
+       c.Check(
+               pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, true, false, false)),
+               qt.IsTrue,
+       )
+       c.Check(
+               pc(2, 1, false, false, false).hasPreferredNetworkOver(pc(2, 1, true, false, false)),
+               qt.IsFalse,
+       )
+
+       // Don't prefer uTP
+       c.Check(
+               pc(1, 2, false, true, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)),
+               qt.IsFalse,
+       )
+       // Prefer IPv6
+       c.Check(
+               pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, true)),
+               qt.IsFalse,
+       )
+       // No difference
+       c.Check(
+               pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)),
+               qt.IsFalse,
+       )
+}
+
+func TestReceiveLargeRequest(t *testing.T) {
+       c := qt.New(t)
+       cl := newTestingClient(t)
+       pc := cl.newConnection(nil, newConnectionOpts{network: "test"})
+       tor := cl.newTorrentForTesting()
+       tor.info = &metainfo.Info{PieceLength: 3 << 20}
+       pc.setTorrent(tor)
+       tor._completedPieces.Add(0)
+       pc.PeerExtensionBytes.SetBit(pp.ExtensionBitFast, true)
+       pc.choking = false
+       pc.initMessageWriter()
+       req := Request{}
+       req.Length = defaultChunkSize
+       c.Assert(pc.fastEnabled(), qt.IsTrue)
+       c.Check(pc.onReadRequest(req, false), qt.IsNil)
+       c.Check(pc.peerRequests, qt.HasLen, 1)
+       req.Length = 2 << 20
+       c.Check(pc.onReadRequest(req, false), qt.IsNil)
+       c.Check(pc.peerRequests, qt.HasLen, 2)
+       pc.peerRequests = nil
+       pc.t.cl.config.UploadRateLimiter = rate.NewLimiter(1, defaultChunkSize)
+       req.Length = defaultChunkSize
+       c.Check(pc.onReadRequest(req, false), qt.IsNil)
+       c.Check(pc.peerRequests, qt.HasLen, 1)
+       req.Length = 2 << 20
+       c.Check(pc.onReadRequest(req, false), qt.IsNil)
+       c.Check(pc.messageWriter.writeBuffer.Len(), qt.Equals, 17)
+}
+
+func TestChunkOverflowsPiece(t *testing.T) {
+       c := qt.New(t)
+       check := func(begin, length, limit pp.Integer, expected bool) {
+               c.Check(chunkOverflowsPiece(ChunkSpec{begin, length}, limit), qt.Equals, expected)
        }
+       check(2, 3, 1, true)
+       check(2, pp.IntegerMax, 1, true)
+       check(2, pp.IntegerMax, 3, true)
+       check(2, pp.IntegerMax, pp.IntegerMax, true)
+       check(2, pp.IntegerMax-2, pp.IntegerMax, false)
 }