if !p.Request {
continue
}
- peersForPiece := make([]*peersForPieceRequests, 0, len(allPeers[p.t]))
- for _, peer := range allPeers[p.t] {
- peersForPiece = append(peersForPiece, &peersForPieceRequests{
- requestsInPiece: 0,
- requestsPeer: peer,
- })
- }
- sortPeersForPiece := func() {
- sort.Slice(peersForPiece, func(i, j int) bool {
- return multiless.New().Int(
- peersForPiece[i].requestsInPiece,
- peersForPiece[j].requestsInPiece,
- ).Int(
- peersForPiece[i].requestablePiecesRemaining,
- peersForPiece[j].requestablePiecesRemaining,
- ).Float64(
- peersForPiece[j].DownloadRate,
- peersForPiece[i].DownloadRate,
- ).Int64(
- int64(peersForPiece[j].Age), int64(peersForPiece[i].Age),
- // TODO: Probably peer priority can come next
- ).Uintptr(
- peersForPiece[i].Id.Uintptr(),
- peersForPiece[j].Id.Uintptr(),
- ).MustLess()
- })
- }
- pendingChunksRemaining := int(p.NumPendingChunks)
- if f := torrentPiece.IterPendingChunks; f != nil {
- f(func(chunk types.ChunkSpec) {
- req := Request{pp.Integer(p.index), chunk}
- defer func() { pendingChunksRemaining-- }()
- sortPeersForPiece()
- skipped := 0
- // Try up to the number of peers that could legitimately receive the request equal to
- // the number of chunks left. This should ensure that only the best peers serve the last
- // few chunks in a piece.
- lowestNumRequestsInPiece := math.MaxInt16
- for _, peer := range peersForPiece {
- if !peer.canFitRequest() || !peer.HasPiece(p.index) || (!peer.pieceAllowedFastOrDefault(p.index) && peer.Choking) {
- continue
- }
- if skipped+1 >= pendingChunksRemaining {
- break
- }
- if f := peer.HasExistingRequest; f == nil || !f(req) {
- skipped++
- lowestNumRequestsInPiece = peer.requestsInPiece
- continue
- }
- if peer.requestsInPiece > lowestNumRequestsInPiece {
- break
- }
- if !peer.pieceAllowedFastOrDefault(p.index) {
- // We must stay interested for this.
- peer.nextState.Interested = true
- }
- peer.addNextRequest(req)
- return
- }
- for _, peer := range peersForPiece {
- if !peer.canFitRequest() {
- continue
- }
- if !peer.HasPiece(p.index) {
- continue
- }
- if !peer.pieceAllowedFastOrDefault(p.index) {
- // TODO: Verify that's okay to stay uninterested if we request allowed fast
- // pieces.
- peer.nextState.Interested = true
- if peer.Choking {
- continue
- }
- }
- peer.addNextRequest(req)
- return
- }
- })
- }
- if pendingChunksRemaining != 0 {
- panic(pendingChunksRemaining)
- }
- for _, peer := range peersForPiece {
- if peer.canRequestPiece(p.index) {
- peer.requestablePiecesRemaining--
- }
- }
+ allocatePendingChunks(p, allPeers[p.t])
}
ret := make(map[PeerId]PeerNextRequestState)
for _, peers := range allPeers {
}
return ret
}
+
+func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) {
+ peersForPiece := make([]*peersForPieceRequests, 0, len(peers))
+ for _, peer := range peers {
+ peersForPiece = append(peersForPiece, &peersForPieceRequests{
+ requestsInPiece: 0,
+ requestsPeer: peer,
+ })
+ }
+ sortPeersForPiece := func() {
+ sort.Slice(peersForPiece, func(i, j int) bool {
+ return multiless.New().Int(
+ peersForPiece[i].requestsInPiece,
+ peersForPiece[j].requestsInPiece,
+ ).Int(
+ peersForPiece[i].requestablePiecesRemaining,
+ peersForPiece[j].requestablePiecesRemaining,
+ ).Float64(
+ peersForPiece[j].DownloadRate,
+ peersForPiece[i].DownloadRate,
+ ).Int64(
+ int64(peersForPiece[j].Age), int64(peersForPiece[i].Age),
+ // TODO: Probably peer priority can come next
+ ).Uintptr(
+ peersForPiece[i].Id.Uintptr(),
+ peersForPiece[j].Id.Uintptr(),
+ ).MustLess()
+ })
+ }
+ pendingChunksRemaining := int(p.NumPendingChunks)
+ if f := p.IterPendingChunks; f != nil {
+ f(func(chunk types.ChunkSpec) {
+ req := Request{pp.Integer(p.index), chunk}
+ defer func() { pendingChunksRemaining-- }()
+ sortPeersForPiece()
+ skipped := 0
+ // Try up to the number of peers that could legitimately receive the request equal to
+ // the number of chunks left. This should ensure that only the best peers serve the last
+ // few chunks in a piece.
+ lowestNumRequestsInPiece := math.MaxInt16
+ for _, peer := range peersForPiece {
+ if !peer.canFitRequest() || !peer.HasPiece(p.index) || (!peer.pieceAllowedFastOrDefault(p.index) && peer.Choking) {
+ continue
+ }
+ if skipped+1 >= pendingChunksRemaining {
+ break
+ }
+ if f := peer.HasExistingRequest; f == nil || !f(req) {
+ skipped++
+ lowestNumRequestsInPiece = peer.requestsInPiece
+ continue
+ }
+ if peer.requestsInPiece > lowestNumRequestsInPiece {
+ break
+ }
+ if !peer.pieceAllowedFastOrDefault(p.index) {
+ // We must stay interested for this.
+ peer.nextState.Interested = true
+ }
+ peer.addNextRequest(req)
+ return
+ }
+ for _, peer := range peersForPiece {
+ if !peer.canFitRequest() {
+ continue
+ }
+ if !peer.HasPiece(p.index) {
+ continue
+ }
+ if !peer.pieceAllowedFastOrDefault(p.index) {
+ // TODO: Verify that's okay to stay uninterested if we request allowed fast
+ // pieces.
+ peer.nextState.Interested = true
+ if peer.Choking {
+ continue
+ }
+ }
+ peer.addNextRequest(req)
+ return
+ }
+ })
+ }
+ if pendingChunksRemaining != 0 {
+ panic(pendingChunksRemaining)
+ }
+ for _, peer := range peersForPiece {
+ if peer.canRequestPiece(p.index) {
+ peer.requestablePiecesRemaining--
+ }
+ }
+}