From 67fe9f415595493c586d8e8575082e4515443f69 Mon Sep 17 00:00:00 2001
From: Matt Joiner <anacrolix@gmail.com>
Date: Mon, 28 Apr 2025 11:05:44 +1000
Subject: [PATCH] Update requests when piece order changes if there's a storage
 cap

---
 request-strategy-impls_test.go          |   2 +-
 request-strategy/piece-request-order.go |   3 +-
 requesting.go                           |   3 +
 torrent-piece-request-order.go          |  17 ++--
 torrent.go                              | 113 ++++++++++++++++++------
 5 files changed, 100 insertions(+), 38 deletions(-)

diff --git a/request-strategy-impls_test.go b/request-strategy-impls_test.go
index aab3b34e..daa52217 100644
--- a/request-strategy-impls_test.go
+++ b/request-strategy-impls_test.go
@@ -108,7 +108,7 @@ func BenchmarkRequestStrategy(b *testing.B) {
 	peer.onPeerHasAllPiecesNoTriggers()
 	for i := 0; i < tor.numPieces(); i++ {
 		tor.pieces[i].priority.Raise(PiecePriorityNormal)
-		tor.updatePiecePriorityNoTriggers(i)
+		tor.updatePiecePriorityNoRequests(i)
 	}
 	peer.peerChoking = false
 	//b.StopTimer()
diff --git a/request-strategy/piece-request-order.go b/request-strategy/piece-request-order.go
index 7f271ee3..5f152466 100644
--- a/request-strategy/piece-request-order.go
+++ b/request-strategy/piece-request-order.go
@@ -1,9 +1,10 @@
 package requestStrategy
 
 import (
-	g "github.com/anacrolix/generics"
 	"iter"
 
+	g "github.com/anacrolix/generics"
+
 	"github.com/anacrolix/torrent/metainfo"
 )
 
diff --git a/requesting.go b/requesting.go
index 8b57b21d..7d32012e 100644
--- a/requesting.go
+++ b/requesting.go
@@ -26,6 +26,7 @@ type (
 )
 
 func (t *Torrent) requestStrategyPieceOrderState(i int) requestStrategy.PieceRequestOrderState {
+	t.slogger().Debug("requestStrategyPieceOrderState", "pieceIndex", i)
 	return requestStrategy.PieceRequestOrderState{
 		Priority:     t.piece(i).purePriority(),
 		Partial:      t.piecePartiallyDownloaded(i),
@@ -185,6 +186,7 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
 		requestIndexes: t.requestIndexes,
 	}
 	clear(requestHeap.pieceStates)
+	t.logPieceRequestOrder()
 	// Caller-provided allocation for roaring bitmap iteration.
 	var it typedRoaring.Iterator[RequestIndex]
 	requestStrategy.GetRequestablePieces(
@@ -242,6 +244,7 @@ func (p *Peer) maybeUpdateActualRequestState() {
 			panic(since)
 		}
 	}
+	p.logger.Slogger().Debug("updating requests", "reason", p.needRequestUpdate)
 	pprof.Do(
 		context.Background(),
 		pprof.Labels("update request", string(p.needRequestUpdate)),
diff --git a/torrent-piece-request-order.go b/torrent-piece-request-order.go
index ac2b8cc1..eb4e00d7 100644
--- a/torrent-piece-request-order.go
+++ b/torrent-piece-request-order.go
@@ -7,24 +7,27 @@ import (
 	"github.com/anacrolix/torrent/storage"
 )
 
-func (t *Torrent) updatePieceRequestOrderPiece(pieceIndex int) {
+// It's probably possible to track whether the piece moves around in the btree to be more efficient
+// about triggering request updates.
+func (t *Torrent) updatePieceRequestOrderPiece(pieceIndex int) (changed bool) {
 	if t.storage == nil {
-		return
+		return false
 	}
 	pro, ok := t.cl.pieceRequestOrder[t.clientPieceRequestOrderKey()]
 	if !ok {
-		return
+		return false
 	}
 	key := t.pieceRequestOrderKey(pieceIndex)
 	if t.hasStorageCap() {
-		pro.Update(key, t.requestStrategyPieceOrderState(pieceIndex))
-		return
+		return pro.Update(key, t.requestStrategyPieceOrderState(pieceIndex))
 	}
 	pending := !t.ignorePieceForRequests(pieceIndex)
 	if pending {
-		pro.Add(key, t.requestStrategyPieceOrderState(pieceIndex))
+		newState := t.requestStrategyPieceOrderState(pieceIndex)
+		old := pro.Add(key, newState)
+		return old.Ok && old.Value != newState
 	} else {
-		pro.Delete(key)
+		return pro.Delete(key)
 	}
 }
 
diff --git a/torrent.go b/torrent.go
index 3903e0ec..da83d107 100644
--- a/torrent.go
+++ b/torrent.go
@@ -10,6 +10,7 @@ import (
 	"hash"
 	"io"
 	"iter"
+	"log/slog"
 	"maps"
 	"math/rand"
 	"net/netip"
@@ -180,6 +181,7 @@ type Torrent struct {
 	requestPieceStates []g.Option[request_strategy.PieceRequestOrderState]
 	requestIndexes     []RequestIndex
 
+	// Disable actions after updating piece priorities, for benchmarking.
 	disableTriggers bool
 }
 
@@ -1435,37 +1437,39 @@ func (t *Torrent) maybeNewConns() {
 	t.openNewConns()
 }
 
-func (t *Torrent) onPiecePendingTriggers(piece pieceIndex, reason updateRequestReason) {
-	if t._pendingPieces.Contains(uint32(piece)) {
-		t.iterPeers(func(c *Peer) {
-			// if c.requestState.Interested {
-			// 	return
-			// }
-			if !c.isLowOnRequests() {
-				return
-			}
-			if !c.peerHasPiece(piece) {
-				return
-			}
-			if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
-				return
-			}
-			c.updateRequests(reason)
-		})
+func (t *Torrent) updatePeerRequestsForPiece(piece pieceIndex, reason updateRequestReason) {
+	if !t._pendingPieces.Contains(uint32(piece)) {
+		// Non-pending pieces are usually cancelled more synchronously.
+		return
 	}
+	t.iterPeers(func(c *Peer) {
+		// if c.requestState.Interested {
+		// 	return
+		// }
+		if !c.isLowOnRequests() {
+			return
+		}
+		if !c.peerHasPiece(piece) {
+			return
+		}
+		if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
+			return
+		}
+		c.updateRequests(reason)
+	})
+}
+
+// Side effects of a piece's pending state changing; skipped while benchmarking (disableTriggers).
+func (t *Torrent) onPiecePendingTriggers(piece pieceIndex) {
 	t.maybeNewConns()
 	t.publishPieceStateChange(piece)
 }
 
-func (t *Torrent) updatePiecePriorityNoTriggers(piece pieceIndex) (pendingChanged bool) {
-	if !t.closed.IsSet() {
-		// It would be possible to filter on pure-priority changes here to avoid churning the piece
-		// request order.
-		t.updatePieceRequestOrderPiece(piece)
-	}
+// Pending pieces is a legacy bitmap of pieces we want. Wanting is probably more nuanced than that
+// now, with storage caps and cross-Torrent priorities.
+func (t *Torrent) updatePendingPieces(piece pieceIndex) bool {
 	p := t.piece(piece)
 	newPrio := p.effectivePriority()
-	// t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
 	if newPrio == PiecePriorityNone && p.haveHash() {
 		return t._pendingPieces.CheckedRemove(uint32(piece))
 	} else {
@@ -1473,23 +1477,64 @@ func (t *Torrent) updatePiecePriorityNoTriggers(piece pieceIndex) (pendingChange
 	}
 }
 
+// Maybe return whether peer requests should be updated so reason doesn't have to be passed?
+func (t *Torrent) updatePiecePriorityNoRequests(piece pieceIndex) (updateRequests bool) {
+	// Skip when closed, presumably because the piece request order is removed at close.
+	if !t.closed.IsSet() {
+		// It would be possible to filter on pure-priority changes here to avoid churning the piece
+		// request order. If there's a storage cap then it's possible that pieces are moved around
+		// so that new requests can be issued.
+		updateRequests = t.updatePieceRequestOrderPiece(piece) && t.hasStorageCap()
+	}
+	if t.updatePendingPieces(piece) {
+		if !t.disableTriggers {
+			// This used to happen after updating requests, but I don't think the order matters.
+			t.onPiecePendingTriggers(piece)
+		}
+		// Something was added or removed.
+		updateRequests = true
+	}
+	return
+}
+
 func (t *Torrent) updatePiecePriority(piece pieceIndex, reason updateRequestReason) {
-	if t.updatePiecePriorityNoTriggers(piece) && !t.disableTriggers {
-		t.onPiecePendingTriggers(piece, reason)
+	t.logger.Slogger().Debug("updatePiecePriority", "piece", piece, "reason", reason)
+	if t.updatePiecePriorityNoRequests(piece) && !t.disableTriggers {
+		t.updatePeerRequestsForPiece(piece, reason)
 	}
-	t.updatePieceRequestOrderPiece(piece)
 }
 
 func (t *Torrent) updateAllPiecePriorities(reason updateRequestReason) {
 	t.updatePiecePriorities(0, t.numPieces(), reason)
 }
 
-// Update all piece priorities in one hit. This function should have the same
-// output as updatePiecePriority, but across all pieces.
+// Update all piece priorities in one hit. This function should have the same output as
+// updatePiecePriority, but across all pieces.
 func (t *Torrent) updatePiecePriorities(begin, end pieceIndex, reason updateRequestReason) {
+	t.logger.Slogger().Debug("updating piece priorities", "begin", begin, "end", end)
 	for i := begin; i < end; i++ {
 		t.updatePiecePriority(i, reason)
 	}
+	t.logPieceRequestOrder()
+}
+
+// Helps debug piece priorities for capped storage.
+func (t *Torrent) logPieceRequestOrder() {
+	level := slog.LevelDebug
+	logger := t.slogger()
+	if !logger.Enabled(context.Background(), level) {
+		return
+	}
+	pro := t.getPieceRequestOrder()
+	if pro != nil {
+		for item := range pro.Iter() {
+			t.slogger().Debug(
+				"piece request order item", "infohash",
+				item.Key.InfoHash, "piece",
+				item.Key.Index, "state",
+				item.State)
+		}
+	}
 }
 
 // Returns the range of pieces [begin, end) that contains the extent of bytes.
@@ -2808,6 +2853,12 @@ func (t *Torrent) piece(i int) *Piece {
 	return &t.pieces[i]
 }
 
+func (t *Torrent) pieceForOffset(off int64) *Piece {
+	// Avoid conversion to int by doing indexing directly. Should we check the offset is allowed for
+	// that piece?
+	return &t.pieces[off/t.info.PieceLength]
+}
+
 func (t *Torrent) onWriteChunkErr(err error) {
 	if t.userOnWriteChunkErr != nil {
 		go t.userOnWriteChunkErr(err)
@@ -3345,3 +3396,7 @@ file:
 func (t *Torrent) Complete() chansync.ReadOnlyFlag {
 	return &t.complete
 }
+
+func (t *Torrent) slogger() *slog.Logger {
+	return t.logger.Slogger()
+}
-- 
2.52.0