"time"
"unsafe"
+ "github.com/RoaringBitmap/roaring"
+ g "github.com/anacrolix/generics"
"github.com/anacrolix/generics/heap"
"github.com/anacrolix/log"
"github.com/anacrolix/multiless"
)
func (t *Torrent) requestStrategyPieceOrderState(i int) requestStrategy.PieceRequestOrderState {
+ t.slogger().Debug("requestStrategyPieceOrderState", "pieceIndex", i)
return requestStrategy.PieceRequestOrderState{
Priority: t.piece(i).purePriority(),
Partial: t.piecePartiallyDownloaded(i),
type desiredPeerRequests struct {
requestIndexes []RequestIndex
peer *Peer
- pieceStates []requestStrategy.PieceRequestOrderState
+ pieceStates []g.Option[requestStrategy.PieceRequestOrderState]
}
func (p *desiredPeerRequests) lessByValue(leftRequest, rightRequest RequestIndex) bool {
!p.peer.peerAllowedFast.Contains(rightPieceIndex),
)
}
- leftPiece := &p.pieceStates[leftPieceIndex]
- rightPiece := &p.pieceStates[rightPieceIndex]
+ leftPiece := p.pieceStates[leftPieceIndex].UnwrapPtr()
+ rightPiece := p.pieceStates[rightPieceIndex].UnwrapPtr()
// Putting this first means we can steal requests from lesser-performing peers for our first few
// new requests.
- priority := func() piecePriority {
+ priority := func() PiecePriority {
// Technically we would be happy with the cached priority here, except we don't actually
- // cache it anymore, and Torrent.piecePriority just does another lookup of *Piece to resolve
+ // cache it anymore, and Torrent.PiecePriority just does another lookup of *Piece to resolve
// the priority through Piece.purePriority, which is probably slower.
leftPriority := leftPiece.Priority
rightPriority := rightPiece.Priority
pieceStates: t.requestPieceStates,
requestIndexes: t.requestIndexes,
}
+ clear(requestHeap.pieceStates)
+ t.logPieceRequestOrder()
// Caller-provided allocation for roaring bitmap iteration.
var it typedRoaring.Iterator[RequestIndex]
requestStrategy.GetRequestablePieces(
input,
t.getPieceRequestOrder(),
- func(ih InfoHash, pieceIndex int, pieceExtra requestStrategy.PieceRequestOrderState) {
- if ih != t.infoHash {
- return
+ func(ih InfoHash, pieceIndex int, pieceExtra requestStrategy.PieceRequestOrderState) bool {
+ if ih != *t.canonicalShortInfohash() {
+ return false
}
if !p.peerHasPiece(pieceIndex) {
- return
+ return false
}
- requestHeap.pieceStates[pieceIndex] = pieceExtra
+ requestHeap.pieceStates[pieceIndex].Set(pieceExtra)
allowedFast := p.peerAllowedFast.Contains(pieceIndex)
t.iterUndirtiedRequestIndexesInPiece(&it, pieceIndex, func(r requestStrategy.RequestIndex) {
if !allowedFast {
return
}
}
- if p.requestState.Cancelled.Contains(r) {
+ cancelled := &p.requestState.Cancelled
+ if !cancelled.IsEmpty() && cancelled.Contains(r) {
// Can't re-request while awaiting acknowledgement.
return
}
requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
})
+ return true
},
)
t.assertPendingRequests()
panic(since)
}
}
+ p.logger.Slogger().Debug("updating requests", "reason", p.needRequestUpdate)
pprof.Do(
context.Background(),
- pprof.Labels("update request", p.needRequestUpdate),
+ pprof.Labels("update request", string(p.needRequestUpdate)),
func(_ context.Context) {
next := p.getDesiredRequestState()
p.applyRequestState(next)
- p.t.requestIndexes = next.Requests.requestIndexes[:0]
+ p.t.cacheNextRequestIndexesForReuse(next.Requests.requestIndexes)
},
)
}
+func (t *Torrent) cacheNextRequestIndexesForReuse(slice []RequestIndex) {
+ // The incoming slice can be smaller when getDesiredRequestState short circuits on some
+ // conditions.
+ if cap(slice) > cap(t.requestIndexes) {
+ t.requestIndexes = slice[:0]
+ }
+}
+
+// Whether we should allow sending not interested ("losing interest") to the peer. I noticed
+// qBitTorrent seems to punish us for sending not interested when we're streaming and don't
+// currently need anything.
+func (p *Peer) allowSendNotInterested() bool {
+ // Except for caching, we're not likely to lose pieces very soon.
+ if p.t.haveAllPieces() {
+ return true
+ }
+ all, known := p.peerHasAllPieces()
+ if all || !known {
+ return false
+ }
+ // Allow losing interest if we have all the pieces the peer has.
+ return roaring.AndNot(p.peerPieces(), &p.t._completedPieces).IsEmpty()
+}
+
// Transmit/action the request state to the peer.
func (p *Peer) applyRequestState(next desiredRequestState) {
current := &p.requestState
+ // Make interest sticky
+ if !next.Interested && p.requestState.Interested {
+ if !p.allowSendNotInterested() {
+ next.Interested = true
+ }
+ }
if !p.setInterested(next.Interested) {
return
}
more := true
- requestHeap := heap.InterfaceForSlice(&next.Requests.requestIndexes, next.Requests.lessByValue)
+ orig := next.Requests.requestIndexes
+ requestHeap := heap.InterfaceForSlice(
+ &next.Requests.requestIndexes,
+ next.Requests.lessByValue,
+ )
heap.Init(requestHeap)
t := p.t
break
}
req := heap.Pop(requestHeap)
+ if cap(next.Requests.requestIndexes) != cap(orig) {
+ panic("changed")
+ }
+
+ // don't add requests on reciept of a reject - because this causes request back
+ // to potentially permanently unresponive peers - which just adds network noise. If
+ // the peer can handle more requests it will send an "unchoked" message - which
+ // will cause it to get added back to the request queue
+ if p.needRequestUpdate == peerUpdateRequestsRemoteRejectReason {
+ continue
+ }
+
existing := t.requestingPeer(req)
if existing != nil && existing != p {
+ // don't steal on cancel - because this is triggered by t.cancelRequest below
+ // which means that the cancelled can immediately try to steal back a request
+ // it has lost which can lead to circular cancel/add processing
+ if p.needRequestUpdate == peerUpdateRequestsPeerCancelReason {
+ continue
+ }
+
// Don't steal from the poor.
diff := int64(current.Requests.GetCardinality()) + 1 - (int64(existing.uncancelledRequests()) - 1)
// Steal a request that leaves us with one more request than the existing peer
// connection if the stealer more recently received a chunk.
- if diff > 1 || (diff == 1 && p.lastUsefulChunkReceived.Before(existing.lastUsefulChunkReceived)) {
+ if diff > 1 || (diff == 1 && !p.lastUsefulChunkReceived.After(existing.lastUsefulChunkReceived)) {
continue
}
t.cancelRequest(req)