- shuffled := false
- lastPending := 0
- for i := 0; i < len(next.Requests); i++ {
- req := next.Requests[i]
- if p.cancelledRequests.Contains(req) {
- // Waiting for a reject or piece message, which will suitably trigger us to update our
- // requests, so we can skip this one with no additional consideration.
- continue
- }
- // The cardinality of our desired requests shouldn't exceed the max requests since it's used
- // in the calculation of the requests. However, if we cancelled requests and they haven't
- // been rejected or serviced yet with the fast extension enabled, we can end up with more
- // extra outstanding requests. We could subtract the number of outstanding cancels from the
- // next request cardinality, but peers might not like that.
- if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
- // log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
- // next.Requests.GetCardinality(),
- // p.cancelledRequests.GetCardinality(),
- // current.Requests.GetCardinality(),
- // p.nominalMaxRequests(),
- // )
- break
- }
- otherPending := p.t.pendingRequests.Get(next.Requests[0])
- if p.actualRequestState.Requests.Contains(next.Requests[0]) {
- otherPending--
- }
- if otherPending < lastPending {
- // Pending should only rise. It's supposed to be the strongest ordering criteria. If it
- // doesn't, our shuffling condition could be wrong.
- panic(lastPending)
- }
- // If the request has already been requested by another peer, shuffle this and the rest of
- // the requests (since according to the increasing condition, the rest of the indices
- // already have an outstanding request with another peer).
- if !shuffled && otherPending > 0 {
- shuffleReqs := next.Requests[i:]
- rand.Shuffle(len(shuffleReqs), func(i, j int) {
- shuffleReqs[i], shuffleReqs[j] = shuffleReqs[j], shuffleReqs[i]
- })
- // log.Printf("shuffled reqs [%v:%v]", i, len(next.Requests))
- shuffled = true
- // Repeat this index
- i--
- continue
+ for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()+current.Cancelled.GetCardinality()) < p.nominalMaxRequests() {
+ req := heap.Pop(requestHeap)
+ existing := t.requestingPeer(req)
+ if existing != nil && existing != p {
+ // Don't steal from the poor.
+ diff := int64(current.Requests.GetCardinality()) + 1 - (int64(existing.uncancelledRequests()) - 1)
+ // Steal a request that leaves us with one more request than the existing peer
+ // connection if the stealer more recently received a chunk.
+ if diff > 1 || (diff == 1 && p.lastUsefulChunkReceived.Before(existing.lastUsefulChunkReceived)) {
+ continue
+ }
+ t.cancelRequest(req)