package torrent
import (
+ "encoding/binary"
+ "errors"
+ "fmt"
"io"
"net"
"sync"
"testing"
- "github.com/anacrolix/missinggo/pubsub"
"github.com/frankban/quicktest"
+ qt "github.com/frankban/quicktest"
"github.com/stretchr/testify/require"
+ "golang.org/x/time/rate"
"github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol"
var cl Client
cl.init(TestingConfig(t))
cl.initLogger()
- c := cl.newConnection(nil, false, nil, "io.Pipe", "")
+ c := cl.newConnection(nil, newConnectionOpts{network: "io.Pipe"})
c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil {
t.Log(err)
}
r, w := io.Pipe()
- //c.r = r
+ // c.r = r
c.w = w
- c.startWriter()
+ c.startMessageWriter()
c.locker().Lock()
c.t._completedPieces.Add(1)
c.postBitfield( /*[]bool{false, true, false}*/ )
})
cl.initLogger()
ts := &torrentStorage{}
- t := &Torrent{
- cl: &cl,
- storage: &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}},
- pieceStateChanges: pubsub.NewPubSub(),
- }
+ t := cl.newTorrent(metainfo.Hash{}, nil)
+ t.initialPieceCheckDisabled = true
require.NoError(b, t.setInfo(&metainfo.Info{
Pieces: make([]byte, 20),
Length: 1 << 20,
PieceLength: 1 << 20,
}))
- t.setChunkSize(defaultChunkSize)
- t._pendingPieces.Set(0, PiecePriorityNormal.BitmapPriority())
+ t.storage = &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}}
+ t.onSetInfo()
+ t._pendingPieces.Add(0)
r, w := net.Pipe()
- cn := cl.newConnection(r, true, r.RemoteAddr(), r.RemoteAddr().Network(), regularNetConnPeerConnConnString(r))
+ cn := cl.newConnection(r, newConnectionOpts{
+ outgoing: true,
+ remoteAddr: r.RemoteAddr(),
+ network: r.RemoteAddr().Network(),
+ connString: regularNetConnPeerConnConnString(r),
+ })
cn.setTorrent(t)
- mrlErr := make(chan error)
+ mrlErrChan := make(chan error)
msg := pp.Message{
Type: pp.Piece,
Piece: make([]byte, defaultChunkSize),
cl.lock()
err := cn.mainReadLoop()
if err != nil {
- mrlErr <- err
+ mrlErrChan <- err
}
- close(mrlErr)
+ close(mrlErrChan)
}()
wb := msg.MustMarshalBinary()
b.SetBytes(int64(len(msg.Piece)))
cl.lock()
// The chunk must be written to storage everytime, to ensure the
// writeSem is unlocked.
- t.pieces[0]._dirtyChunks.Clear()
+ t.pendAllChunkSpecs(0)
cn.validReceiveChunks = map[RequestIndex]int{
t.requestIndexFromRequest(newRequestFromMessage(&msg)): 1,
}
panic(err)
}
}()
- c.Assert([]error{nil, io.EOF}, quicktest.Contains, <-mrlErr)
+ mrlErr := <-mrlErrChan
+ if mrlErr != nil && !errors.Is(mrlErr, io.EOF) {
+ c.Fatal(mrlErr)
+ }
c.Assert(cn._stats.ChunksReadUseful.Int64(), quicktest.Equals, int64(b.N))
}
tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
)
- var testcases = []struct {
+ testcases := []struct {
conn *PeerConn
f pp.PexPeerFlags
}{
}
func TestConnPexEvent(t *testing.T) {
+ c := qt.New(t)
var (
udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747}
)
- var testcases = []struct {
+ testcases := []struct {
t pexEventType
c *PeerConn
e pexEvent
{
pexAdd,
&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}},
- pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp},
+ pexEvent{pexAdd, udpAddr.AddrPort(), pp.PexSupportsUtp, nil},
},
{
pexDrop,
- &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true, PeerListenPort: dialTcpAddr.Port}},
- pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn},
+ &PeerConn{
+ Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true},
+ PeerListenPort: dialTcpAddr.Port,
+ },
+ pexEvent{pexDrop, tcpAddr.AddrPort(), pp.PexOutgoingConn, nil},
},
{
pexAdd,
- &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), PeerListenPort: dialTcpAddr.Port}},
- pexEvent{pexAdd, dialTcpAddr, 0},
+ &PeerConn{
+ Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()},
+ PeerListenPort: dialTcpAddr.Port,
+ },
+ pexEvent{pexAdd, dialTcpAddr.AddrPort(), 0, nil},
},
{
pexDrop,
- &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), PeerListenPort: dialUdpAddr.Port}},
- pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp},
+ &PeerConn{
+ Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()},
+ PeerListenPort: dialUdpAddr.Port,
+ },
+ pexEvent{pexDrop, dialUdpAddr.AddrPort(), pp.PexSupportsUtp, nil},
},
}
for i, tc := range testcases {
- e := tc.c.pexEvent(tc.t)
- require.EqualValues(t, tc.e, e, i)
+ c.Run(fmt.Sprintf("%v", i), func(c *qt.C) {
+ e, err := tc.c.pexEvent(tc.t)
+ c.Assert(err, qt.IsNil)
+ c.Check(e, qt.Equals, tc.e)
+ })
+ }
+}
+
+func TestHaveAllThenBitfield(t *testing.T) {
+ c := qt.New(t)
+ cl := newTestingClient(t)
+ tt := cl.newTorrentForTesting()
+ // cl.newConnection()
+ pc := PeerConn{
+ Peer: Peer{t: tt},
+ }
+ pc.initRequestState()
+ pc.peerImpl = &pc
+ tt.conns[&pc] = struct{}{}
+ c.Assert(pc.onPeerSentHaveAll(), qt.IsNil)
+ c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}})
+ pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false})
+ c.Check(pc.peerMinPieces, qt.Equals, 6)
+ c.Check(pc.t.connsWithAllPieces, qt.HasLen, 0)
+ c.Assert(pc.t.setInfo(&metainfo.Info{
+ PieceLength: 0,
+ Pieces: make([]byte, pieceHash.Size()*7),
+ }), qt.IsNil)
+ pc.t.onSetInfo()
+ c.Check(tt.numPieces(), qt.Equals, 7)
+ c.Check(tt.pieceAvailabilityRuns(), qt.DeepEquals, []pieceAvailabilityRun{
+ // The last element of the bitfield is irrelevant, as the Torrent actually only has 7
+ // pieces.
+ {2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0},
+ })
+}
+
+func TestApplyRequestStateWriteBufferConstraints(t *testing.T) {
+ c := qt.New(t)
+ c.Check(interestedMsgLen, qt.Equals, 5)
+ c.Check(requestMsgLen, qt.Equals, 17)
+ c.Check(maxLocalToRemoteRequests >= 8, qt.IsTrue)
+ c.Logf("max local to remote requests: %v", maxLocalToRemoteRequests)
+}
+
+func peerConnForPreferredNetworkDirection(
+ localPeerId, remotePeerId int,
+ outgoing, utp, ipv6 bool,
+) *PeerConn {
+ pc := PeerConn{}
+ pc.outgoing = outgoing
+ if utp {
+ pc.Network = "udp"
+ }
+ if ipv6 {
+ pc.RemoteAddr = &net.TCPAddr{IP: net.ParseIP("::420")}
+ } else {
+ pc.RemoteAddr = &net.TCPAddr{IP: net.IPv4(1, 2, 3, 4)}
+ }
+ binary.BigEndian.PutUint64(pc.PeerID[:], uint64(remotePeerId))
+ cl := Client{}
+ binary.BigEndian.PutUint64(cl.peerID[:], uint64(localPeerId))
+ pc.t = &Torrent{cl: &cl}
+ return &pc
+}
+
+func TestPreferredNetworkDirection(t *testing.T) {
+ pc := peerConnForPreferredNetworkDirection
+ c := qt.New(t)
+
+ // Prefer outgoing to lower peer ID
+
+ c.Check(
+ pc(1, 2, true, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)),
+ qt.IsFalse,
+ )
+ c.Check(
+ pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, true, false, false)),
+ qt.IsTrue,
+ )
+ c.Check(
+ pc(2, 1, false, false, false).hasPreferredNetworkOver(pc(2, 1, true, false, false)),
+ qt.IsFalse,
+ )
+
+ // Don't prefer uTP
+ c.Check(
+ pc(1, 2, false, true, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)),
+ qt.IsFalse,
+ )
+ // Prefer IPv6
+ c.Check(
+ pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, true)),
+ qt.IsFalse,
+ )
+ // No difference
+ c.Check(
+ pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)),
+ qt.IsFalse,
+ )
+}
+
+func TestReceiveLargeRequest(t *testing.T) {
+ c := qt.New(t)
+ cl := newTestingClient(t)
+ pc := cl.newConnection(nil, newConnectionOpts{network: "test"})
+ tor := cl.newTorrentForTesting()
+ tor.info = &metainfo.Info{PieceLength: 3 << 20}
+ pc.setTorrent(tor)
+ tor._completedPieces.Add(0)
+ pc.PeerExtensionBytes.SetBit(pp.ExtensionBitFast, true)
+ pc.choking = false
+ pc.initMessageWriter()
+ req := Request{}
+ req.Length = defaultChunkSize
+ c.Assert(pc.fastEnabled(), qt.IsTrue)
+ c.Check(pc.onReadRequest(req, false), qt.IsNil)
+ c.Check(pc.peerRequests, qt.HasLen, 1)
+ req.Length = 2 << 20
+ c.Check(pc.onReadRequest(req, false), qt.IsNil)
+ c.Check(pc.peerRequests, qt.HasLen, 2)
+ pc.peerRequests = nil
+ pc.t.cl.config.UploadRateLimiter = rate.NewLimiter(1, defaultChunkSize)
+ req.Length = defaultChunkSize
+ c.Check(pc.onReadRequest(req, false), qt.IsNil)
+ c.Check(pc.peerRequests, qt.HasLen, 1)
+ req.Length = 2 << 20
+ c.Check(pc.onReadRequest(req, false), qt.IsNil)
+ c.Check(pc.messageWriter.writeBuffer.Len(), qt.Equals, 17)
+}
+
+func TestChunkOverflowsPiece(t *testing.T) {
+ c := qt.New(t)
+ check := func(begin, length, limit pp.Integer, expected bool) {
+ c.Check(chunkOverflowsPiece(ChunkSpec{begin, length}, limit), qt.Equals, expected)
}
+ check(2, 3, 1, true)
+ check(2, pp.IntegerMax, 1, true)
+ check(2, pp.IntegerMax, 3, true)
+ check(2, pp.IntegerMax, pp.IntegerMax, true)
+ check(2, pp.IntegerMax-2, pp.IntegerMax, false)
}