From 67ed5d0032d73db95fe72933fa0d788674eb0d3e Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 29 Nov 2021 13:07:18 +1100 Subject: [PATCH] Partition piece request strategy by storage capacity key --- request-strategy/order.go | 36 ++++++++--------- request-strategy/torrent.go | 6 +-- requesting.go | 77 +++++++++++++++++++++++-------------- 3 files changed, 66 insertions(+), 53 deletions(-) diff --git a/request-strategy/order.go b/request-strategy/order.go index 1e422868..9fb45002 100644 --- a/request-strategy/order.go +++ b/request-strategy/order.go @@ -6,7 +6,6 @@ import ( "sync" "github.com/anacrolix/multiless" - "github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/types" ) @@ -26,8 +25,6 @@ type ClientPieceOrder struct{} type filterTorrent struct { *Torrent unverifiedBytes int64 - // Potentially shared with other torrents. - storageLeft *int64 } func sortFilterPieces(pieces []filterPiece) { @@ -104,25 +101,17 @@ func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex i pieces := make([]filterPiece, 0, maxPieces) // Storage capacity left for this run, keyed by the storage capacity pointer on the storage // TorrentImpl. A nil value means no capacity limit. - storageLeft := make(map[storage.TorrentCapacity]*int64) + var storageLeft *int64 + if input.Capacity != nil { + storageLeft = new(int64) + *storageLeft = *input.Capacity + } for _t := range input.Torrents { // TODO: We could do metainfo requests here. t := &filterTorrent{ Torrent: &input.Torrents[_t], unverifiedBytes: 0, } - key := t.Capacity - if key != nil { - if _, ok := storageLeft[key]; !ok { - capacity, ok := (*key)() - if ok { - storageLeft[key] = &capacity - } else { - storageLeft[key] = nil - } - } - t.storageLeft = storageLeft[key] - } for i := range t.Pieces { pieces = append(pieces, filterPiece{ t: t, @@ -134,11 +123,11 @@ func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex i sortFilterPieces(pieces) var allTorrentsUnverifiedBytes int64 for _, piece := range pieces { - if left := piece.t.storageLeft; left != nil { - if *left < int64(piece.Length) { + if left := storageLeft; left != nil { + if *left < piece.Length { continue } - *left -= int64(piece.Length) + *left -= piece.Length } if !piece.Request || piece.NumPendingChunks == 0 { // TODO: Clarify exactly what is verified. Stuff that's being hashed should be @@ -159,7 +148,14 @@ func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex i } type Input struct { - Torrents []Torrent + // This is all torrents that share the same capacity below (or likely a single torrent if there + // is infinite capacity, since you could just run it separately for each Torrent if that's the + // case). + Torrents []Torrent + // Must not be modified. Non-nil if capacity is not infinite, meaning that pieces of torrents + // that share the same capacity key must be incorporated in piece ordering. + Capacity *int64 + // Across all the Torrents. This might be partitioned by storage capacity key now. MaxUnverifiedBytes int64 } diff --git a/request-strategy/torrent.go b/request-strategy/torrent.go index f547e1d7..dbb41df3 100644 --- a/request-strategy/torrent.go +++ b/request-strategy/torrent.go @@ -2,15 +2,13 @@ package request_strategy import ( "github.com/anacrolix/torrent/metainfo" - "github.com/anacrolix/torrent/storage" ) type Torrent struct { - Pieces []Piece - Capacity storage.TorrentCapacity + Pieces []Piece // Some value that's unique and stable between runs. InfoHash metainfo.Hash ChunksPerPiece uint32 - + // TODO: This isn't actually configurable anywhere yet. MaxUnverifiedBytes int64 } diff --git a/requesting.go b/requesting.go index a330df53..5ad98ea8 100644 --- a/requesting.go +++ b/requesting.go @@ -16,41 +16,60 @@ import ( request_strategy "github.com/anacrolix/torrent/request-strategy" ) -func (cl *Client) getRequestStrategyInput() request_strategy.Input { - ts := make([]request_strategy.Torrent, 0, len(cl.torrents)) +// Returns what is necessary to run request_strategy.GetRequestablePieces for primaryTorrent. +func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input request_strategy.Input) { + input.MaxUnverifiedBytes = cl.config.MaxUnverifiedBytes + if !primaryTorrent.haveInfo() { + return + } + if capFunc := primaryTorrent.storage.Capacity; capFunc != nil { + if cap, ok := (*capFunc)(); ok { + input.Capacity = &cap + } + } + if input.Capacity == nil { + input.Torrents = []request_strategy.Torrent{primaryTorrent.requestStrategyTorrentInput()} + return + } + input.Torrents = make([]request_strategy.Torrent, 0, len(cl.torrents)) for _, t := range cl.torrents { if !t.haveInfo() { - // This would be removed if metadata is handled here. We have to guard against not - // knowing the piece size. If we have no info, we have no pieces too, so the end result - // is the same. + // This would be removed if metadata is handled here. Determining chunks per piece + // requires the info. If we have no info, we have no pieces too, so the end result is + // the same. continue } - rst := request_strategy.Torrent{ - InfoHash: t.infoHash, - ChunksPerPiece: t.chunksPerRegularPiece(), - } - if t.storage != nil { - rst.Capacity = t.storage.Capacity - } - rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces)) - for i := range t.pieces { - p := &t.pieces[i] - rst.Pieces = append(rst.Pieces, request_strategy.Piece{ - Request: !t.ignorePieceForRequests(i), - Priority: p.purePriority(), - Partial: t.piecePartiallyDownloaded(i), - Availability: p.availability, - Length: int64(p.length()), - NumPendingChunks: int(t.pieceNumPendingChunks(i)), - IterPendingChunks: &p.undirtiedChunksIter, - }) + if t.storage.Capacity != primaryTorrent.storage.Capacity { + continue } - ts = append(ts, rst) + input.Torrents = append(input.Torrents, t.requestStrategyTorrentInput()) + } + return +} + +func (t *Torrent) getRequestStrategyInput() request_strategy.Input { + return t.cl.getRequestStrategyInput(t) +} + +func (t *Torrent) requestStrategyTorrentInput() request_strategy.Torrent { + rst := request_strategy.Torrent{ + InfoHash: t.infoHash, + ChunksPerPiece: t.chunksPerRegularPiece(), } - return request_strategy.Input{ - Torrents: ts, - MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes, + rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces)) + for i := range t.pieces { + p := &t.pieces[i] + rst.Pieces = append(rst.Pieces, request_strategy.Piece{ + Request: !t.ignorePieceForRequests(i), + Priority: p.purePriority(), + Partial: t.piecePartiallyDownloaded(i), + Availability: p.availability, + Length: int64(p.length()), + NumPendingChunks: int(t.pieceNumPendingChunks(i)), + IterPendingChunks: &p.undirtiedChunksIter, + }) } + return rst } func init() { @@ -173,7 +192,7 @@ type desiredRequestState struct { } func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { - input := p.t.cl.getRequestStrategyInput() + input := p.t.getRequestStrategyInput() requestHeap := peerRequests{ peer: p, } -- 2.44.0