From 94bb5d40ba35c977755adadf3dcf754c783e6a11 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 1 Dec 2021 14:38:47 +1100 Subject: [PATCH] Implement piece request ordering with retained state --- client.go | 2 + piece.go | 2 + request-strategy/order.go | 112 +++++++++++++++++------- request-strategy/piece-request-order.go | 88 +++++++++++++++++++ requesting.go | 41 ++++++--- torrent-piece-request-order.go | 7 ++ torrent.go | 34 +++++++ 7 files changed, 243 insertions(+), 43 deletions(-) create mode 100644 request-strategy/piece-request-order.go create mode 100644 torrent-piece-request-order.go diff --git a/client.go b/client.go index c491118b..8a686767 100644 --- a/client.go +++ b/client.go @@ -27,6 +27,7 @@ import ( "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/missinggo/v2/pproffd" "github.com/anacrolix/sync" + request_strategy "github.com/anacrolix/torrent/request-strategy" "github.com/davecgh/go-spew/spew" "github.com/dustin/go-humanize" "github.com/google/btree" @@ -74,6 +75,7 @@ type Client struct { dopplegangerAddrs map[string]struct{} badPeerIPs map[string]struct{} torrents map[InfoHash]*Torrent + pieceRequestOrder map[storage.TorrentCapacity]*request_strategy.PieceRequestOrder acceptLimiter map[ipStr]int dialRateLimiter *rate.Limiter diff --git a/piece.go b/piece.go index bef5f59e..6caa7628 100644 --- a/piece.go +++ b/piece.go @@ -77,11 +77,13 @@ func (p *Piece) numDirtyChunks() chunkIndexType { func (p *Piece) unpendChunkIndex(i chunkIndexType) { p.t.dirtyChunks.Add(p.requestIndexOffset() + i) + p.t.updatePieceRequestOrder(p.index) p.readerCond.Broadcast() } func (p *Piece) pendChunkIndex(i RequestIndex) { p.t.dirtyChunks.Remove(p.requestIndexOffset() + i) + p.t.updatePieceRequestOrder(p.index) } func (p *Piece) numChunks() chunkIndexType { diff --git a/request-strategy/order.go b/request-strategy/order.go index 91dfc74d..cacd7f96 100644 --- a/request-strategy/order.go +++ b/request-strategy/order.go @@ -3,12 +3,14 @@ package request_strategy import ( "bytes" "expvar" + "log" "runtime" "sort" "sync" "github.com/anacrolix/multiless" "github.com/anacrolix/torrent/metainfo" + "github.com/google/btree" "github.com/anacrolix/torrent/types" ) @@ -60,6 +62,15 @@ func (me pieceSorter) Swap(i, j int) { func (me pieceSorter) Less(_i, _j int) bool { i := me.get(_i) j := me.get(_j) + return pieceOrderLess(i.toPieceOrderInput(), j.toPieceOrderInput()).MustLess() +} + +type pieceOrderInput struct { + PieceRequestOrderState + PieceRequestOrderKey +} + +func pieceOrderLess(i, j pieceOrderInput) multiless.Computation { return multiless.New().Int( int(j.Priority), int(i.Priority), ).Bool( @@ -67,13 +78,13 @@ func (me pieceSorter) Less(_i, _j int) bool { ).Int64( i.Availability, j.Availability, ).Int( - i.index, j.index, + i.Index, j.Index, ).Lazy(func() multiless.Computation { return multiless.New().Cmp(bytes.Compare( - i.t.InfoHash[:], - j.t.InfoHash[:], + i.InfoHash[:], + j.InfoHash[:], )) - }).MustLess() + }) } type requestsPeer struct { @@ -120,6 +131,15 @@ type filterPiece struct { *Piece } +func (fp *filterPiece) toPieceOrderInput() (ret pieceOrderInput) { + ret.Partial = fp.Partial + ret.InfoHash = fp.t.InfoHash + ret.Availability = fp.Availability + ret.Priority = fp.Priority + ret.Index = fp.index + return +} + var ( sortsMu sync.Mutex sorts = map[*[]filterPiece][]int{} @@ -186,12 +206,45 @@ type pieceOrderingFinalizer struct { } // Calls f with requestable pieces in order. -func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex int)) { - maxPieces := 0 - for i := range input.Torrents { - maxPieces += len(input.Torrents[i].Pieces) +func GetRequestablePieces(input Input, pro *PieceRequestOrder, f func(t *Torrent, p *Piece, pieceIndex int)) { + if false { + maxPieces := 0 + for i := range input.Torrents { + maxPieces += len(input.Torrents[i].Pieces) + } + pieces := make([]filterPiece, 0, maxPieces) + for _t := range input.Torrents { + // TODO: We could do metainfo requests here. + t := &input.Torrents[_t] + for i := range t.Pieces { + pieces = append(pieces, filterPiece{ + t: &input.Torrents[_t], + index: i, + Piece: &t.Pieces[i], + }) + } + } + pieces = getSortedFilterPieces(pieces) + { + if len(pieces) != pro.tree.Len() { + panic("length doesn't match") + } + pieces := pieces + pro.tree.Ascend(func(i btree.Item) bool { + _i := i.(pieceRequestOrderItem) + ii := pieceOrderInput{ + _i.state, + _i.key, + } + if pieces[0].toPieceOrderInput() != ii { + panic(_i) + } + pieces = pieces[1:] + return true + }) + } + log.Printf("%v pieces passed", len(pieces)) } - pieces := make([]filterPiece, 0, maxPieces) // Storage capacity left for this run, keyed by the storage capacity pointer on the storage // TorrentImpl. A nil value means no capacity limit. var storageLeft *int64 @@ -199,42 +252,41 @@ func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex i storageLeft = new(int64) *storageLeft = *input.Capacity } - for _t := range input.Torrents { - // TODO: We could do metainfo requests here. - t := &input.Torrents[_t] - for i := range t.Pieces { - pieces = append(pieces, filterPiece{ - t: &input.Torrents[_t], - index: i, - Piece: &t.Pieces[i], - }) - } - } - pieces = getSortedFilterPieces(pieces) var allTorrentsUnverifiedBytes int64 torrentUnverifiedBytes := map[metainfo.Hash]int64{} - for _, piece := range pieces { + pro.tree.Ascend(func(i btree.Item) bool { + _i := i.(pieceRequestOrderItem) + var piece *Piece + var t Torrent + for _, t = range input.Torrents { + if t.InfoHash == _i.key.InfoHash { + piece = &t.Pieces[_i.key.Index] + break + } + } if left := storageLeft; left != nil { if *left < piece.Length { - continue + return true } *left -= piece.Length } if !piece.Request || piece.NumPendingChunks == 0 { // TODO: Clarify exactly what is verified. Stuff that's being hashed should be // considered unverified and hold up further requests. - continue + + return true } - if piece.t.MaxUnverifiedBytes != 0 && torrentUnverifiedBytes[piece.t.InfoHash]+piece.Length > piece.t.MaxUnverifiedBytes { - continue + if t.MaxUnverifiedBytes != 0 && torrentUnverifiedBytes[t.InfoHash]+piece.Length > t.MaxUnverifiedBytes { + return true } if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes { - continue + return true } - torrentUnverifiedBytes[piece.t.InfoHash] += piece.Length + torrentUnverifiedBytes[t.InfoHash] += piece.Length allTorrentsUnverifiedBytes += piece.Length - f(piece.t, piece.Piece, piece.index) - } + f(&t, piece, _i.key.Index) + return true + }) return } diff --git a/request-strategy/piece-request-order.go b/request-strategy/piece-request-order.go new file mode 100644 index 00000000..efe75604 --- /dev/null +++ b/request-strategy/piece-request-order.go @@ -0,0 +1,88 @@ +package request_strategy + +import ( + "github.com/anacrolix/torrent/metainfo" + "github.com/google/btree" +) + +func NewPieceOrder() *PieceRequestOrder { + return &PieceRequestOrder{ + tree: btree.New(32), + keys: make(map[PieceRequestOrderKey]PieceRequestOrderState), + } +} + +type PieceRequestOrder struct { + tree *btree.BTree + keys map[PieceRequestOrderKey]PieceRequestOrderState +} + +type PieceRequestOrderKey struct { + InfoHash metainfo.Hash + Index int +} + +type PieceRequestOrderState struct { + Priority piecePriority + Partial bool + Availability int64 +} + +type pieceRequestOrderItem struct { + key PieceRequestOrderKey + state PieceRequestOrderState +} + +func (me pieceRequestOrderItem) Less(other btree.Item) bool { + otherConcrete := other.(pieceRequestOrderItem) + return pieceOrderLess( + pieceOrderInput{ + PieceRequestOrderState: me.state, + PieceRequestOrderKey: me.key, + }, + pieceOrderInput{ + PieceRequestOrderState: otherConcrete.state, + PieceRequestOrderKey: otherConcrete.key, + }, + ).Less() +} + +func (me *PieceRequestOrder) Add(key PieceRequestOrderKey, state PieceRequestOrderState) { + if _, ok := me.keys[key]; ok { + panic(key) + } + if me.tree.ReplaceOrInsert(pieceRequestOrderItem{ + key: key, + state: state, + }) != nil { + panic("shouldn't already have this") + } + me.keys[key] = state +} + +func (me *PieceRequestOrder) Update(key PieceRequestOrderKey, state PieceRequestOrderState) { + if me.tree.Delete(me.existingItemForKey(key)) == nil { + panic(key) + } + if me.tree.ReplaceOrInsert(pieceRequestOrderItem{ + key: key, + state: state, + }) != nil { + panic(key) + } + me.keys[key] = state +} + +func (me *PieceRequestOrder) existingItemForKey(key PieceRequestOrderKey) pieceRequestOrderItem { + return pieceRequestOrderItem{ + key: key, + state: me.keys[key], + } +} + +func (me *PieceRequestOrder) Delete(key PieceRequestOrderKey) { + if me.tree.Delete(me.existingItemForKey(key)) == nil { + panic(key) + } + delete(me.keys, key) +} diff --git a/requesting.go b/requesting.go index 5ad98ea8..147e574d 100644 --- a/requesting.go +++ b/requesting.go @@ -27,9 +27,11 @@ func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input reques input.Capacity = &cap } } - if input.Capacity == nil { - input.Torrents = []request_strategy.Torrent{primaryTorrent.requestStrategyTorrentInput()} - return + if false { + if input.Capacity == nil { + input.Torrents = []request_strategy.Torrent{primaryTorrent.requestStrategyTorrentInput()} + return + } } input.Torrents = make([]request_strategy.Torrent, 0, len(cl.torrents)) for _, t := range cl.torrents { @@ -58,20 +60,32 @@ func (t *Torrent) requestStrategyTorrentInput() request_strategy.Torrent { } rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces)) for i := range t.pieces { - p := &t.pieces[i] - rst.Pieces = append(rst.Pieces, request_strategy.Piece{ - Request: !t.ignorePieceForRequests(i), - Priority: p.purePriority(), - Partial: t.piecePartiallyDownloaded(i), - Availability: p.availability, - Length: int64(p.length()), - NumPendingChunks: int(t.pieceNumPendingChunks(i)), - IterPendingChunks: &p.undirtiedChunksIter, - }) + rst.Pieces = append(rst.Pieces, t.makeRequestStrategyPiece(i)) } return rst } +func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRequestOrderState { + return request_strategy.PieceRequestOrderState{ + Priority: t.piece(i).purePriority(), + Partial: t.piecePartiallyDownloaded(i), + Availability: t.piece(i).availability, + } +} + +func (t *Torrent) makeRequestStrategyPiece(i int) request_strategy.Piece { + p := &t.pieces[i] + return request_strategy.Piece{ + Request: !t.ignorePieceForRequests(i), + Priority: p.purePriority(), + Partial: t.piecePartiallyDownloaded(i), + Availability: p.availability, + Length: int64(p.length()), + NumPendingChunks: int(t.pieceNumPendingChunks(i)), + IterPendingChunks: &p.undirtiedChunksIter, + } +} + func init() { gob.Register(peerId{}) } @@ -205,6 +219,7 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { } request_strategy.GetRequestablePieces( input, + p.t.cl.pieceRequestOrder[p.t.storage.Capacity], func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) { if t.InfoHash != p.t.infoHash { return diff --git a/torrent-piece-request-order.go b/torrent-piece-request-order.go new file mode 100644 index 00000000..e1ab9670 --- /dev/null +++ b/torrent-piece-request-order.go @@ -0,0 +1,7 @@ +package torrent + +func (t *Torrent) updatePieceRequestOrder(pieceIndex int) { + t.cl.pieceRequestOrder[t.storage.Capacity].Update( + t.pieceRequestOrderKey(pieceIndex), + t.requestStrategyPieceOrderState(pieceIndex)) +} diff --git a/torrent.go b/torrent.go index 7ccf13f6..c4ecf07f 100644 --- a/torrent.go +++ b/torrent.go @@ -27,6 +27,7 @@ import ( "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/multiless" "github.com/anacrolix/sync" + request_strategy "github.com/anacrolix/torrent/request-strategy" "github.com/davecgh/go-spew/spew" "github.com/pion/datachannel" @@ -165,6 +166,7 @@ func (t *Torrent) decPieceAvailability(i pieceIndex) { panic(p.availability) } p.availability-- + t.updatePieceRequestOrder(i) } func (t *Torrent) incPieceAvailability(i pieceIndex) { @@ -172,6 +174,7 @@ func (t *Torrent) incPieceAvailability(i pieceIndex) { if t.haveInfo() { p := t.piece(i) p.availability++ + t.updatePieceRequestOrder(i) } } @@ -424,8 +427,21 @@ func (t *Torrent) setInfo(info *metainfo.Info) error { return nil } +func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrderKey { + return request_strategy.PieceRequestOrderKey{ + InfoHash: t.infoHash, + Index: i, + } +} + // This seems to be all the follow-up tasks after info is set, that can't fail. func (t *Torrent) onSetInfo() { + if t.cl.pieceRequestOrder == nil { + t.cl.pieceRequestOrder = make(map[storage.TorrentCapacity]*request_strategy.PieceRequestOrder) + } + if t.cl.pieceRequestOrder[t.storage.Capacity] == nil { + t.cl.pieceRequestOrder[t.storage.Capacity] = request_strategy.NewPieceOrder() + } for i := range t.pieces { p := &t.pieces[i] // Need to add availability before updating piece completion, as that may result in conns @@ -434,6 +450,9 @@ func (t *Torrent) onSetInfo() { panic(p.availability) } p.availability = int64(t.pieceAvailabilityFromPeers(i)) + t.cl.pieceRequestOrder[t.storage.Capacity].Add( + t.pieceRequestOrderKey(i), + t.requestStrategyPieceOrderState(i)) t.updatePieceCompletion(pieceIndex(i)) if !t.initialPieceCheckDisabled && !p.storageCompletionOk { // t.logger.Printf("piece %s completion unknown, queueing check", p) @@ -797,6 +816,12 @@ func (t *Torrent) numPiecesCompleted() (num pieceIndex) { return pieceIndex(t._completedPieces.GetCardinality()) } +func (t *Torrent) deletePieceRequestOrder() { + for i := 0; i < t.numPieces(); i++ { + t.cl.pieceRequestOrder[t.storage.Capacity].Delete(t.pieceRequestOrderKey(i)) + } +} + func (t *Torrent) close(wg *sync.WaitGroup) (err error) { t.closed.Set() if t.storage != nil { @@ -816,6 +841,9 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) { t.iterPeers(func(p *Peer) { p.close() }) + if t.storage != nil { + t.deletePieceRequestOrder() + } t.pex.Reset() t.cl.event.Broadcast() t.pieceStateChanges.Close() @@ -1102,6 +1130,11 @@ func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) { } func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) { + if !t.closed.IsSet() { + // It would be possible to filter on pure-priority changes here to avoid churning the piece + // request order. + t.updatePieceRequestOrder(piece) + } p := &t.pieces[piece] newPrio := p.uncachedPriority() // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio) @@ -1238,6 +1271,7 @@ func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool { } else { t._completedPieces.Remove(x) } + p.t.updatePieceRequestOrder(piece) t.updateComplete() if complete && len(p.dirtiers) != 0 { t.logger.Printf("marked piece %v complete but still has dirtiers", piece) -- 2.48.1