package request_strategy
import (
- "math"
"sort"
"github.com/anacrolix/multiless"
return len(rp.nextState.Requests) < rp.MaxRequests
}
-// Returns true if it is added and wasn't there before.
-func (rp *requestsPeer) addNextRequest(r Request) bool {
+func (rp *requestsPeer) addNextRequest(r Request) {
_, ok := rp.nextState.Requests[r]
if ok {
- return false
+ panic("should only add once")
}
rp.nextState.Requests[r] = struct{}{}
- return true
}
type peersForPieceRequests struct {
}
func (me *peersForPieceRequests) addNextRequest(r Request) {
- if me.requestsPeer.addNextRequest(r) {
- me.requestsInPiece++
- }
+ me.requestsPeer.addNextRequest(r)
+ me.requestsInPiece++
}
type Torrent struct {
requestsPeer: peer,
})
}
- sortPeersForPiece := func() {
+ defer func() {
+ for _, peer := range peersForPiece {
+ if peer.canRequestPiece(p.index) {
+ peer.requestablePiecesRemaining--
+ }
+ }
+ }()
+ sortPeersForPiece := func(byHasRequest *Request) {
sort.Slice(peersForPiece, func(i, j int) bool {
- return multiless.New().Int(
+ ml := multiless.New().Int(
peersForPiece[i].requestsInPiece,
peersForPiece[j].requestsInPiece,
).Int(
).Float64(
peersForPiece[j].DownloadRate,
peersForPiece[i].DownloadRate,
- ).Int64(
+ )
+ if byHasRequest != nil {
+ _, iHas := peersForPiece[i].nextState.Requests[*byHasRequest]
+ _, jHas := peersForPiece[j].nextState.Requests[*byHasRequest]
+ ml = ml.Bool(jHas, iHas)
+ }
+ return ml.Int64(
int64(peersForPiece[j].Age), int64(peersForPiece[i].Age),
// TODO: Probably peer priority can come next
).Uintptr(
).MustLess()
})
}
+ preallocated := make(map[ChunkSpec]*peersForPieceRequests, p.NumPendingChunks)
+ p.iterPendingChunksWrapper(func(spec ChunkSpec) {
+ req := Request{pp.Integer(p.index), spec}
+ for _, p := range peersForPiece {
+ if h := p.HasExistingRequest; h != nil && h(req) {
+ preallocated[spec] = p
+ p.addNextRequest(req)
+ }
+ }
+ })
pendingChunksRemaining := int(p.NumPendingChunks)
- if f := p.IterPendingChunks; f != nil {
- f(func(chunk types.ChunkSpec) {
- req := Request{pp.Integer(p.index), chunk}
- defer func() { pendingChunksRemaining-- }()
- sortPeersForPiece()
- skipped := 0
- // Try up to the number of peers that could legitimately receive the request equal to
- // the number of chunks left. This should ensure that only the best peers serve the last
- // few chunks in a piece.
- lowestNumRequestsInPiece := math.MaxInt16
- for _, peer := range peersForPiece {
- if !peer.canFitRequest() || !peer.HasPiece(p.index) || (!peer.pieceAllowedFastOrDefault(p.index) && peer.Choking) {
- continue
- }
- if skipped+1 >= pendingChunksRemaining {
- break
- }
- if f := peer.HasExistingRequest; f == nil || !f(req) {
- skipped++
- lowestNumRequestsInPiece = peer.requestsInPiece
- continue
- }
- if peer.requestsInPiece > lowestNumRequestsInPiece {
- break
- }
- if !peer.pieceAllowedFastOrDefault(p.index) {
- // We must stay interested for this.
- peer.nextState.Interested = true
- }
- peer.addNextRequest(req)
- return
+ p.iterPendingChunksWrapper(func(chunk types.ChunkSpec) {
+ if _, ok := preallocated[chunk]; ok {
+ return
+ }
+ req := Request{pp.Integer(p.index), chunk}
+ defer func() { pendingChunksRemaining-- }()
+ sortPeersForPiece(nil)
+ for _, peer := range peersForPiece {
+ if !peer.canFitRequest() {
+ continue
}
- for _, peer := range peersForPiece {
- if !peer.canFitRequest() {
+ if !peer.HasPiece(p.index) {
+ continue
+ }
+ if !peer.pieceAllowedFastOrDefault(p.index) {
+ // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
+ peer.nextState.Interested = true
+ if peer.Choking {
continue
}
- if !peer.HasPiece(p.index) {
+ }
+ peer.addNextRequest(req)
+ return
+ }
+ })
+chunk:
+ for chunk, prePeer := range preallocated {
+ req := Request{pp.Integer(p.index), chunk}
+ prePeer.requestsInPiece--
+ sortPeersForPiece(&req)
+ delete(prePeer.nextState.Requests, req)
+ for _, peer := range peersForPiece {
+ if !peer.canFitRequest() {
+ continue
+ }
+ if !peer.HasPiece(p.index) {
+ continue
+ }
+ if !peer.pieceAllowedFastOrDefault(p.index) {
+ // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
+ peer.nextState.Interested = true
+ if peer.Choking {
continue
}
- if !peer.pieceAllowedFastOrDefault(p.index) {
- // TODO: Verify that's okay to stay uninterested if we request allowed fast
- // pieces.
- peer.nextState.Interested = true
- if peer.Choking {
- continue
- }
- }
- peer.addNextRequest(req)
- return
}
- })
+ pendingChunksRemaining--
+ peer.addNextRequest(req)
+ continue chunk
+ }
}
if pendingChunksRemaining != 0 {
panic(pendingChunksRemaining)
}
- for _, peer := range peersForPiece {
- if peer.canRequestPiece(p.index) {
- peer.requestablePiecesRemaining--
- }
- }
}
"testing"
pp "github.com/anacrolix/torrent/peer_protocol"
+ "github.com/bradfitz/iter"
qt "github.com/frankban/quicktest"
)
return Request{pp.Integer(i), ChunkSpec{pp.Integer(begin), 1}}
}
+func chunkIterRange(end int) func(func(ChunkSpec)) {
+ return func(f func(ChunkSpec)) {
+ for offset := range iter.N(end) {
+ f(ChunkSpec{pp.Integer(offset), 1})
+ }
+ }
+}
+
func chunkIter(offsets ...int) func(func(ChunkSpec)) {
return func(f func(ChunkSpec)) {
for _, offset := range offsets {
Pieces: []Piece{{
Request: true,
NumPendingChunks: 5,
- IterPendingChunks: chunkIter(0, 1, 2, 3, 4),
+ IterPendingChunks: chunkIterRange(5),
}},
Peers: []Peer{
stealee,
Requests: requestSetFromSlice(keepReq),
})
}
+
+func TestDontStealUnnecessarily(t *testing.T) {
+ c := qt.New(t)
+ order := ClientPieceOrder{}
+ basePeer := Peer{
+ HasPiece: func(i pieceIndex) bool {
+ return true
+ },
+ MaxRequests: math.MaxInt16,
+ DownloadRate: 2,
+ }
+ // Slower than the stealers, but has all requests already.
+ stealee := basePeer
+ stealee.DownloadRate = 1
+ keepReqs := requestSetFromSlice(r(0, 0), r(0, 1), r(0, 2))
+ stealee.HasExistingRequest = func(r Request) bool {
+ _, ok := keepReqs[r]
+ return ok
+ }
+ stealee.Id = intPeerId(1)
+ firstStealer := basePeer
+ firstStealer.Id = intPeerId(2)
+ secondStealer := basePeer
+ secondStealer.Id = intPeerId(3)
+ results := order.DoRequests([]*Torrent{{
+ Pieces: []Piece{{
+ Request: true,
+ NumPendingChunks: 9,
+ IterPendingChunks: chunkIterRange(9),
+ }},
+ Peers: []Peer{
+ firstStealer,
+ stealee,
+ secondStealer,
+ },
+ }})
+ c.Assert(results, qt.HasLen, 3)
+ check := func(p PeerId, l int) {
+ c.Check(results[p].Requests, qt.HasLen, l)
+ c.Check(results[p].Interested, qt.Equals, l > 0)
+ }
+ check(firstStealer.Id, 3)
+ check(secondStealer.Id, 3)
+ c.Check(results[stealee.Id], qt.ContentEquals, PeerNextRequestState{
+ Interested: true,
+ Requests: keepReqs,
+ })
+}