From: Matt Joiner Date: Mon, 28 Apr 2025 01:05:44 +0000 (+1000) Subject: Update requests when piece order changes if there's a storage cap X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=67fe9f415595493c586d8e8575082e4515443f69;p=btrtrc.git Update requests when piece order changes if there's a storage cap --- diff --git a/request-strategy-impls_test.go b/request-strategy-impls_test.go index aab3b34e..daa52217 100644 --- a/request-strategy-impls_test.go +++ b/request-strategy-impls_test.go @@ -108,7 +108,7 @@ func BenchmarkRequestStrategy(b *testing.B) { peer.onPeerHasAllPiecesNoTriggers() for i := 0; i < tor.numPieces(); i++ { tor.pieces[i].priority.Raise(PiecePriorityNormal) - tor.updatePiecePriorityNoTriggers(i) + tor.updatePiecePriorityNoRequests(i) } peer.peerChoking = false //b.StopTimer() diff --git a/request-strategy/piece-request-order.go b/request-strategy/piece-request-order.go index 7f271ee3..5f152466 100644 --- a/request-strategy/piece-request-order.go +++ b/request-strategy/piece-request-order.go @@ -1,9 +1,10 @@ package requestStrategy import ( - g "github.com/anacrolix/generics" "iter" + g "github.com/anacrolix/generics" + "github.com/anacrolix/torrent/metainfo" ) diff --git a/requesting.go b/requesting.go index 8b57b21d..7d32012e 100644 --- a/requesting.go +++ b/requesting.go @@ -26,6 +26,7 @@ type ( ) func (t *Torrent) requestStrategyPieceOrderState(i int) requestStrategy.PieceRequestOrderState { + t.slogger().Debug("requestStrategyPieceOrderState", "pieceIndex", i) return requestStrategy.PieceRequestOrderState{ Priority: t.piece(i).purePriority(), Partial: t.piecePartiallyDownloaded(i), @@ -185,6 +186,7 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { requestIndexes: t.requestIndexes, } clear(requestHeap.pieceStates) + t.logPieceRequestOrder() // Caller-provided allocation for roaring bitmap iteration. var it typedRoaring.Iterator[RequestIndex] requestStrategy.GetRequestablePieces( @@ -242,6 +244,7 @@ func (p *Peer) maybeUpdateActualRequestState() { panic(since) } } + p.logger.Slogger().Debug("updating requests", "reason", p.needRequestUpdate) pprof.Do( context.Background(), pprof.Labels("update request", string(p.needRequestUpdate)), diff --git a/torrent-piece-request-order.go b/torrent-piece-request-order.go index ac2b8cc1..eb4e00d7 100644 --- a/torrent-piece-request-order.go +++ b/torrent-piece-request-order.go @@ -7,24 +7,27 @@ import ( "github.com/anacrolix/torrent/storage" ) -func (t *Torrent) updatePieceRequestOrderPiece(pieceIndex int) { +// It's probably possible to track whether the piece moves around in the btree to be more efficient +// about triggering request updates. +func (t *Torrent) updatePieceRequestOrderPiece(pieceIndex int) (changed bool) { if t.storage == nil { - return + return false } pro, ok := t.cl.pieceRequestOrder[t.clientPieceRequestOrderKey()] if !ok { - return + return false } key := t.pieceRequestOrderKey(pieceIndex) if t.hasStorageCap() { - pro.Update(key, t.requestStrategyPieceOrderState(pieceIndex)) - return + return pro.Update(key, t.requestStrategyPieceOrderState(pieceIndex)) } pending := !t.ignorePieceForRequests(pieceIndex) if pending { - pro.Add(key, t.requestStrategyPieceOrderState(pieceIndex)) + newState := t.requestStrategyPieceOrderState(pieceIndex) + old := pro.Add(key, newState) + return old.Ok && old.Value != newState } else { - pro.Delete(key) + return pro.Delete(key) } } diff --git a/torrent.go b/torrent.go index 3903e0ec..da83d107 100644 --- a/torrent.go +++ b/torrent.go @@ -10,6 +10,7 @@ import ( "hash" "io" "iter" + "log/slog" "maps" "math/rand" "net/netip" @@ -180,6 +181,7 @@ type Torrent struct { requestPieceStates []g.Option[request_strategy.PieceRequestOrderState] requestIndexes []RequestIndex + // Disable actions after updating piece priorities, for benchmarking. disableTriggers bool } @@ -1435,37 +1437,39 @@ func (t *Torrent) maybeNewConns() { t.openNewConns() } -func (t *Torrent) onPiecePendingTriggers(piece pieceIndex, reason updateRequestReason) { - if t._pendingPieces.Contains(uint32(piece)) { - t.iterPeers(func(c *Peer) { - // if c.requestState.Interested { - // return - // } - if !c.isLowOnRequests() { - return - } - if !c.peerHasPiece(piece) { - return - } - if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) { - return - } - c.updateRequests(reason) - }) +func (t *Torrent) updatePeerRequestsForPiece(piece pieceIndex, reason updateRequestReason) { + if !t._pendingPieces.Contains(uint32(piece)) { + // Non-pending pieces are usually cancelled more synchronously. + return } + t.iterPeers(func(c *Peer) { + // if c.requestState.Interested { + // return + // } + if !c.isLowOnRequests() { + return + } + if !c.peerHasPiece(piece) { + return + } + if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) { + return + } + c.updateRequests(reason) + }) +} + +// Stuff we don't want to run when the pending pieces change while benchmarking. +func (t *Torrent) onPiecePendingTriggers(piece pieceIndex) { t.maybeNewConns() t.publishPieceStateChange(piece) } -func (t *Torrent) updatePiecePriorityNoTriggers(piece pieceIndex) (pendingChanged bool) { - if !t.closed.IsSet() { - // It would be possible to filter on pure-priority changes here to avoid churning the piece - // request order. - t.updatePieceRequestOrderPiece(piece) - } +// Pending pieces is an old bitmap of stuff we want. I think it's more nuanced than that now with +// storage caps and cross-Torrent priorities. +func (t *Torrent) updatePendingPieces(piece pieceIndex) bool { p := t.piece(piece) newPrio := p.effectivePriority() - // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio) if newPrio == PiecePriorityNone && p.haveHash() { return t._pendingPieces.CheckedRemove(uint32(piece)) } else { @@ -1473,23 +1477,64 @@ func (t *Torrent) updatePiecePriorityNoTriggers(piece pieceIndex) (pendingChange } } +// Maybe return whether peer requests should be updated so reason doesn't have to be passed? +func (t *Torrent) updatePiecePriorityNoRequests(piece pieceIndex) (updateRequests bool) { + // I think because the piece request order gets removed at close. + if !t.closed.IsSet() { + // It would be possible to filter on pure-priority changes here to avoid churning the piece + // request order. If there's a storage cap then it's possible that pieces are moved around + // so that new requests can be issued. + updateRequests = t.updatePieceRequestOrderPiece(piece) && t.hasStorageCap() + } + if t.updatePendingPieces(piece) { + if !t.disableTriggers { + // This used to happen after updating requests, but I don't think the order matters. + t.onPiecePendingTriggers(piece) + } + // Something was added or removed. + updateRequests = true + } + return +} + func (t *Torrent) updatePiecePriority(piece pieceIndex, reason updateRequestReason) { - if t.updatePiecePriorityNoTriggers(piece) && !t.disableTriggers { - t.onPiecePendingTriggers(piece, reason) + t.logger.Slogger().Debug("updatePiecePriority", "piece", piece, "reason", reason) + if t.updatePiecePriorityNoRequests(piece) && !t.disableTriggers { + t.updatePeerRequestsForPiece(piece, reason) } - t.updatePieceRequestOrderPiece(piece) } func (t *Torrent) updateAllPiecePriorities(reason updateRequestReason) { t.updatePiecePriorities(0, t.numPieces(), reason) } -// Update all piece priorities in one hit. This function should have the same -// output as updatePiecePriority, but across all pieces. +// Update all piece priorities in one hit. This function should have the same output as +// updatePiecePriority, but across all pieces. func (t *Torrent) updatePiecePriorities(begin, end pieceIndex, reason updateRequestReason) { + t.logger.Slogger().Debug("updating piece priorities", "begin", begin, "end", end) for i := begin; i < end; i++ { t.updatePiecePriority(i, reason) } + t.logPieceRequestOrder() +} + +// Helps debug piece priorities for capped storage. +func (t *Torrent) logPieceRequestOrder() { + level := slog.LevelDebug + logger := t.slogger() + if !logger.Enabled(context.Background(), level) { + return + } + pro := t.getPieceRequestOrder() + if pro != nil { + for item := range pro.Iter() { + t.slogger().Debug( + "piece request order item", "infohash", + item.Key.InfoHash, "piece", + item.Key.Index, "state", + item.State) + } + } } // Returns the range of pieces [begin, end) that contains the extent of bytes. @@ -2808,6 +2853,12 @@ func (t *Torrent) piece(i int) *Piece { return &t.pieces[i] } +func (t *Torrent) pieceForOffset(off int64) *Piece { + // Avoid conversion to int by doing indexing directly. Should we check the offset is allowed for + // that piece? + return &t.pieces[off/t.info.PieceLength] +} + func (t *Torrent) onWriteChunkErr(err error) { if t.userOnWriteChunkErr != nil { go t.userOnWriteChunkErr(err) @@ -3345,3 +3396,7 @@ file: func (t *Torrent) Complete() chansync.ReadOnlyFlag { return &t.complete } + +func (t *Torrent) slogger() *slog.Logger { + return t.logger.Slogger() +}