package request_strategy
import (
+ "fmt"
"sort"
"github.com/anacrolix/multiless"
+
pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/types"
)
peers = append(peers, &requestsPeer{
Peer: p,
nextState: PeerNextRequestState{
- Requests: make(map[Request]struct{}),
+ Requests: make(map[Request]struct{}, p.MaxRequests),
},
})
}
if rp.requestablePiecesRemaining != 0 {
panic(rp.requestablePiecesRemaining)
}
+ if _, ok := ret[rp.Id]; ok {
+ panic(fmt.Sprintf("duplicate peer id: %v", rp.Id))
+ }
ret[rp.Id] = rp.nextState
}
}
return ret
}
+// Checks that a sorted peersForPiece slice makes sense.
+func ensureValidSortedPeersForPieceRequests(peers []*peersForPieceRequests, sortLess func(_, _ int) bool) {
+ if !sort.SliceIsSorted(peers, sortLess) {
+ panic("not sorted")
+ }
+ peerMap := make(map[*peersForPieceRequests]struct{}, len(peers))
+ for _, p := range peers {
+ if _, ok := peerMap[p]; ok {
+ panic(p)
+ }
+ peerMap[p] = struct{}{}
+ }
+}
+
func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
peersForPiece := make([]*peersForPieceRequests, 0, len(peers))
for _, peer := range peers {
}
}()
sortPeersForPiece := func(req *Request) {
- sort.Slice(peersForPiece, func(i, j int) bool {
+ less := func(i, j int) bool {
byHasRequest := func() multiless.Computation {
ml := multiless.New()
if req != nil {
peersForPiece[i].Id.Uintptr(),
peersForPiece[j].Id.Uintptr(),
).MustLess()
- })
+ }
+ sort.Slice(peersForPiece, less)
+ ensureValidSortedPeersForPieceRequests(peersForPiece, less)
}
- preallocated := make(map[ChunkSpec]*peersForPieceRequests, p.NumPendingChunks)
+ // Chunks can be preassigned several times, if peers haven't been able to update their "actual"
+ // with "next" request state before another request strategy run occurs.
+ preallocated := make(map[ChunkSpec][]*peersForPieceRequests, p.NumPendingChunks)
p.IterPendingChunks(func(spec ChunkSpec) {
req := Request{pp.Integer(p.index), spec}
for _, peer := range peersForPiece {
if !peer.canRequestPiece(p.index) {
continue
}
- preallocated[spec] = peer
+ preallocated[spec] = append(preallocated[spec], peer)
peer.addNextRequest(req)
}
})
}
})
chunk:
- for chunk, prePeer := range preallocated {
+ for chunk, prePeers := range preallocated {
pendingChunksRemaining--
req := Request{pp.Integer(p.index), chunk}
- prePeer.requestsInPiece--
+ for _, pp := range prePeers {
+ pp.requestsInPiece--
+ }
sortPeersForPiece(&req)
- delete(prePeer.nextState.Requests, req)
+ for _, pp := range prePeers {
+ delete(pp.nextState.Requests, req)
+ }
for _, peer := range peersForPiece {
if !peer.canFitRequest() {
continue
"math"
"testing"
- pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/bradfitz/iter"
qt "github.com/frankban/quicktest"
+
+ pp "github.com/anacrolix/torrent/peer_protocol"
)
func r(i pieceIndex, begin int) Request {
Requests: requestSetFromSlice(r(4, 0), r(4, 1), r(4, 7), r(4, 8)),
})
}
+
+// This tests a situation where multiple peers had the same existing request, due to "actual" and
+// "next" request states being out of sync. This reasonable occurs when a peer hasn't fully updated
+// its actual request state since the last request strategy run.
+func TestDuplicatePreallocations(t *testing.T) {
+ peer := func(id int, downloadRate float64) Peer {
+ return Peer{
+ HasExistingRequest: func(r Request) bool {
+ return true
+ },
+ MaxRequests: 2,
+ HasPiece: func(i pieceIndex) bool {
+ return true
+ },
+ Id: intPeerId(id),
+ DownloadRate: downloadRate,
+ }
+ }
+ results := Run(Input{
+ Torrents: []Torrent{{
+ Pieces: []Piece{{
+ Request: true,
+ NumPendingChunks: 1,
+ IterPendingChunks: chunkIterRange(1),
+ }, {
+ Request: true,
+ NumPendingChunks: 1,
+ IterPendingChunks: chunkIterRange(1),
+ }},
+ Peers: []Peer{
+ // The second peer was be marked as the preallocation, clobbering the first. The
+ // first peer is preferred, and the piece isn't striped, so it gets preallocated a
+ // request, and then gets reallocated from the peer the same request.
+ peer(1, 2),
+ peer(2, 1),
+ },
+ }},
+ })
+ c := qt.New(t)
+ c.Assert(2, qt.Equals, len(results[intPeerId(1)].Requests)+len(results[intPeerId(2)].Requests))
+}