From ad298364aa93175dfa86e08bc5614ddf6ed530a4 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 14 May 2021 13:40:09 +1000 Subject: [PATCH] Add client-level max unverified bytes --- cmd/torrent/main.go | 28 +++++++++++++++------------- config.go | 2 ++ request-strategy.go | 8 +++++--- request-strategy/order.go | 21 +++++++++++++++++---- request-strategy/order_test.go | 24 ++++++++++++------------ 5 files changed, 51 insertions(+), 32 deletions(-) diff --git a/cmd/torrent/main.go b/cmd/torrent/main.go index 02020e77..374f8b1a 100644 --- a/cmd/torrent/main.go +++ b/cmd/torrent/main.go @@ -169,19 +169,20 @@ var flags struct { type SpewBencodingCmd struct{} type DownloadCmd struct { - Mmap bool `help:"memory-map torrent data"` - TestPeer []string `help:"addresses of some starting peers"` - Seed bool `help:"seed after download is complete"` - Addr string `help:"network listen addr"` - UploadRate *tagflag.Bytes `help:"max piece bytes to send per second"` - DownloadRate *tagflag.Bytes `help:"max bytes per second down from peers"` - PackedBlocklist string - PublicIP net.IP - Progress bool `default:"true"` - PieceStates bool - Quiet bool `help:"discard client logging"` - Stats *bool `help:"print stats at termination"` - Dht bool `default:"true"` + Mmap bool `help:"memory-map torrent data"` + TestPeer []string `help:"addresses of some starting peers"` + Seed bool `help:"seed after download is complete"` + Addr string `help:"network listen addr"` + MaxUnverifiedBytes tagflag.Bytes `help:"maximum number bytes to have pending verification"` + UploadRate *tagflag.Bytes `help:"max piece bytes to send per second"` + DownloadRate *tagflag.Bytes `help:"max bytes per second down from peers"` + PackedBlocklist string + PublicIP net.IP + Progress bool `default:"true"` + PieceStates bool + Quiet bool `help:"discard client logging"` + Stats *bool `help:"print stats at termination"` + Dht bool `default:"true"` TcpPeers bool `default:"true"` UtpPeers bool `default:"true"` @@ -311,6 +312,7 @@ func downloadErr() error { if flags.Quiet { clientConfig.Logger = log.Discard } + clientConfig.MaxUnverifiedBytes = flags.MaxUnverifiedBytes.Int64() var stop missinggo.SynchronizedEvent defer func() { diff --git a/config.go b/config.go index 5aeef38b..37b2b714 100644 --- a/config.go +++ b/config.go @@ -59,6 +59,8 @@ type ClientConfig struct { // (~4096), and the requested chunk size (~16KiB, see // TorrentSpec.ChunkSize). DownloadRateLimiter *rate.Limiter + // Maximum unverified bytes across all torrents. Not used if zero. + MaxUnverifiedBytes int64 // User-provided Client peer ID. If not present, one is generated automatically. PeerID string diff --git a/request-strategy.go b/request-strategy.go index 8d8b3b23..7c7660bc 100644 --- a/request-strategy.go +++ b/request-strategy.go @@ -27,8 +27,7 @@ func (cl *Client) doRequests() { ts := make([]request_strategy.Torrent, 0, len(cl.torrents)) for _, t := range cl.torrents { rst := request_strategy.Torrent{ - StableId: uintptr(unsafe.Pointer(t)), - MaxUnverifiedBytes: 10 << 20, + StableId: uintptr(unsafe.Pointer(t)), } if t.storage != nil { rst.Capacity = t.storage.Capacity @@ -72,7 +71,10 @@ func (cl *Client) doRequests() { }) ts = append(ts, rst) } - nextPeerStates := cl.pieceRequestOrder.DoRequests(ts) + nextPeerStates := request_strategy.Run(request_strategy.Input{ + Torrents: ts, + MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes, + }) for p, state := range nextPeerStates { applyPeerNextRequestState(p, state) } diff --git a/request-strategy/order.go b/request-strategy/order.go index 9052fb0c..54bb5285 100644 --- a/request-strategy/order.go +++ b/request-strategy/order.go @@ -84,12 +84,12 @@ type filterPiece struct { Piece } -func getRequestablePieces(torrents []Torrent) (ret []requestablePiece) { +func getRequestablePieces(input Input) (ret []requestablePiece) { // Storage capacity left for this run, keyed by the storage capacity pointer on the storage // TorrentImpl. storageLeft := make(map[*func() *int64]*int64) var pieces []filterPiece - for _, _t := range torrents { + for _, _t := range input.Torrents { // TODO: We could do metainfo requests here. t := &filterTorrent{ Torrent: _t, @@ -111,6 +111,7 @@ func getRequestablePieces(torrents []Torrent) (ret []requestablePiece) { } } sortFilterPieces(pieces) + var allTorrentsUnverifiedBytes int64 for _, piece := range pieces { if left := piece.t.storageLeft; left != nil { if *left < int64(piece.Length) { @@ -119,12 +120,18 @@ func getRequestablePieces(torrents []Torrent) (ret []requestablePiece) { *left -= int64(piece.Length) } if !piece.Request || piece.NumPendingChunks == 0 { + // TODO: Clarify exactly what is verified. Stuff that's being hashed should be + // considered unverified and hold up further requests. continue } if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes { continue } + if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes { + continue + } piece.t.unverifiedBytes += piece.Length + allTorrentsUnverifiedBytes += piece.Length ret = append(ret, requestablePiece{ index: piece.index, t: piece.t.Torrent, @@ -135,9 +142,15 @@ func getRequestablePieces(torrents []Torrent) (ret []requestablePiece) { return } +type Input struct { + Torrents []Torrent + MaxUnverifiedBytes int64 +} + // TODO: We could do metainfo requests here. -func (requestOrder *ClientPieceOrder) DoRequests(torrents []Torrent) map[PeerId]PeerNextRequestState { - requestPieces := getRequestablePieces(torrents) +func Run(input Input) map[PeerId]PeerNextRequestState { + requestPieces := getRequestablePieces(input) + torrents := input.Torrents allPeers := make(map[uintptr][]*requestsPeer, len(torrents)) for _, t := range torrents { peers := make([]*requestsPeer, 0, len(t.Peers)) diff --git a/request-strategy/order_test.go b/request-strategy/order_test.go index fd8b53f0..adc0c478 100644 --- a/request-strategy/order_test.go +++ b/request-strategy/order_test.go @@ -45,7 +45,6 @@ func (i intPeerId) Uintptr() uintptr { func TestStealingFromSlowerPeer(t *testing.T) { c := qt.New(t) - order := ClientPieceOrder{} basePeer := Peer{ HasPiece: func(i pieceIndex) bool { return true @@ -64,7 +63,7 @@ func TestStealingFromSlowerPeer(t *testing.T) { firstStealer.Id = intPeerId(2) secondStealer := basePeer secondStealer.Id = intPeerId(3) - results := order.DoRequests([]Torrent{{ + results := Run(Input{Torrents: []Torrent{{ Pieces: []Piece{{ Request: true, NumPendingChunks: 5, @@ -75,7 +74,8 @@ func TestStealingFromSlowerPeer(t *testing.T) { firstStealer, secondStealer, }, - }}) + }}}) + c.Assert(results, qt.HasLen, 3) check := func(p PeerId, l int) { c.Check(results[p].Requests, qt.HasLen, l) @@ -93,7 +93,6 @@ func checkNumRequestsAndInterest(c *qt.C, next PeerNextRequestState, num int, in func TestStealingFromSlowerPeersBasic(t *testing.T) { c := qt.New(t) - order := ClientPieceOrder{} basePeer := Peer{ HasPiece: func(i pieceIndex) bool { return true @@ -111,7 +110,7 @@ func TestStealingFromSlowerPeersBasic(t *testing.T) { firstStealer.Id = intPeerId(2) secondStealer := basePeer secondStealer.Id = intPeerId(3) - results := order.DoRequests([]Torrent{{ + results := Run(Input{Torrents: []Torrent{{ Pieces: []Piece{{ Request: true, NumPendingChunks: 2, @@ -122,7 +121,8 @@ func TestStealingFromSlowerPeersBasic(t *testing.T) { firstStealer, secondStealer, }, - }}) + }}}) + checkNumRequestsAndInterest(c, results[firstStealer.Id], 1, true) checkNumRequestsAndInterest(c, results[secondStealer.Id], 1, true) checkNumRequestsAndInterest(c, results[stealee.Id], 0, false) @@ -130,7 +130,6 @@ func TestStealingFromSlowerPeersBasic(t *testing.T) { func TestPeerKeepsExistingIfReasonable(t *testing.T) { c := qt.New(t) - order := ClientPieceOrder{} basePeer := Peer{ HasPiece: func(i pieceIndex) bool { return true @@ -150,7 +149,7 @@ func TestPeerKeepsExistingIfReasonable(t *testing.T) { firstStealer.Id = intPeerId(2) secondStealer := basePeer secondStealer.Id = intPeerId(3) - results := order.DoRequests([]Torrent{{ + results := Run(Input{Torrents: []Torrent{{ Pieces: []Piece{{ Request: true, NumPendingChunks: 4, @@ -161,7 +160,8 @@ func TestPeerKeepsExistingIfReasonable(t *testing.T) { firstStealer, secondStealer, }, - }}) + }}}) + c.Assert(results, qt.HasLen, 3) check := func(p PeerId, l int) { c.Check(results[p].Requests, qt.HasLen, l) @@ -177,7 +177,6 @@ func TestPeerKeepsExistingIfReasonable(t *testing.T) { func TestDontStealUnnecessarily(t *testing.T) { c := qt.New(t) - order := ClientPieceOrder{} basePeer := Peer{ HasPiece: func(i pieceIndex) bool { return true @@ -198,7 +197,7 @@ func TestDontStealUnnecessarily(t *testing.T) { firstStealer.Id = intPeerId(2) secondStealer := basePeer secondStealer.Id = intPeerId(3) - results := order.DoRequests([]Torrent{{ + results := Run(Input{Torrents: []Torrent{{ Pieces: []Piece{{ Request: true, NumPendingChunks: 9, @@ -209,7 +208,8 @@ func TestDontStealUnnecessarily(t *testing.T) { stealee, secondStealer, }, - }}) + }}}) + c.Assert(results, qt.HasLen, 3) check := func(p PeerId, l int) { c.Check(results[p].Requests, qt.HasLen, l) -- 2.44.0