Sergey Matveev's repositories - btrtrc.git/commitdiff
Update requests when piece order changes if there's a storage cap
author: Matt Joiner <anacrolix@gmail.com>
Mon, 28 Apr 2025 01:05:44 +0000 (11:05 +1000)
committer: Matt Joiner <anacrolix@gmail.com>
Mon, 28 Apr 2025 01:13:20 +0000 (11:13 +1000)
request-strategy-impls_test.go
request-strategy/piece-request-order.go
requesting.go
torrent-piece-request-order.go
torrent.go

index aab3b34ef2afc5ca38e08e002849dc62219f8192..daa5221754f1658226b0cb55aae30a551a26a64f 100644 (file)
@@ -108,7 +108,7 @@ func BenchmarkRequestStrategy(b *testing.B) {
        peer.onPeerHasAllPiecesNoTriggers()
        for i := 0; i < tor.numPieces(); i++ {
                tor.pieces[i].priority.Raise(PiecePriorityNormal)
-               tor.updatePiecePriorityNoTriggers(i)
+               tor.updatePiecePriorityNoRequests(i)
        }
        peer.peerChoking = false
        //b.StopTimer()
index 7f271ee399d1b33e69a7abc5994e62ba74aefd72..5f15246643201b14220ac272a8f3d2809642ba54 100644 (file)
@@ -1,9 +1,10 @@
 package requestStrategy
 
 import (
-       g "github.com/anacrolix/generics"
        "iter"
 
+       g "github.com/anacrolix/generics"
+
        "github.com/anacrolix/torrent/metainfo"
 )
 
index 8b57b21d9dd0733a35420dbdfd762042ce3c5908..7d32012edd231d12ee20aec20e7b8df58ef0f24d 100644 (file)
@@ -26,6 +26,7 @@ type (
 )
 
 func (t *Torrent) requestStrategyPieceOrderState(i int) requestStrategy.PieceRequestOrderState {
+       t.slogger().Debug("requestStrategyPieceOrderState", "pieceIndex", i)
        return requestStrategy.PieceRequestOrderState{
                Priority:     t.piece(i).purePriority(),
                Partial:      t.piecePartiallyDownloaded(i),
@@ -185,6 +186,7 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
                requestIndexes: t.requestIndexes,
        }
        clear(requestHeap.pieceStates)
+       t.logPieceRequestOrder()
        // Caller-provided allocation for roaring bitmap iteration.
        var it typedRoaring.Iterator[RequestIndex]
        requestStrategy.GetRequestablePieces(
@@ -242,6 +244,7 @@ func (p *Peer) maybeUpdateActualRequestState() {
                        panic(since)
                }
        }
+       p.logger.Slogger().Debug("updating requests", "reason", p.needRequestUpdate)
        pprof.Do(
                context.Background(),
                pprof.Labels("update request", string(p.needRequestUpdate)),
index ac2b8cc1b4953253495650649ebcd01147388173..eb4e00d70fead290850ed93d67ee4ad600947eb5 100644 (file)
@@ -7,24 +7,27 @@ import (
        "github.com/anacrolix/torrent/storage"
 )
 
-func (t *Torrent) updatePieceRequestOrderPiece(pieceIndex int) {
+// It's probably possible to track whether the piece moves around in the btree to be more efficient
+// about triggering request updates.
+func (t *Torrent) updatePieceRequestOrderPiece(pieceIndex int) (changed bool) {
        if t.storage == nil {
-               return
+               return false
        }
        pro, ok := t.cl.pieceRequestOrder[t.clientPieceRequestOrderKey()]
        if !ok {
-               return
+               return false
        }
        key := t.pieceRequestOrderKey(pieceIndex)
        if t.hasStorageCap() {
-               pro.Update(key, t.requestStrategyPieceOrderState(pieceIndex))
-               return
+               return pro.Update(key, t.requestStrategyPieceOrderState(pieceIndex))
        }
        pending := !t.ignorePieceForRequests(pieceIndex)
        if pending {
-               pro.Add(key, t.requestStrategyPieceOrderState(pieceIndex))
+               newState := t.requestStrategyPieceOrderState(pieceIndex)
+               old := pro.Add(key, newState)
+               return old.Ok && old.Value != newState
        } else {
-               pro.Delete(key)
+               return pro.Delete(key)
        }
 }
 
index 3903e0ec201cd07f8f0ee21a9a251dd5185f570d..da83d1077351f44e70de4684522ee463444c8a75 100644 (file)
@@ -10,6 +10,7 @@ import (
        "hash"
        "io"
        "iter"
+       "log/slog"
        "maps"
        "math/rand"
        "net/netip"
@@ -180,6 +181,7 @@ type Torrent struct {
        requestPieceStates []g.Option[request_strategy.PieceRequestOrderState]
        requestIndexes     []RequestIndex
 
+       // Disable actions after updating piece priorities, for benchmarking.
        disableTriggers bool
 }
 
@@ -1435,37 +1437,39 @@ func (t *Torrent) maybeNewConns() {
        t.openNewConns()
 }
 
-func (t *Torrent) onPiecePendingTriggers(piece pieceIndex, reason updateRequestReason) {
-       if t._pendingPieces.Contains(uint32(piece)) {
-               t.iterPeers(func(c *Peer) {
-                       // if c.requestState.Interested {
-                       //      return
-                       // }
-                       if !c.isLowOnRequests() {
-                               return
-                       }
-                       if !c.peerHasPiece(piece) {
-                               return
-                       }
-                       if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
-                               return
-                       }
-                       c.updateRequests(reason)
-               })
+func (t *Torrent) updatePeerRequestsForPiece(piece pieceIndex, reason updateRequestReason) {
+       if !t._pendingPieces.Contains(uint32(piece)) {
+               // Non-pending pieces are usually cancelled more synchronously.
+               return
        }
+       t.iterPeers(func(c *Peer) {
+               // if c.requestState.Interested {
+               //      return
+               // }
+               if !c.isLowOnRequests() {
+                       return
+               }
+               if !c.peerHasPiece(piece) {
+                       return
+               }
+               if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
+                       return
+               }
+               c.updateRequests(reason)
+       })
+}
+
+// Stuff we don't want to run when the pending pieces change while benchmarking.
+func (t *Torrent) onPiecePendingTriggers(piece pieceIndex) {
        t.maybeNewConns()
        t.publishPieceStateChange(piece)
 }
 
-func (t *Torrent) updatePiecePriorityNoTriggers(piece pieceIndex) (pendingChanged bool) {
-       if !t.closed.IsSet() {
-               // It would be possible to filter on pure-priority changes here to avoid churning the piece
-               // request order.
-               t.updatePieceRequestOrderPiece(piece)
-       }
+// Pending pieces is an old bitmap of stuff we want. I think it's more nuanced than that now with
+// storage caps and cross-Torrent priorities.
+func (t *Torrent) updatePendingPieces(piece pieceIndex) bool {
        p := t.piece(piece)
        newPrio := p.effectivePriority()
-       // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
        if newPrio == PiecePriorityNone && p.haveHash() {
                return t._pendingPieces.CheckedRemove(uint32(piece))
        } else {
@@ -1473,23 +1477,64 @@ func (t *Torrent) updatePiecePriorityNoTriggers(piece pieceIndex) (pendingChange
        }
 }
 
+// Maybe return whether peer requests should be updated so reason doesn't have to be passed?
+func (t *Torrent) updatePiecePriorityNoRequests(piece pieceIndex) (updateRequests bool) {
+       // I think because the piece request order gets removed at close.
+       if !t.closed.IsSet() {
+               // It would be possible to filter on pure-priority changes here to avoid churning the piece
+               // request order. If there's a storage cap then it's possible that pieces are moved around
+               // so that new requests can be issued.
+               updateRequests = t.updatePieceRequestOrderPiece(piece) && t.hasStorageCap()
+       }
+       if t.updatePendingPieces(piece) {
+               if !t.disableTriggers {
+                       // This used to happen after updating requests, but I don't think the order matters.
+                       t.onPiecePendingTriggers(piece)
+               }
+               // Something was added or removed.
+               updateRequests = true
+       }
+       return
+}
+
 func (t *Torrent) updatePiecePriority(piece pieceIndex, reason updateRequestReason) {
-       if t.updatePiecePriorityNoTriggers(piece) && !t.disableTriggers {
-               t.onPiecePendingTriggers(piece, reason)
+       t.logger.Slogger().Debug("updatePiecePriority", "piece", piece, "reason", reason)
+       if t.updatePiecePriorityNoRequests(piece) && !t.disableTriggers {
+               t.updatePeerRequestsForPiece(piece, reason)
        }
-       t.updatePieceRequestOrderPiece(piece)
 }
 
 func (t *Torrent) updateAllPiecePriorities(reason updateRequestReason) {
        t.updatePiecePriorities(0, t.numPieces(), reason)
 }
 
-// Update all piece priorities in one hit. This function should have the same
-// output as updatePiecePriority, but across all pieces.
+// Update all piece priorities in one hit. This function should have the same output as
+// updatePiecePriority, but across all pieces.
 func (t *Torrent) updatePiecePriorities(begin, end pieceIndex, reason updateRequestReason) {
+       t.logger.Slogger().Debug("updating piece priorities", "begin", begin, "end", end)
        for i := begin; i < end; i++ {
                t.updatePiecePriority(i, reason)
        }
+       t.logPieceRequestOrder()
+}
+
+// Helps debug piece priorities for capped storage.
+func (t *Torrent) logPieceRequestOrder() {
+       level := slog.LevelDebug
+       logger := t.slogger()
+       if !logger.Enabled(context.Background(), level) {
+               return
+       }
+       pro := t.getPieceRequestOrder()
+       if pro != nil {
+               for item := range pro.Iter() {
+                       t.slogger().Debug(
+                               "piece request order item", "infohash",
+                               item.Key.InfoHash, "piece",
+                               item.Key.Index, "state",
+                               item.State)
+               }
+       }
 }
 
 // Returns the range of pieces [begin, end) that contains the extent of bytes.
@@ -2808,6 +2853,12 @@ func (t *Torrent) piece(i int) *Piece {
        return &t.pieces[i]
 }
 
+func (t *Torrent) pieceForOffset(off int64) *Piece {
+       // Avoid conversion to int by doing indexing directly. Should we check the offset is allowed for
+       // that piece?
+       return &t.pieces[off/t.info.PieceLength]
+}
+
 func (t *Torrent) onWriteChunkErr(err error) {
        if t.userOnWriteChunkErr != nil {
                go t.userOnWriteChunkErr(err)
@@ -3345,3 +3396,7 @@ file:
 func (t *Torrent) Complete() chansync.ReadOnlyFlag {
        return &t.complete
 }
+
+func (t *Torrent) slogger() *slog.Logger {
+       return t.logger.Slogger()
+}