From 67fe9f415595493c586d8e8575082e4515443f69 Mon Sep 17 00:00:00 2001
From: Matt Joiner <anacrolix@gmail.com>
Date: Mon, 28 Apr 2025 11:05:44 +1000
Subject: [PATCH] Update requests when piece order changes if there's a storage
cap
---
request-strategy-impls_test.go | 2 +-
request-strategy/piece-request-order.go | 3 +-
requesting.go | 3 +
torrent-piece-request-order.go | 17 ++--
torrent.go | 113 ++++++++++++++++++------
5 files changed, 100 insertions(+), 38 deletions(-)
diff --git a/request-strategy-impls_test.go b/request-strategy-impls_test.go
index aab3b34e..daa52217 100644
--- a/request-strategy-impls_test.go
+++ b/request-strategy-impls_test.go
@@ -108,7 +108,7 @@ func BenchmarkRequestStrategy(b *testing.B) {
peer.onPeerHasAllPiecesNoTriggers()
for i := 0; i < tor.numPieces(); i++ {
tor.pieces[i].priority.Raise(PiecePriorityNormal)
- tor.updatePiecePriorityNoTriggers(i)
+ tor.updatePiecePriorityNoRequests(i)
}
peer.peerChoking = false
//b.StopTimer()
diff --git a/request-strategy/piece-request-order.go b/request-strategy/piece-request-order.go
index 7f271ee3..5f152466 100644
--- a/request-strategy/piece-request-order.go
+++ b/request-strategy/piece-request-order.go
@@ -1,9 +1,10 @@
package requestStrategy
import (
- g "github.com/anacrolix/generics"
"iter"
+ g "github.com/anacrolix/generics"
+
"github.com/anacrolix/torrent/metainfo"
)
diff --git a/requesting.go b/requesting.go
index 8b57b21d..7d32012e 100644
--- a/requesting.go
+++ b/requesting.go
@@ -26,6 +26,7 @@ type (
)
func (t *Torrent) requestStrategyPieceOrderState(i int) requestStrategy.PieceRequestOrderState {
+ t.slogger().Debug("requestStrategyPieceOrderState", "pieceIndex", i)
return requestStrategy.PieceRequestOrderState{
Priority: t.piece(i).purePriority(),
Partial: t.piecePartiallyDownloaded(i),
@@ -185,6 +186,7 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
requestIndexes: t.requestIndexes,
}
clear(requestHeap.pieceStates)
+ t.logPieceRequestOrder()
// Caller-provided allocation for roaring bitmap iteration.
var it typedRoaring.Iterator[RequestIndex]
requestStrategy.GetRequestablePieces(
@@ -242,6 +244,7 @@ func (p *Peer) maybeUpdateActualRequestState() {
panic(since)
}
}
+ p.logger.Slogger().Debug("updating requests", "reason", p.needRequestUpdate)
pprof.Do(
context.Background(),
pprof.Labels("update request", string(p.needRequestUpdate)),
diff --git a/torrent-piece-request-order.go b/torrent-piece-request-order.go
index ac2b8cc1..eb4e00d7 100644
--- a/torrent-piece-request-order.go
+++ b/torrent-piece-request-order.go
@@ -7,24 +7,27 @@ import (
"github.com/anacrolix/torrent/storage"
)
-func (t *Torrent) updatePieceRequestOrderPiece(pieceIndex int) {
+// It's probably possible to track whether the piece moves around in the btree to be more efficient
+// about triggering request updates.
+func (t *Torrent) updatePieceRequestOrderPiece(pieceIndex int) (changed bool) {
if t.storage == nil {
- return
+ return false
}
pro, ok := t.cl.pieceRequestOrder[t.clientPieceRequestOrderKey()]
if !ok {
- return
+ return false
}
key := t.pieceRequestOrderKey(pieceIndex)
if t.hasStorageCap() {
- pro.Update(key, t.requestStrategyPieceOrderState(pieceIndex))
- return
+ return pro.Update(key, t.requestStrategyPieceOrderState(pieceIndex))
}
pending := !t.ignorePieceForRequests(pieceIndex)
if pending {
- pro.Add(key, t.requestStrategyPieceOrderState(pieceIndex))
+ newState := t.requestStrategyPieceOrderState(pieceIndex)
+ old := pro.Add(key, newState)
+ return old.Ok && old.Value != newState
} else {
- pro.Delete(key)
+ return pro.Delete(key)
}
}
diff --git a/torrent.go b/torrent.go
index 3903e0ec..da83d107 100644
--- a/torrent.go
+++ b/torrent.go
@@ -10,6 +10,7 @@ import (
"hash"
"io"
"iter"
+ "log/slog"
"maps"
"math/rand"
"net/netip"
@@ -180,6 +181,7 @@ type Torrent struct {
requestPieceStates []g.Option[request_strategy.PieceRequestOrderState]
requestIndexes []RequestIndex
+ // Disable actions after updating piece priorities, for benchmarking.
disableTriggers bool
}
@@ -1435,37 +1437,39 @@ func (t *Torrent) maybeNewConns() {
t.openNewConns()
}
-func (t *Torrent) onPiecePendingTriggers(piece pieceIndex, reason updateRequestReason) {
- if t._pendingPieces.Contains(uint32(piece)) {
- t.iterPeers(func(c *Peer) {
- // if c.requestState.Interested {
- // return
- // }
- if !c.isLowOnRequests() {
- return
- }
- if !c.peerHasPiece(piece) {
- return
- }
- if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
- return
- }
- c.updateRequests(reason)
- })
+func (t *Torrent) updatePeerRequestsForPiece(piece pieceIndex, reason updateRequestReason) {
+ if !t._pendingPieces.Contains(uint32(piece)) {
+ // Non-pending pieces are usually cancelled more synchronously.
+ return
}
+ t.iterPeers(func(c *Peer) {
+ // if c.requestState.Interested {
+ // return
+ // }
+ if !c.isLowOnRequests() {
+ return
+ }
+ if !c.peerHasPiece(piece) {
+ return
+ }
+ if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
+ return
+ }
+ c.updateRequests(reason)
+ })
+}
+
+// Stuff we don't want to run when the pending pieces change while benchmarking.
+func (t *Torrent) onPiecePendingTriggers(piece pieceIndex) {
t.maybeNewConns()
t.publishPieceStateChange(piece)
}
-func (t *Torrent) updatePiecePriorityNoTriggers(piece pieceIndex) (pendingChanged bool) {
- if !t.closed.IsSet() {
- // It would be possible to filter on pure-priority changes here to avoid churning the piece
- // request order.
- t.updatePieceRequestOrderPiece(piece)
- }
+// Pending pieces is an old bitmap of stuff we want. I think it's more nuanced than that now with
+// storage caps and cross-Torrent priorities.
+func (t *Torrent) updatePendingPieces(piece pieceIndex) bool {
p := t.piece(piece)
newPrio := p.effectivePriority()
- // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
if newPrio == PiecePriorityNone && p.haveHash() {
return t._pendingPieces.CheckedRemove(uint32(piece))
} else {
@@ -1473,23 +1477,64 @@ func (t *Torrent) updatePiecePriorityNoTriggers(piece pieceIndex) (pendingChange
}
}
+// Maybe return whether peer requests should be updated so reason doesn't have to be passed?
+func (t *Torrent) updatePiecePriorityNoRequests(piece pieceIndex) (updateRequests bool) {
+ // I think because the piece request order gets removed at close.
+ if !t.closed.IsSet() {
+ // It would be possible to filter on pure-priority changes here to avoid churning the piece
+ // request order. If there's a storage cap then it's possible that pieces are moved around
+ // so that new requests can be issued.
+ updateRequests = t.updatePieceRequestOrderPiece(piece) && t.hasStorageCap()
+ }
+ if t.updatePendingPieces(piece) {
+ if !t.disableTriggers {
+ // This used to happen after updating requests, but I don't think the order matters.
+ t.onPiecePendingTriggers(piece)
+ }
+ // Something was added or removed.
+ updateRequests = true
+ }
+ return
+}
+
func (t *Torrent) updatePiecePriority(piece pieceIndex, reason updateRequestReason) {
- if t.updatePiecePriorityNoTriggers(piece) && !t.disableTriggers {
- t.onPiecePendingTriggers(piece, reason)
+ t.logger.Slogger().Debug("updatePiecePriority", "piece", piece, "reason", reason)
+ if t.updatePiecePriorityNoRequests(piece) && !t.disableTriggers {
+ t.updatePeerRequestsForPiece(piece, reason)
}
- t.updatePieceRequestOrderPiece(piece)
}
func (t *Torrent) updateAllPiecePriorities(reason updateRequestReason) {
t.updatePiecePriorities(0, t.numPieces(), reason)
}
-// Update all piece priorities in one hit. This function should have the same
-// output as updatePiecePriority, but across all pieces.
+// Update all piece priorities in one hit. This function should have the same output as
+// updatePiecePriority, but across all pieces.
func (t *Torrent) updatePiecePriorities(begin, end pieceIndex, reason updateRequestReason) {
+ t.logger.Slogger().Debug("updating piece priorities", "begin", begin, "end", end)
for i := begin; i < end; i++ {
t.updatePiecePriority(i, reason)
}
+ t.logPieceRequestOrder()
+}
+
+// Helps debug piece priorities for capped storage.
+func (t *Torrent) logPieceRequestOrder() {
+ level := slog.LevelDebug
+ logger := t.slogger()
+ if !logger.Enabled(context.Background(), level) {
+ return
+ }
+ pro := t.getPieceRequestOrder()
+ if pro != nil {
+ for item := range pro.Iter() {
+ t.slogger().Debug(
+ "piece request order item", "infohash",
+ item.Key.InfoHash, "piece",
+ item.Key.Index, "state",
+ item.State)
+ }
+ }
}
// Returns the range of pieces [begin, end) that contains the extent of bytes.
@@ -2808,6 +2853,12 @@ func (t *Torrent) piece(i int) *Piece {
return &t.pieces[i]
}
+func (t *Torrent) pieceForOffset(off int64) *Piece {
+ // Avoid conversion to int by doing indexing directly. Should we check the offset is allowed for
+ // that piece?
+ return &t.pieces[off/t.info.PieceLength]
+}
+
func (t *Torrent) onWriteChunkErr(err error) {
if t.userOnWriteChunkErr != nil {
go t.userOnWriteChunkErr(err)
@@ -3345,3 +3396,7 @@ file:
func (t *Torrent) Complete() chansync.ReadOnlyFlag {
return &t.complete
}
+
+func (t *Torrent) slogger() *slog.Logger {
+ return t.logger.Slogger()
+}
--
2.52.0