"time"
"unsafe"
- "github.com/RoaringBitmap/roaring"
"github.com/anacrolix/generics/heap"
"github.com/anacrolix/log"
"github.com/anacrolix/multiless"
rightPiece := &p.pieceStates[rightPieceIndex]
// Putting this first means we can steal requests from lesser-performing peers for our first few
// new requests.
- priority := func() PiecePriority {
+ priority := func() piecePriority {
// Technically we would be happy with the cached priority here, except we don't actually
- // cache it anymore, and Torrent.PiecePriority just does another lookup of *Piece to resolve
+ // cache it anymore, and Torrent.piecePriority just does another lookup of *Piece to resolve
// the priority through Piece.purePriority, which is probably slower.
leftPriority := leftPiece.Priority
rightPriority := rightPiece.Priority
if t.closed.IsSet() {
return
}
- if t.dataDownloadDisallowed.Bool() {
- return
- }
input := t.getRequestStrategyInput()
requestHeap := desiredPeerRequests{
peer: p,
input,
t.getPieceRequestOrder(),
func(ih InfoHash, pieceIndex int, pieceExtra requestStrategy.PieceRequestOrderState) {
- if ih != *t.canonicalShortInfohash() {
+ if ih != t.infoHash {
return
}
if !p.peerHasPiece(pieceIndex) {
return
}
}
- cancelled := &p.requestState.Cancelled
- if !cancelled.IsEmpty() && cancelled.Contains(r) {
+ if p.requestState.Cancelled.Contains(r) {
// Can't re-request while awaiting acknowledgement.
return
}
func(_ context.Context) {
next := p.getDesiredRequestState()
p.applyRequestState(next)
- p.t.cacheNextRequestIndexesForReuse(next.Requests.requestIndexes)
+ p.t.requestIndexes = next.Requests.requestIndexes[:0]
},
)
}
-// cacheNextRequestIndexesForReuse stores the request-index slice produced by
-// getDesiredRequestState back on the Torrent (emptied, capacity retained) so the
-// next request-state computation can reuse its backing array instead of
-// reallocating.
-func (t *Torrent) cacheNextRequestIndexesForReuse(slice []RequestIndex) {
-	// The incoming slice can be smaller when getDesiredRequestState short circuits on some
-	// conditions.
-	// Only adopt the incoming backing array when it is strictly larger than the
-	// one we already hold, so the cached capacity never shrinks.
-	if cap(slice) > cap(t.requestIndexes) {
-		t.requestIndexes = slice[:0]
-	}
-}
-
-// Whether we should allow sending not interested ("losing interest") to the peer. I noticed
-// qBitTorrent seems to punish us for sending not interested when we're streaming and don't
-// currently need anything.
-func (p *Peer) allowSendNotInterested() bool {
-	// Except for caching, we're not likely to lose pieces very soon.
-	if p.t.haveAllPieces() {
-		return true
-	}
-	// If the peer has every piece — or we don't yet know what it has — keep our
-	// interest declared rather than risk losing the ability to request later.
-	all, known := p.peerHasAllPieces()
-	if all || !known {
-		return false
-	}
-	// Allow losing interest if we have all the pieces the peer has.
-	// (AndNot yields the peer's pieces minus our completed ones; empty means the
-	// peer offers nothing we still need.)
-	return roaring.AndNot(p.peerPieces(), &p.t._completedPieces).IsEmpty()
-}
-
// Transmit/action the request state to the peer.
func (p *Peer) applyRequestState(next desiredRequestState) {
current := &p.requestState
- // Make interest sticky
- if !next.Interested && p.requestState.Interested {
- if !p.allowSendNotInterested() {
- next.Interested = true
- }
- }
if !p.setInterested(next.Interested) {
return
}
more := true
- orig := next.Requests.requestIndexes
- requestHeap := heap.InterfaceForSlice(
- &next.Requests.requestIndexes,
- next.Requests.lessByValue,
- )
+ requestHeap := heap.InterfaceForSlice(&next.Requests.requestIndexes, next.Requests.lessByValue)
heap.Init(requestHeap)
t := p.t
originalRequestCount := current.Requests.GetCardinality()
- for {
- if requestHeap.Len() == 0 {
- break
- }
- numPending := maxRequests(current.Requests.GetCardinality() + current.Cancelled.GetCardinality())
- if numPending >= p.nominalMaxRequests() {
- break
- }
+ // We're either here on a timer, or because we ran out of requests. Both are valid reasons to
+ // alter peakRequests.
+ if originalRequestCount != 0 && p.needRequestUpdate != peerUpdateRequestsTimerReason {
+ panic(fmt.Sprintf(
+ "expected zero existing requests (%v) for update reason %q",
+ originalRequestCount, p.needRequestUpdate))
+ }
+ for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()+current.Cancelled.GetCardinality()) < p.nominalMaxRequests() {
req := heap.Pop(requestHeap)
- if cap(next.Requests.requestIndexes) != cap(orig) {
- panic("changed")
- }
existing := t.requestingPeer(req)
if existing != nil && existing != p {
// Don't steal from the poor.