"unsafe"
"github.com/RoaringBitmap/roaring"
+ g "github.com/anacrolix/generics"
"github.com/anacrolix/generics/heap"
"github.com/anacrolix/log"
"github.com/anacrolix/multiless"
)
func (t *Torrent) requestStrategyPieceOrderState(i int) requestStrategy.PieceRequestOrderState {
+ t.slogger().Debug("requestStrategyPieceOrderState", "pieceIndex", i)
return requestStrategy.PieceRequestOrderState{
Priority: t.piece(i).purePriority(),
Partial: t.piecePartiallyDownloaded(i),
type desiredPeerRequests struct {
requestIndexes []RequestIndex
peer *Peer
- pieceStates []requestStrategy.PieceRequestOrderState
+ pieceStates []g.Option[requestStrategy.PieceRequestOrderState]
}
func (p *desiredPeerRequests) lessByValue(leftRequest, rightRequest RequestIndex) bool {
!p.peer.peerAllowedFast.Contains(rightPieceIndex),
)
}
- leftPiece := &p.pieceStates[leftPieceIndex]
- rightPiece := &p.pieceStates[rightPieceIndex]
+ leftPiece := p.pieceStates[leftPieceIndex].UnwrapPtr()
+ rightPiece := p.pieceStates[rightPieceIndex].UnwrapPtr()
// Putting this first means we can steal requests from lesser-performing peers for our first few
// new requests.
priority := func() PiecePriority {
pieceStates: t.requestPieceStates,
requestIndexes: t.requestIndexes,
}
+ clear(requestHeap.pieceStates)
+ t.logPieceRequestOrder()
// Caller-provided allocation for roaring bitmap iteration.
var it typedRoaring.Iterator[RequestIndex]
requestStrategy.GetRequestablePieces(
input,
t.getPieceRequestOrder(),
- func(ih InfoHash, pieceIndex int, pieceExtra requestStrategy.PieceRequestOrderState) {
+ func(ih InfoHash, pieceIndex int, pieceExtra requestStrategy.PieceRequestOrderState) bool {
if ih != *t.canonicalShortInfohash() {
- return
+ return false
}
if !p.peerHasPiece(pieceIndex) {
- return
+ return false
}
- requestHeap.pieceStates[pieceIndex] = pieceExtra
+ requestHeap.pieceStates[pieceIndex].Set(pieceExtra)
allowedFast := p.peerAllowedFast.Contains(pieceIndex)
t.iterUndirtiedRequestIndexesInPiece(&it, pieceIndex, func(r requestStrategy.RequestIndex) {
if !allowedFast {
}
requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
})
+ return true
},
)
t.assertPendingRequests()
panic(since)
}
}
+ p.logger.Slogger().Debug("updating requests", "reason", p.needRequestUpdate)
pprof.Do(
context.Background(),
- pprof.Labels("update request", p.needRequestUpdate),
+ pprof.Labels("update request", string(p.needRequestUpdate)),
func(_ context.Context) {
next := p.getDesiredRequestState()
p.applyRequestState(next)
if cap(next.Requests.requestIndexes) != cap(orig) {
panic("changed")
}
+
+ // don't add requests on reciept of a reject - because this causes request back
+ // to potentially permanently unresponive peers - which just adds network noise. If
+ // the peer can handle more requests it will send an "unchoked" message - which
+ // will cause it to get added back to the request queue
+ if p.needRequestUpdate == peerUpdateRequestsRemoteRejectReason {
+ continue
+ }
+
existing := t.requestingPeer(req)
if existing != nil && existing != p {
+ // don't steal on cancel - because this is triggered by t.cancelRequest below
+ // which means that the cancelled can immediately try to steal back a request
+ // it has lost which can lead to circular cancel/add processing
+ if p.needRequestUpdate == peerUpdateRequestsPeerCancelReason {
+ continue
+ }
+
// Don't steal from the poor.
diff := int64(current.Requests.GetCardinality()) + 1 - (int64(existing.uncancelledRequests()) - 1)
// Steal a request that leaves us with one more request than the existing peer
// connection if the stealer more recently received a chunk.
- if diff > 1 || (diff == 1 && p.lastUsefulChunkReceived.Before(existing.lastUsefulChunkReceived)) {
+ if diff > 1 || (diff == 1 && !p.lastUsefulChunkReceived.After(existing.lastUsefulChunkReceived)) {
continue
}
t.cancelRequest(req)