X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=peerconn_test.go;h=e294b6b620590de33886514facd5bfd46cae9bc6;hb=HEAD;hp=002a5fca399de2858da971b632394f961dc2c721;hpb=dff436f102060d84f8cc5b433c94bac8985589b4;p=btrtrc.git

diff --git a/peerconn_test.go b/peerconn_test.go
index 002a5fca..e294b6b6 100644
--- a/peerconn_test.go
+++ b/peerconn_test.go
@@ -1,15 +1,18 @@
 package torrent
 
 import (
+    "encoding/binary"
+    "errors"
+    "fmt"
     "io"
     "net"
     "sync"
     "testing"
-    "time"
 
-    "github.com/anacrolix/missinggo/pubsub"
-    "github.com/bradfitz/iter"
+    "github.com/frankban/quicktest"
+    qt "github.com/frankban/quicktest"
     "github.com/stretchr/testify/require"
+    "golang.org/x/time/rate"
 
     "github.com/anacrolix/torrent/metainfo"
     pp "github.com/anacrolix/torrent/peer_protocol"
@@ -19,32 +22,31 @@ import (
 // Ensure that no race exists between sending a bitfield, and a subsequent
 // Have that would potentially alter it.
 func TestSendBitfieldThenHave(t *testing.T) {
-    cl := Client{
-        config: TestingConfig(),
-    }
+    var cl Client
+    cl.init(TestingConfig(t))
     cl.initLogger()
-    c := cl.newConnection(nil, false, nil, "")
+    c := cl.newConnection(nil, newConnectionOpts{network: "io.Pipe"})
     c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
-    c.t.setInfo(&metainfo.Info{
-        Pieces: make([]byte, metainfo.HashSize*3),
-    })
+    if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil {
+        t.Log(err)
+    }
     r, w := io.Pipe()
-    c.r = r
+    // c.r = r
     c.w = w
-    go c.writer(time.Minute)
-    c.mu().Lock()
+    c.startMessageWriter()
+    c.locker().Lock()
     c.t._completedPieces.Add(1)
     c.postBitfield( /*[]bool{false, true, false}*/ )
-    c.mu().Unlock()
-    c.mu().Lock()
+    c.locker().Unlock()
+    c.locker().Lock()
     c.have(2)
-    c.mu().Unlock()
+    c.locker().Unlock()
     b := make([]byte, 15)
     n, err := io.ReadFull(r, b)
-    c.mu().Lock()
+    c.locker().Lock()
     // This will cause connection.writer to terminate.
     c.closed.Set()
-    c.mu().Unlock()
+    c.locker().Unlock()
     require.NoError(t, err)
     require.EqualValues(t, 15, n)
     // Here we see that the bitfield doesn't have piece 2 set, as that should
@@ -87,29 +89,32 @@ func (me *torrentStorage) WriteAt(b []byte, _ int64) (int, error) {
 }
 
 func BenchmarkConnectionMainReadLoop(b *testing.B) {
-    cl := &Client{
-        config: &ClientConfig{
-            DownloadRateLimiter: unlimited,
-        },
-    }
+    c := quicktest.New(b)
+    var cl Client
+    cl.init(&ClientConfig{
+        DownloadRateLimiter: unlimited,
+    })
     cl.initLogger()
     ts := &torrentStorage{}
-    t := &Torrent{
-        cl:                cl,
-        storage:           &storage.Torrent{TorrentImpl: ts},
-        pieceStateChanges: pubsub.NewPubSub(),
-    }
+    t := cl.newTorrent(metainfo.Hash{}, nil)
+    t.initialPieceCheckDisabled = true
     require.NoError(b, t.setInfo(&metainfo.Info{
         Pieces:      make([]byte, 20),
         Length:      1 << 20,
         PieceLength: 1 << 20,
     }))
-    t.setChunkSize(defaultChunkSize)
-    t._pendingPieces.Set(0, PiecePriorityNormal.BitmapPriority())
+    t.storage = &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}}
+    t.onSetInfo()
+    t._pendingPieces.Add(0)
     r, w := net.Pipe()
-    cn := cl.newConnection(r, true, nil, "")
+    cn := cl.newConnection(r, newConnectionOpts{
+        outgoing:   true,
+        remoteAddr: r.RemoteAddr(),
+        network:    r.RemoteAddr().Network(),
+        connString: regularNetConnPeerConnConnString(r),
+    })
     cn.setTorrent(t)
-    mrlErr := make(chan error)
+    mrlErrChan := make(chan error)
     msg := pp.Message{
         Type:  pp.Piece,
         Piece: make([]byte, defaultChunkSize),
@@ -118,28 +123,246 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
         cl.lock()
         err := cn.mainReadLoop()
         if err != nil {
-            mrlErr <- err
+            mrlErrChan <- err
         }
-        close(mrlErr)
+        close(mrlErrChan)
     }()
     wb := msg.MustMarshalBinary()
     b.SetBytes(int64(len(msg.Piece)))
     go func() {
-        defer w.Close()
         ts.writeSem.Lock()
-        for range iter.N(b.N) {
+        for i := 0; i < b.N; i += 1 {
             cl.lock()
             // The chunk must be written to storage everytime, to ensure the
             // writeSem is unlocked.
-            t.pieces[0]._dirtyChunks.Clear()
-            cn.validReceiveChunks = map[request]struct{}{newRequestFromMessage(&msg): {}}
+            t.pendAllChunkSpecs(0)
+            cn.validReceiveChunks = map[RequestIndex]int{
+                t.requestIndexFromRequest(newRequestFromMessage(&msg)): 1,
+            }
             cl.unlock()
             n, err := w.Write(wb)
             require.NoError(b, err)
             require.EqualValues(b, len(wb), n)
             ts.writeSem.Lock()
         }
+        if err := w.Close(); err != nil {
+            panic(err)
+        }
     }()
-    require.NoError(b, <-mrlErr)
-    require.EqualValues(b, b.N, cn._stats.ChunksReadUseful.Int64())
+    mrlErr := <-mrlErrChan
+    if mrlErr != nil && !errors.Is(mrlErr, io.EOF) {
+        c.Fatal(mrlErr)
+    }
+    c.Assert(cn._stats.ChunksReadUseful.Int64(), quicktest.Equals, int64(b.N))
+}
+
+func TestConnPexPeerFlags(t *testing.T) {
+    var (
+        tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
+        udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
+    )
+    testcases := []struct {
+        conn *PeerConn
+        f    pp.PexPeerFlags
+    }{
+        {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: false}}, 0},
+        {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: true}}, pp.PexPrefersEncryption},
+        {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: false}}, pp.PexOutgoingConn},
+        {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: true}}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
+        {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}}, pp.PexSupportsUtp},
+        {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn | pp.PexSupportsUtp},
+        {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn},
+        {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}}, 0},
+    }
+    for i, tc := range testcases {
+        f := tc.conn.pexPeerFlags()
+        require.EqualValues(t, tc.f, f, i)
+    }
+}
+
+func TestConnPexEvent(t *testing.T) {
+    c := qt.New(t)
+    var (
+        udpAddr     = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
+        tcpAddr     = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
+        dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
+        dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747}
+    )
+    testcases := []struct {
+        t pexEventType
+        c *PeerConn
+        e pexEvent
+    }{
+        {
+            pexAdd,
+            &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}},
+            pexEvent{pexAdd, udpAddr.AddrPort(), pp.PexSupportsUtp, nil},
+        },
+        {
+            pexDrop,
+            &PeerConn{
+                Peer:           Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true},
+                PeerListenPort: dialTcpAddr.Port,
+            },
+            pexEvent{pexDrop, tcpAddr.AddrPort(), pp.PexOutgoingConn, nil},
+        },
+        {
+            pexAdd,
+            &PeerConn{
+                Peer:           Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()},
+                PeerListenPort: dialTcpAddr.Port,
+            },
+            pexEvent{pexAdd, dialTcpAddr.AddrPort(), 0, nil},
+        },
+        {
+            pexDrop,
+            &PeerConn{
+                Peer:           Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()},
+                PeerListenPort: dialUdpAddr.Port,
+            },
+            pexEvent{pexDrop, dialUdpAddr.AddrPort(), pp.PexSupportsUtp, nil},
+        },
+    }
+    for i, tc := range testcases {
+        c.Run(fmt.Sprintf("%v", i), func(c *qt.C) {
+            e, err := tc.c.pexEvent(tc.t)
+            c.Assert(err, qt.IsNil)
+            c.Check(e, qt.Equals, tc.e)
+        })
+    }
+}
+
+func TestHaveAllThenBitfield(t *testing.T) {
+    c := qt.New(t)
+    cl := newTestingClient(t)
+    tt := cl.newTorrentForTesting()
+    // cl.newConnection()
+    pc := PeerConn{
+        Peer: Peer{t: tt},
+    }
+    pc.initRequestState()
+    pc.peerImpl = &pc
+    tt.conns[&pc] = struct{}{}
+    c.Assert(pc.onPeerSentHaveAll(), qt.IsNil)
+    c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}})
+    pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false})
+    c.Check(pc.peerMinPieces, qt.Equals, 6)
+    c.Check(pc.t.connsWithAllPieces, qt.HasLen, 0)
+    c.Assert(pc.t.setInfo(&metainfo.Info{
+        PieceLength: 0,
+        Pieces:      make([]byte, pieceHash.Size()*7),
+    }), qt.IsNil)
+    pc.t.onSetInfo()
+    c.Check(tt.numPieces(), qt.Equals, 7)
+    c.Check(tt.pieceAvailabilityRuns(), qt.DeepEquals, []pieceAvailabilityRun{
+        // The last element of the bitfield is irrelevant, as the Torrent actually only has 7
+        // pieces.
+        {2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0},
+    })
+}
+
+func TestApplyRequestStateWriteBufferConstraints(t *testing.T) {
+    c := qt.New(t)
+    c.Check(interestedMsgLen, qt.Equals, 5)
+    c.Check(requestMsgLen, qt.Equals, 17)
+    c.Check(maxLocalToRemoteRequests >= 8, qt.IsTrue)
+    c.Logf("max local to remote requests: %v", maxLocalToRemoteRequests)
+}
+
+func peerConnForPreferredNetworkDirection(
+    localPeerId, remotePeerId int,
+    outgoing, utp, ipv6 bool,
+) *PeerConn {
+    pc := PeerConn{}
+    pc.outgoing = outgoing
+    if utp {
+        pc.Network = "udp"
+    }
+    if ipv6 {
+        pc.RemoteAddr = &net.TCPAddr{IP: net.ParseIP("::420")}
+    } else {
+        pc.RemoteAddr = &net.TCPAddr{IP: net.IPv4(1, 2, 3, 4)}
+    }
+    binary.BigEndian.PutUint64(pc.PeerID[:], uint64(remotePeerId))
+    cl := Client{}
+    binary.BigEndian.PutUint64(cl.peerID[:], uint64(localPeerId))
+    pc.t = &Torrent{cl: &cl}
+    return &pc
+}
+
+func TestPreferredNetworkDirection(t *testing.T) {
+    pc := peerConnForPreferredNetworkDirection
+    c := qt.New(t)
+
+    // Prefer outgoing to lower peer ID
+
+    c.Check(
+        pc(1, 2, true, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)),
+        qt.IsFalse,
+    )
+    c.Check(
+        pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, true, false, false)),
+        qt.IsTrue,
+    )
+    c.Check(
+        pc(2, 1, false, false, false).hasPreferredNetworkOver(pc(2, 1, true, false, false)),
+        qt.IsFalse,
+    )
+
+    // Don't prefer uTP
+    c.Check(
+        pc(1, 2, false, true, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)),
+        qt.IsFalse,
+    )
+    // Prefer IPv6
+    c.Check(
+        pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, true)),
+        qt.IsFalse,
+    )
+    // No difference
+    c.Check(
+        pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)),
+        qt.IsFalse,
+    )
+}
+
+func TestReceiveLargeRequest(t *testing.T) {
+    c := qt.New(t)
+    cl := newTestingClient(t)
+    pc := cl.newConnection(nil, newConnectionOpts{network: "test"})
+    tor := cl.newTorrentForTesting()
+    tor.info = &metainfo.Info{PieceLength: 3 << 20}
+    pc.setTorrent(tor)
+    tor._completedPieces.Add(0)
+    pc.PeerExtensionBytes.SetBit(pp.ExtensionBitFast, true)
+    pc.choking = false
+    pc.initMessageWriter()
+    req := Request{}
+    req.Length = defaultChunkSize
+    c.Assert(pc.fastEnabled(), qt.IsTrue)
+    c.Check(pc.onReadRequest(req, false), qt.IsNil)
+    c.Check(pc.peerRequests, qt.HasLen, 1)
+    req.Length = 2 << 20
+    c.Check(pc.onReadRequest(req, false), qt.IsNil)
+    c.Check(pc.peerRequests, qt.HasLen, 2)
+    pc.peerRequests = nil
+    pc.t.cl.config.UploadRateLimiter = rate.NewLimiter(1, defaultChunkSize)
+    req.Length = defaultChunkSize
+    c.Check(pc.onReadRequest(req, false), qt.IsNil)
+    c.Check(pc.peerRequests, qt.HasLen, 1)
+    req.Length = 2 << 20
+    c.Check(pc.onReadRequest(req, false), qt.IsNil)
+    c.Check(pc.messageWriter.writeBuffer.Len(), qt.Equals, 17)
+}
+
+func TestChunkOverflowsPiece(t *testing.T) {
+    c := qt.New(t)
+    check := func(begin, length, limit pp.Integer, expected bool) {
+        c.Check(chunkOverflowsPiece(ChunkSpec{begin, length}, limit), qt.Equals, expected)
+    }
+    check(2, 3, 1, true)
+    check(2, pp.IntegerMax, 1, true)
+    check(2, pp.IntegerMax, 3, true)
+    check(2, pp.IntegerMax, pp.IntegerMax, true)
+    check(2, pp.IntegerMax-2, pp.IntegerMax, false)
 }