"hash"
"io"
"iter"
+ "log/slog"
"maps"
"math/rand"
"net/netip"
requestPieceStates []g.Option[request_strategy.PieceRequestOrderState]
requestIndexes []RequestIndex
+ // Disable actions after updating piece priorities, for benchmarking.
disableTriggers bool
}
t.openNewConns()
}
-func (t *Torrent) onPiecePendingTriggers(piece pieceIndex, reason updateRequestReason) {
- if t._pendingPieces.Contains(uint32(piece)) {
- t.iterPeers(func(c *Peer) {
- // if c.requestState.Interested {
- // return
- // }
- if !c.isLowOnRequests() {
- return
- }
- if !c.peerHasPiece(piece) {
- return
- }
- if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
- return
- }
- c.updateRequests(reason)
- })
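+// Have peers that could usefully request this piece update their requests, if the piece is pending.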
+func (t *Torrent) updatePeerRequestsForPiece(piece pieceIndex, reason updateRequestReason) {
+ if !t._pendingPieces.Contains(uint32(piece)) {
+ // Requests for non-pending pieces are usually cancelled more synchronously.
+ return
}
+ t.iterPeers(func(c *Peer) {
+ // if c.requestState.Interested {
+ // return
+ // }
+ if !c.isLowOnRequests() {
+ return
+ }
+ if !c.peerHasPiece(piece) {
+ return
+ }
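+ // If we're already interested but choked, and the piece isn't allowed fast, no new requests can go out yet.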
+ if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
+ return
+ }
+ c.updateRequests(reason)
+ })
+}
+
+// Triggers we don't want to run when the pending pieces change while benchmarking (see disableTriggers).
+func (t *Torrent) onPiecePendingTriggers(piece pieceIndex) {
t.maybeNewConns()
t.publishPieceStateChange(piece)
}
-func (t *Torrent) updatePiecePriorityNoTriggers(piece pieceIndex) (pendingChanged bool) {
- if !t.closed.IsSet() {
- // It would be possible to filter on pure-priority changes here to avoid churning the piece
- // request order.
- t.updatePieceRequestOrderPiece(piece)
- }
+// Pending pieces is an old bitmap of stuff we want. I think it's more nuanced than that now with
+// storage caps and cross-Torrent priorities.
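+// Returns whether the piece's membership in the pending set changed.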
+func (t *Torrent) updatePendingPieces(piece pieceIndex) bool {
p := t.piece(piece)
newPrio := p.effectivePriority()
- // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
if newPrio == PiecePriorityNone && p.haveHash() {
return t._pendingPieces.CheckedRemove(uint32(piece))
} else {
return t._pendingPieces.CheckedAdd(uint32(piece))
}
}
+// Maybe return whether peer requests should be updated so reason doesn't have to be passed?
+func (t *Torrent) updatePiecePriorityNoRequests(piece pieceIndex) (updateRequests bool) {
+ // Skip when closed, I think because the piece request order gets removed at close.
+ if !t.closed.IsSet() {
+ // It would be possible to filter on pure-priority changes here to avoid churning the piece
+ // request order. If there's a storage cap then it's possible that pieces are moved around
+ // so that new requests can be issued.
+ updateRequests = t.updatePieceRequestOrderPiece(piece) && t.hasStorageCap()
+ }
+ if t.updatePendingPieces(piece) {
+ if !t.disableTriggers {
+ // This used to happen after updating requests, but I don't think the order matters.
+ t.onPiecePendingTriggers(piece)
+ }
+ // The piece was added to or removed from the pending set.
+ updateRequests = true
+ }
+ return
+}
+
func (t *Torrent) updatePiecePriority(piece pieceIndex, reason updateRequestReason) {
- if t.updatePiecePriorityNoTriggers(piece) && !t.disableTriggers {
- t.onPiecePendingTriggers(piece, reason)
+ t.logger.Slogger().Debug("updatePiecePriority", "piece", piece, "reason", reason)
+ if t.updatePiecePriorityNoRequests(piece) && !t.disableTriggers {
+ t.updatePeerRequestsForPiece(piece, reason)
}
- t.updatePieceRequestOrderPiece(piece)
}
func (t *Torrent) updateAllPiecePriorities(reason updateRequestReason) {
t.updatePiecePriorities(0, t.numPieces(), reason)
}
-// Update all piece priorities in one hit. This function should have the same
-// output as updatePiecePriority, but across all pieces.
+// Update all piece priorities in one hit. This function should have the same output as
+// updatePiecePriority, but across all pieces.
func (t *Torrent) updatePiecePriorities(begin, end pieceIndex, reason updateRequestReason) {
+ t.logger.Slogger().Debug("updating piece priorities", "begin", begin, "end", end)
for i := begin; i < end; i++ {
t.updatePiecePriority(i, reason)
}
+ t.logPieceRequestOrder()
+}
+
+// Helps debug piece priorities for capped storage.
+func (t *Torrent) logPieceRequestOrder() {
+ level := slog.LevelDebug
+ logger := t.slogger()
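+ // Skip iterating the piece request order unless debug logging is enabled.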
+ if !logger.Enabled(context.Background(), level) {
+ return
+ }
+ pro := t.getPieceRequestOrder()
+ if pro != nil {
+ for item := range pro.Iter() {
+ logger.Debug(
+ "piece request order item",
+ "infohash", item.Key.InfoHash,
+ "piece", item.Key.Index,
+ "state", item.State)
+ }
+ }
}
// Returns the range of pieces [begin, end) that contains the extent of bytes.
return &t.pieces[i]
}
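+// Returns the piece containing the byte at the given torrent-relative offset.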
+func (t *Torrent) pieceForOffset(off int64) *Piece {
+ // Avoid conversion to int by indexing directly with the int64 quotient. Should we check the
+ // offset is valid for that piece?
+ return &t.pieces[off/t.info.PieceLength]
+}
+
func (t *Torrent) onWriteChunkErr(err error) {
if t.userOnWriteChunkErr != nil {
go t.userOnWriteChunkErr(err)
func (t *Torrent) Complete() chansync.ReadOnlyFlag {
return &t.complete
}
+
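+// Shorthand for the Torrent's structured logger.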
+func (t *Torrent) slogger() *slog.Logger {
+ return t.logger.Slogger()
+}