X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=peerconn_test.go;h=e294b6b620590de33886514facd5bfd46cae9bc6;hb=HEAD;hp=93bc7aa565c2523dedfd734b239e70889502f64b;hpb=eb2fc6dbef0e6a988fc4747ac1c941811334de43;p=btrtrc.git diff --git a/peerconn_test.go b/peerconn_test.go index 93bc7aa5..e294b6b6 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -1,15 +1,18 @@ package torrent import ( + "encoding/binary" "errors" + "fmt" "io" "net" "sync" "testing" - "github.com/anacrolix/missinggo/pubsub" "github.com/frankban/quicktest" + qt "github.com/frankban/quicktest" "github.com/stretchr/testify/require" + "golang.org/x/time/rate" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" @@ -22,15 +25,15 @@ func TestSendBitfieldThenHave(t *testing.T) { var cl Client cl.init(TestingConfig(t)) cl.initLogger() - c := cl.newConnection(nil, false, nil, "io.Pipe", "") + c := cl.newConnection(nil, newConnectionOpts{network: "io.Pipe"}) c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil)) if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil { t.Log(err) } r, w := io.Pipe() - //c.r = r + // c.r = r c.w = w - c.startWriter() + c.startMessageWriter() c.locker().Lock() c.t._completedPieces.Add(1) c.postBitfield( /*[]bool{false, true, false}*/ ) @@ -93,20 +96,23 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) { }) cl.initLogger() ts := &torrentStorage{} - t := &Torrent{ - cl: &cl, - storage: &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}}, - pieceStateChanges: pubsub.NewPubSub(), - } - t.setChunkSize(defaultChunkSize) + t := cl.newTorrent(metainfo.Hash{}, nil) + t.initialPieceCheckDisabled = true require.NoError(b, t.setInfo(&metainfo.Info{ Pieces: make([]byte, 20), Length: 1 << 20, PieceLength: 1 << 20, })) + t.storage = &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}} + t.onSetInfo() t._pendingPieces.Add(0) r, w := net.Pipe() - cn := cl.newConnection(r, true, r.RemoteAddr(), r.RemoteAddr().Network(), regularNetConnPeerConnConnString(r)) + cn := cl.newConnection(r, newConnectionOpts{ + outgoing: true, + remoteAddr: r.RemoteAddr(), + network: r.RemoteAddr().Network(), + connString: regularNetConnPeerConnConnString(r), + }) cn.setTorrent(t) mrlErrChan := make(chan error) msg := pp.Message{ @@ -155,7 +161,7 @@ func TestConnPexPeerFlags(t *testing.T) { tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848} udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848} ) - var testcases = []struct { + testcases := []struct { conn *PeerConn f pp.PexPeerFlags }{ @@ -175,13 +181,14 @@ func TestConnPexPeerFlags(t *testing.T) { } func TestConnPexEvent(t *testing.T) { + c := qt.New(t) var ( udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848} tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848} dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747} dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747} ) - var testcases = []struct { + testcases := []struct { t pexEventType c *PeerConn e pexEvent @@ -189,26 +196,173 @@ func TestConnPexEvent(t *testing.T) { { pexAdd, &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}}, - pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp}, + pexEvent{pexAdd, udpAddr.AddrPort(), pp.PexSupportsUtp, nil}, }, { pexDrop, - &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true, PeerListenPort: dialTcpAddr.Port}}, - pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn}, + &PeerConn{ + Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}, + PeerListenPort: dialTcpAddr.Port, + }, + pexEvent{pexDrop, tcpAddr.AddrPort(), pp.PexOutgoingConn, nil}, }, { pexAdd, - &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), PeerListenPort: dialTcpAddr.Port}}, - pexEvent{pexAdd, dialTcpAddr, 0}, + &PeerConn{ + Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}, + PeerListenPort: dialTcpAddr.Port, + }, + pexEvent{pexAdd, dialTcpAddr.AddrPort(), 0, nil}, }, { pexDrop, - &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), PeerListenPort: dialUdpAddr.Port}}, - pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp}, + &PeerConn{ + Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}, + PeerListenPort: dialUdpAddr.Port, + }, + pexEvent{pexDrop, dialUdpAddr.AddrPort(), pp.PexSupportsUtp, nil}, }, } for i, tc := range testcases { - e := tc.c.pexEvent(tc.t) - require.EqualValues(t, tc.e, e, i) + c.Run(fmt.Sprintf("%v", i), func(c *qt.C) { + e, err := tc.c.pexEvent(tc.t) + c.Assert(err, qt.IsNil) + c.Check(e, qt.Equals, tc.e) + }) + } +} + +func TestHaveAllThenBitfield(t *testing.T) { + c := qt.New(t) + cl := newTestingClient(t) + tt := cl.newTorrentForTesting() + // cl.newConnection() + pc := PeerConn{ + Peer: Peer{t: tt}, + } + pc.initRequestState() + pc.peerImpl = &pc + tt.conns[&pc] = struct{}{} + c.Assert(pc.onPeerSentHaveAll(), qt.IsNil) + c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}}) + pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false}) + c.Check(pc.peerMinPieces, qt.Equals, 6) + c.Check(pc.t.connsWithAllPieces, qt.HasLen, 0) + c.Assert(pc.t.setInfo(&metainfo.Info{ + PieceLength: 0, + Pieces: make([]byte, pieceHash.Size()*7), + }), qt.IsNil) + pc.t.onSetInfo() + c.Check(tt.numPieces(), qt.Equals, 7) + c.Check(tt.pieceAvailabilityRuns(), qt.DeepEquals, []pieceAvailabilityRun{ + // The last element of the bitfield is irrelevant, as the Torrent actually only has 7 + // pieces. + {2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0}, + }) +} + +func TestApplyRequestStateWriteBufferConstraints(t *testing.T) { + c := qt.New(t) + c.Check(interestedMsgLen, qt.Equals, 5) + c.Check(requestMsgLen, qt.Equals, 17) + c.Check(maxLocalToRemoteRequests >= 8, qt.IsTrue) + c.Logf("max local to remote requests: %v", maxLocalToRemoteRequests) +} + +func peerConnForPreferredNetworkDirection( + localPeerId, remotePeerId int, + outgoing, utp, ipv6 bool, +) *PeerConn { + pc := PeerConn{} + pc.outgoing = outgoing + if utp { + pc.Network = "udp" + } + if ipv6 { + pc.RemoteAddr = &net.TCPAddr{IP: net.ParseIP("::420")} + } else { + pc.RemoteAddr = &net.TCPAddr{IP: net.IPv4(1, 2, 3, 4)} + } + binary.BigEndian.PutUint64(pc.PeerID[:], uint64(remotePeerId)) + cl := Client{} + binary.BigEndian.PutUint64(cl.peerID[:], uint64(localPeerId)) + pc.t = &Torrent{cl: &cl} + return &pc +} + +func TestPreferredNetworkDirection(t *testing.T) { + pc := peerConnForPreferredNetworkDirection + c := qt.New(t) + + // Prefer outgoing to lower peer ID + + c.Check( + pc(1, 2, true, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), + qt.IsFalse, + ) + c.Check( + pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, true, false, false)), + qt.IsTrue, + ) + c.Check( + pc(2, 1, false, false, false).hasPreferredNetworkOver(pc(2, 1, true, false, false)), + qt.IsFalse, + ) + + // Don't prefer uTP + c.Check( + pc(1, 2, false, true, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), + qt.IsFalse, + ) + // Prefer IPv6 + c.Check( + pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, true)), + qt.IsFalse, + ) + // No difference + c.Check( + pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), + qt.IsFalse, + ) +} + +func TestReceiveLargeRequest(t *testing.T) { + c := qt.New(t) + cl := newTestingClient(t) + pc := cl.newConnection(nil, newConnectionOpts{network: "test"}) + tor := cl.newTorrentForTesting() + tor.info = &metainfo.Info{PieceLength: 3 << 20} + pc.setTorrent(tor) + tor._completedPieces.Add(0) + pc.PeerExtensionBytes.SetBit(pp.ExtensionBitFast, true) + pc.choking = false + pc.initMessageWriter() + req := Request{} + req.Length = defaultChunkSize + c.Assert(pc.fastEnabled(), qt.IsTrue) + c.Check(pc.onReadRequest(req, false), qt.IsNil) + c.Check(pc.peerRequests, qt.HasLen, 1) + req.Length = 2 << 20 + c.Check(pc.onReadRequest(req, false), qt.IsNil) + c.Check(pc.peerRequests, qt.HasLen, 2) + pc.peerRequests = nil + pc.t.cl.config.UploadRateLimiter = rate.NewLimiter(1, defaultChunkSize) + req.Length = defaultChunkSize + c.Check(pc.onReadRequest(req, false), qt.IsNil) + c.Check(pc.peerRequests, qt.HasLen, 1) + req.Length = 2 << 20 + c.Check(pc.onReadRequest(req, false), qt.IsNil) + c.Check(pc.messageWriter.writeBuffer.Len(), qt.Equals, 17) +} + +func TestChunkOverflowsPiece(t *testing.T) { + c := qt.New(t) + check := func(begin, length, limit pp.Integer, expected bool) { + c.Check(chunkOverflowsPiece(ChunkSpec{begin, length}, limit), qt.Equals, expected) } + check(2, 3, 1, true) + check(2, pp.IntegerMax, 1, true) + check(2, pp.IntegerMax, 3, true) + check(2, pp.IntegerMax, pp.IntegerMax, true) + check(2, pp.IntegerMax-2, pp.IntegerMax, false) }