X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=peerconn_test.go;h=e294b6b620590de33886514facd5bfd46cae9bc6;hb=HEAD;hp=654f7689fcd1d9fc105e14ff34d6e02e53be5066;hpb=456a2f7c5d47567467451feef5a386722fb4b6bf;p=btrtrc.git diff --git a/peerconn_test.go b/peerconn_test.go index 654f7689..e294b6b6 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -1,16 +1,18 @@ package torrent import ( + "encoding/binary" + "errors" + "fmt" "io" "net" "sync" "testing" - "time" - "github.com/anacrolix/missinggo/pubsub" - "github.com/bradfitz/iter" "github.com/frankban/quicktest" + qt "github.com/frankban/quicktest" "github.com/stretchr/testify/require" + "golang.org/x/time/rate" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" @@ -20,19 +22,18 @@ import ( // Ensure that no race exists between sending a bitfield, and a subsequent // Have that would potentially alter it. func TestSendBitfieldThenHave(t *testing.T) { - cl := Client{ - config: TestingConfig(), - } + var cl Client + cl.init(TestingConfig(t)) cl.initLogger() - c := cl.newConnection(nil, false, nil, "io.Pipe", "") + c := cl.newConnection(nil, newConnectionOpts{network: "io.Pipe"}) c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil)) - c.t.setInfo(&metainfo.Info{ - Pieces: make([]byte, metainfo.HashSize*3), - }) + if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil { + t.Log(err) + } r, w := io.Pipe() - //c.r = r + // c.r = r c.w = w - go c.writer(time.Minute) + c.startMessageWriter() c.locker().Lock() c.t._completedPieces.Add(1) c.postBitfield( /*[]bool{false, true, false}*/ ) @@ -89,29 +90,31 @@ func (me *torrentStorage) WriteAt(b []byte, _ int64) (int, error) { func BenchmarkConnectionMainReadLoop(b *testing.B) { c := quicktest.New(b) - cl := &Client{ - config: &ClientConfig{ - DownloadRateLimiter: unlimited, - }, - } + var cl Client + cl.init(&ClientConfig{ + DownloadRateLimiter: unlimited, + }) cl.initLogger() ts := &torrentStorage{} - t := &Torrent{ - cl: cl, - storage: &storage.Torrent{TorrentImpl: ts}, - pieceStateChanges: pubsub.NewPubSub(), - } + t := cl.newTorrent(metainfo.Hash{}, nil) + t.initialPieceCheckDisabled = true require.NoError(b, t.setInfo(&metainfo.Info{ Pieces: make([]byte, 20), Length: 1 << 20, PieceLength: 1 << 20, })) - t.setChunkSize(defaultChunkSize) - t._pendingPieces.Set(0, PiecePriorityNormal.BitmapPriority()) + t.storage = &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}} + t.onSetInfo() + t._pendingPieces.Add(0) r, w := net.Pipe() - cn := cl.newConnection(r, true, r.RemoteAddr(), r.RemoteAddr().Network(), regularNetConnPeerConnConnString(r)) + cn := cl.newConnection(r, newConnectionOpts{ + outgoing: true, + remoteAddr: r.RemoteAddr(), + network: r.RemoteAddr().Network(), + connString: regularNetConnPeerConnConnString(r), + }) cn.setTorrent(t) - mrlErr := make(chan error) + mrlErrChan := make(chan error) msg := pp.Message{ Type: pp.Piece, Piece: make([]byte, defaultChunkSize), @@ -120,29 +123,36 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) { cl.lock() err := cn.mainReadLoop() if err != nil { - mrlErr <- err + mrlErrChan <- err } - close(mrlErr) + close(mrlErrChan) }() wb := msg.MustMarshalBinary() b.SetBytes(int64(len(msg.Piece))) go func() { - defer w.Close() ts.writeSem.Lock() - for range iter.N(b.N) { + for i := 0; i < b.N; i += 1 { cl.lock() // The chunk must be written to storage everytime, to ensure the // writeSem is unlocked. - t.pieces[0]._dirtyChunks.Clear() - cn.validReceiveChunks = map[Request]int{newRequestFromMessage(&msg): 1} + t.pendAllChunkSpecs(0) + cn.validReceiveChunks = map[RequestIndex]int{ + t.requestIndexFromRequest(newRequestFromMessage(&msg)): 1, + } cl.unlock() n, err := w.Write(wb) require.NoError(b, err) require.EqualValues(b, len(wb), n) ts.writeSem.Lock() } + if err := w.Close(); err != nil { + panic(err) + } }() - c.Assert([]error{nil, io.EOF}, quicktest.Contains, <-mrlErr) + mrlErr := <-mrlErrChan + if mrlErr != nil && !errors.Is(mrlErr, io.EOF) { + c.Fatal(mrlErr) + } c.Assert(cn._stats.ChunksReadUseful.Int64(), quicktest.Equals, int64(b.N)) } @@ -151,7 +161,7 @@ func TestConnPexPeerFlags(t *testing.T) { tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848} udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848} ) - var testcases = []struct { + testcases := []struct { conn *PeerConn f pp.PexPeerFlags }{ @@ -171,13 +181,14 @@ func TestConnPexPeerFlags(t *testing.T) { } func TestConnPexEvent(t *testing.T) { + c := qt.New(t) var ( udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848} tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848} dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747} dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747} ) - var testcases = []struct { + testcases := []struct { t pexEventType c *PeerConn e pexEvent @@ -185,26 +196,173 @@ func TestConnPexEvent(t *testing.T) { { pexAdd, &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}}, - pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp}, + pexEvent{pexAdd, udpAddr.AddrPort(), pp.PexSupportsUtp, nil}, }, { pexDrop, - &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true, PeerListenPort: dialTcpAddr.Port}}, - pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn}, + &PeerConn{ + Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}, + PeerListenPort: dialTcpAddr.Port, + }, + pexEvent{pexDrop, tcpAddr.AddrPort(), pp.PexOutgoingConn, nil}, }, { pexAdd, - &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), PeerListenPort: dialTcpAddr.Port}}, - pexEvent{pexAdd, dialTcpAddr, 0}, + &PeerConn{ + Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}, + PeerListenPort: dialTcpAddr.Port, + }, + pexEvent{pexAdd, dialTcpAddr.AddrPort(), 0, nil}, }, { pexDrop, - &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), PeerListenPort: dialUdpAddr.Port}}, - pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp}, + &PeerConn{ + Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}, + PeerListenPort: dialUdpAddr.Port, + }, + pexEvent{pexDrop, dialUdpAddr.AddrPort(), pp.PexSupportsUtp, nil}, }, } for i, tc := range testcases { - e := tc.c.pexEvent(tc.t) - require.EqualValues(t, tc.e, e, i) + c.Run(fmt.Sprintf("%v", i), func(c *qt.C) { + e, err := tc.c.pexEvent(tc.t) + c.Assert(err, qt.IsNil) + c.Check(e, qt.Equals, tc.e) + }) + } +} + +func TestHaveAllThenBitfield(t *testing.T) { + c := qt.New(t) + cl := newTestingClient(t) + tt := cl.newTorrentForTesting() + // cl.newConnection() + pc := PeerConn{ + Peer: Peer{t: tt}, + } + pc.initRequestState() + pc.peerImpl = &pc + tt.conns[&pc] = struct{}{} + c.Assert(pc.onPeerSentHaveAll(), qt.IsNil) + c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}}) + pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false}) + c.Check(pc.peerMinPieces, qt.Equals, 6) + c.Check(pc.t.connsWithAllPieces, qt.HasLen, 0) + c.Assert(pc.t.setInfo(&metainfo.Info{ + PieceLength: 0, + Pieces: make([]byte, pieceHash.Size()*7), + }), qt.IsNil) + pc.t.onSetInfo() + c.Check(tt.numPieces(), qt.Equals, 7) + c.Check(tt.pieceAvailabilityRuns(), qt.DeepEquals, []pieceAvailabilityRun{ + // The last element of the bitfield is irrelevant, as the Torrent actually only has 7 + // pieces. + {2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0}, + }) +} + +func TestApplyRequestStateWriteBufferConstraints(t *testing.T) { + c := qt.New(t) + c.Check(interestedMsgLen, qt.Equals, 5) + c.Check(requestMsgLen, qt.Equals, 17) + c.Check(maxLocalToRemoteRequests >= 8, qt.IsTrue) + c.Logf("max local to remote requests: %v", maxLocalToRemoteRequests) +} + +func peerConnForPreferredNetworkDirection( + localPeerId, remotePeerId int, + outgoing, utp, ipv6 bool, +) *PeerConn { + pc := PeerConn{} + pc.outgoing = outgoing + if utp { + pc.Network = "udp" + } + if ipv6 { + pc.RemoteAddr = &net.TCPAddr{IP: net.ParseIP("::420")} + } else { + pc.RemoteAddr = &net.TCPAddr{IP: net.IPv4(1, 2, 3, 4)} + } + binary.BigEndian.PutUint64(pc.PeerID[:], uint64(remotePeerId)) + cl := Client{} + binary.BigEndian.PutUint64(cl.peerID[:], uint64(localPeerId)) + pc.t = &Torrent{cl: &cl} + return &pc +} + +func TestPreferredNetworkDirection(t *testing.T) { + pc := peerConnForPreferredNetworkDirection + c := qt.New(t) + + // Prefer outgoing to lower peer ID + + c.Check( + pc(1, 2, true, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), + qt.IsFalse, + ) + c.Check( + pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, true, false, false)), + qt.IsTrue, + ) + c.Check( + pc(2, 1, false, false, false).hasPreferredNetworkOver(pc(2, 1, true, false, false)), + qt.IsFalse, + ) + + // Don't prefer uTP + c.Check( + pc(1, 2, false, true, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), + qt.IsFalse, + ) + // Prefer IPv6 + c.Check( + pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, true)), + qt.IsFalse, + ) + // No difference + c.Check( + pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), + qt.IsFalse, + ) +} + +func TestReceiveLargeRequest(t *testing.T) { + c := qt.New(t) + cl := newTestingClient(t) + pc := cl.newConnection(nil, newConnectionOpts{network: "test"}) + tor := cl.newTorrentForTesting() + tor.info = &metainfo.Info{PieceLength: 3 << 20} + pc.setTorrent(tor) + tor._completedPieces.Add(0) + pc.PeerExtensionBytes.SetBit(pp.ExtensionBitFast, true) + pc.choking = false + pc.initMessageWriter() + req := Request{} + req.Length = defaultChunkSize + c.Assert(pc.fastEnabled(), qt.IsTrue) + c.Check(pc.onReadRequest(req, false), qt.IsNil) + c.Check(pc.peerRequests, qt.HasLen, 1) + req.Length = 2 << 20 + c.Check(pc.onReadRequest(req, false), qt.IsNil) + c.Check(pc.peerRequests, qt.HasLen, 2) + pc.peerRequests = nil + pc.t.cl.config.UploadRateLimiter = rate.NewLimiter(1, defaultChunkSize) + req.Length = defaultChunkSize + c.Check(pc.onReadRequest(req, false), qt.IsNil) + c.Check(pc.peerRequests, qt.HasLen, 1) + req.Length = 2 << 20 + c.Check(pc.onReadRequest(req, false), qt.IsNil) + c.Check(pc.messageWriter.writeBuffer.Len(), qt.Equals, 17) +} + +func TestChunkOverflowsPiece(t *testing.T) { + c := qt.New(t) + check := func(begin, length, limit pp.Integer, expected bool) { + c.Check(chunkOverflowsPiece(ChunkSpec{begin, length}, limit), qt.Equals, expected) } + check(2, 3, 1, true) + check(2, pp.IntegerMax, 1, true) + check(2, pp.IntegerMax, 3, true) + check(2, pp.IntegerMax, pp.IntegerMax, true) + check(2, pp.IntegerMax-2, pp.IntegerMax, false) }