"container/heap"
"context"
"encoding/gob"
- "math/rand"
"reflect"
"runtime/pprof"
"time"
t := p.peer.t
leftPieceIndex := leftRequest / t.chunksPerRegularPiece()
rightPieceIndex := rightRequest / t.chunksPerRegularPiece()
- leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
- rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
- pending := func(index RequestIndex, current bool) int {
- ret := t.pendingRequests.Get(index)
- if current {
- ret--
- }
- // See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be
- // resolved.
- if ret < 0 {
- panic(ret)
- }
- return ret
- }
ml := multiless.New()
// Push requests that can't be served right now to the end. But we don't throw them away unless
// there's a better alternative. This is for when we're using the fast extension and get choked
!p.peer.peerAllowedFast.Contains(rightPieceIndex),
)
}
- ml = ml.Int(
- pending(leftRequest, leftCurrent),
- pending(rightRequest, rightCurrent))
- ml = ml.Bool(!leftCurrent, !rightCurrent)
+ leftPeer := t.pendingRequests[leftRequest]
+ rightPeer := t.pendingRequests[rightRequest]
+ ml = ml.Bool(rightPeer == p.peer, leftPeer == p.peer)
+ ml = ml.Bool(rightPeer == nil, leftPeer == nil)
+ if ml.Ok() {
+ return ml.MustLess()
+ }
+ if leftPeer != nil {
+ ml = ml.Uint64(
+ rightPeer.actualRequestState.Requests.GetCardinality(),
+ leftPeer.actualRequestState.Requests.GetCardinality(),
+ )
+ }
+ ml = ml.CmpInt64(t.lastRequested[rightRequest].Sub(t.lastRequested[leftRequest]).Nanoseconds())
leftPiece := t.piece(int(leftPieceIndex))
rightPiece := t.piece(int(rightPieceIndex))
ml = ml.Int(
+ // Technically we would be happy with the cached priority here, except we don't actually
+ // cache it anymore, and Torrent.piecePriority just does another lookup of *Piece to resolve
+ // the priority through Piece.purePriority, which is probably slower.
-int(leftPiece.purePriority()),
-int(rightPiece.purePriority()),
)
ml = ml.Int(
int(leftPiece.availability),
int(rightPiece.availability))
- leftLastRequested := p.peer.t.lastRequested[leftRequest]
- rightLastRequested := p.peer.t.lastRequested[rightRequest]
- ml = ml.EagerSameLess(
- leftLastRequested.Equal(rightLastRequested),
- leftLastRequested.Before(rightLastRequested),
- )
- ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
- ml = ml.Uint32(leftRequest, rightRequest)
- return ml.MustLess()
+ return ml.Less()
}
func (p *peerRequests) Swap(i, j int) {
}
type desiredRequestState struct {
- Requests []RequestIndex
+ Requests peerRequests
Interested bool
}
// return
// }
if !allowedFast {
- // We must signal interest to request this
+ // We must signal interest to request this. TODO: We could set interested if the
+ // peers pieces (minus the allowed fast set) overlap with our missing pieces if
+ // there are any readers, or any pending pieces.
desired.Interested = true
// We can make or will allow sustaining a request here if we're not choked, or
// have made the request previously (presumably while unchoked), and haven't had
return
}
}
- // Note that we can still be interested if we filter all requests due to being
- // recently requested from another peer.
- if !p.actualRequestState.Requests.Contains(r) {
- if time.Since(p.t.lastRequested[r]) < time.Second {
- return
- }
- }
requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
})
},
)
p.t.assertPendingRequests()
- heap.Init(&requestHeap)
- for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() {
- requestIndex := heap.Pop(&requestHeap).(RequestIndex)
- desired.Requests = append(desired.Requests, requestIndex)
- }
+ desired.Requests = requestHeap
return
}
return false
}
more := true
- cancel := current.Requests.Clone()
- for _, ri := range next.Requests {
- cancel.Remove(ri)
- }
- cancel.Iterate(func(req uint32) bool {
- more = p.cancel(req)
- return more
- })
- if !more {
- return false
- }
- shuffled := false
- lastPending := 0
- for i := 0; i < len(next.Requests); i++ {
- req := next.Requests[i]
+ requestHeap := &next.Requests
+ heap.Init(requestHeap)
+ for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()) < p.nominalMaxRequests() {
+ req := heap.Pop(requestHeap).(RequestIndex)
if p.cancelledRequests.Contains(req) {
// Waiting for a reject or piece message, which will suitably trigger us to update our
// requests, so we can skip this one with no additional consideration.
continue
}
- // The cardinality of our desired requests shouldn't exceed the max requests since it's used
- // in the calculation of the requests. However, if we cancelled requests and they haven't
- // been rejected or serviced yet with the fast extension enabled, we can end up with more
- // extra outstanding requests. We could subtract the number of outstanding cancels from the
- // next request cardinality, but peers might not like that.
- if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
- // log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
- // next.Requests.GetCardinality(),
- // p.cancelledRequests.GetCardinality(),
- // current.Requests.GetCardinality(),
- // p.nominalMaxRequests(),
- // )
- break
- }
- otherPending := p.t.pendingRequests.Get(next.Requests[0])
- if p.actualRequestState.Requests.Contains(next.Requests[0]) {
- otherPending--
+ existing := p.t.pendingRequests[req]
+ if existing != nil && existing != p && existing.actualRequestState.Requests.GetCardinality()-existing.cancelledRequests.GetCardinality() > current.Requests.GetCardinality() {
+ existing.cancel(req)
}
- if otherPending < lastPending {
- // Pending should only rise. It's supposed to be the strongest ordering criteria. If it
- // doesn't, our shuffling condition could be wrong.
- panic(lastPending)
- }
- // If the request has already been requested by another peer, shuffle this and the rest of
- // the requests (since according to the increasing condition, the rest of the indices
- // already have an outstanding request with another peer).
- if !shuffled && otherPending > 0 {
- shuffleReqs := next.Requests[i:]
- rand.Shuffle(len(shuffleReqs), func(i, j int) {
- shuffleReqs[i], shuffleReqs[j] = shuffleReqs[j], shuffleReqs[i]
- })
- // log.Printf("shuffled reqs [%v:%v]", i, len(next.Requests))
- shuffled = true
- // Repeat this index
- i--
- continue
- }
-
more = p.mustRequest(req)
if !more {
break