1 package request_strategy
8 "github.com/anacrolix/multiless"
10 pp "github.com/anacrolix/torrent/peer_protocol"
11 "github.com/anacrolix/torrent/types"
15 Request = types.Request
16 pieceIndex = types.PieceIndex
17 piecePriority = types.PiecePriority
18 // This can be made into a type-param later, will be great for testing.
19 ChunkSpec = types.ChunkSpec
22 type ClientPieceOrder struct{}
24 type filterTorrent struct {
27 // Potentially shared with other torrents.
31 func sortFilterPieces(pieces []filterPiece) {
32 sort.Slice(pieces, func(_i, _j int) bool {
35 return multiless.New().Int(
36 int(j.Priority), int(i.Priority),
40 i.Availability, j.Availability,
44 i.t.StableId, j.t.StableId,
49 type requestsPeer struct {
51 nextState PeerNextRequestState
52 requestablePiecesRemaining int
55 func (rp *requestsPeer) canFitRequest() bool {
56 return len(rp.nextState.Requests) < rp.MaxRequests
59 func (rp *requestsPeer) addNextRequest(r Request) {
60 _, ok := rp.nextState.Requests[r]
62 panic("should only add once")
64 rp.nextState.Requests[r] = struct{}{}
67 type peersForPieceRequests struct {
72 func (me *peersForPieceRequests) addNextRequest(r Request) {
73 me.requestsPeer.addNextRequest(r)
77 type requestablePiece struct {
82 IterPendingChunks ChunksIter
85 type filterPiece struct {
91 func getRequestablePieces(input Input) (ret []requestablePiece) {
93 for i := range input.Torrents {
94 maxPieces += len(input.Torrents[i].Pieces)
96 pieces := make([]filterPiece, 0, maxPieces)
97 ret = make([]requestablePiece, 0, maxPieces)
98 // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
100 storageLeft := make(map[*func() *int64]*int64)
101 for _t := range input.Torrents {
102 // TODO: We could do metainfo requests here.
104 Torrent: &input.Torrents[_t],
109 if _, ok := storageLeft[key]; !ok {
110 storageLeft[key] = (*key)()
112 t.storageLeft = storageLeft[key]
114 for i := range t.Pieces {
115 pieces = append(pieces, filterPiece{
122 sortFilterPieces(pieces)
123 var allTorrentsUnverifiedBytes int64
124 for _, piece := range pieces {
125 if left := piece.t.storageLeft; left != nil {
126 if *left < int64(piece.Length) {
129 *left -= int64(piece.Length)
131 if !piece.Request || piece.NumPendingChunks == 0 {
132 // TODO: Clarify exactly what is verified. Stuff that's being hashed should be
133 // considered unverified and hold up further requests.
136 if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes {
139 if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
142 piece.t.unverifiedBytes += piece.Length
143 allTorrentsUnverifiedBytes += piece.Length
144 ret = append(ret, requestablePiece{
147 NumPendingChunks: piece.NumPendingChunks,
148 IterPendingChunks: piece.iterPendingChunksWrapper,
149 alwaysReallocate: piece.Priority >= types.PiecePriorityNext,
157 MaxUnverifiedBytes int64
160 // TODO: We could do metainfo requests here.
161 func Run(input Input) map[PeerId]PeerNextRequestState {
162 requestPieces := getRequestablePieces(input)
163 torrents := input.Torrents
164 allPeers := make(map[uintptr][]*requestsPeer, len(torrents))
165 for _, t := range torrents {
166 peers := make([]*requestsPeer, 0, len(t.Peers))
167 for _, p := range t.Peers {
168 peers = append(peers, &requestsPeer{
170 nextState: PeerNextRequestState{
171 Requests: make(map[Request]struct{}, p.MaxRequests),
175 allPeers[t.StableId] = peers
177 for _, piece := range requestPieces {
178 for _, peer := range allPeers[piece.t.StableId] {
179 if peer.canRequestPiece(piece.index) {
180 peer.requestablePiecesRemaining++
184 for _, piece := range requestPieces {
185 allocatePendingChunks(piece, allPeers[piece.t.StableId])
187 ret := make(map[PeerId]PeerNextRequestState)
188 for _, peers := range allPeers {
189 for _, rp := range peers {
190 if rp.requestablePiecesRemaining != 0 {
191 panic(rp.requestablePiecesRemaining)
193 if _, ok := ret[rp.Id]; ok {
194 panic(fmt.Sprintf("duplicate peer id: %v", rp.Id))
196 ret[rp.Id] = rp.nextState
202 // Checks that a sorted peersForPiece slice makes sense.
203 func ensureValidSortedPeersForPieceRequests(peers peersForPieceSorter) {
204 if !sort.IsSorted(peers) {
207 peerMap := make(map[*peersForPieceRequests]struct{}, peers.Len())
208 for _, p := range peers.peersForPiece {
209 if _, ok := peerMap[p]; ok {
212 peerMap[p] = struct{}{}
216 var peersForPiecesPool sync.Pool
218 func makePeersForPiece(cap int) []*peersForPieceRequests {
219 got := peersForPiecesPool.Get()
221 return make([]*peersForPieceRequests, 0, cap)
223 return got.([]*peersForPieceRequests)[:0]
226 type peersForPieceSorter struct {
227 peersForPiece []*peersForPieceRequests
232 func (me peersForPieceSorter) Len() int {
233 return len(me.peersForPiece)
236 func (me peersForPieceSorter) Swap(i, j int) {
237 me.peersForPiece[i], me.peersForPiece[j] = me.peersForPiece[j], me.peersForPiece[i]
240 func (me peersForPieceSorter) Less(_i, _j int) bool {
241 i := me.peersForPiece[_i]
242 j := me.peersForPiece[_j]
245 byHasRequest := func() multiless.Computation {
246 ml := multiless.New()
248 _, iHas := i.nextState.Requests[*req]
249 _, jHas := j.nextState.Requests[*req]
250 ml = ml.Bool(jHas, iHas)
254 ml := multiless.New()
255 // We always "reallocate", that is force even striping amongst peers that are either on
256 // the last piece they can contribute too, or for pieces marked for this behaviour.
257 // Striping prevents starving peers of requests, and will always re-balance to the
258 // fastest known peers.
259 if !p.alwaysReallocate {
261 j.requestablePiecesRemaining == 1,
262 i.requestablePiecesRemaining == 1)
264 if p.alwaysReallocate || j.requestablePiecesRemaining == 1 {
269 ml = ml.AndThen(byHasRequest)
272 i.requestablePiecesRemaining,
273 j.requestablePiecesRemaining,
278 ml = ml.AndThen(byHasRequest)
280 int64(j.Age), int64(i.Age),
281 // TODO: Probably peer priority can come next
288 func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
289 peersForPiece := makePeersForPiece(len(peers))
290 for _, peer := range peers {
291 peersForPiece = append(peersForPiece, &peersForPieceRequests{
297 for _, peer := range peersForPiece {
298 if peer.canRequestPiece(p.index) {
299 peer.requestablePiecesRemaining--
302 peersForPiecesPool.Put(peersForPiece)
304 peersForPieceSorter := peersForPieceSorter{
305 peersForPiece: peersForPiece,
308 sortPeersForPiece := func(req *Request) {
309 peersForPieceSorter.req = req
310 sort.Sort(&peersForPieceSorter)
311 //ensureValidSortedPeersForPieceRequests(peersForPieceSorter)
313 // Chunks can be preassigned several times, if peers haven't been able to update their "actual"
314 // with "next" request state before another request strategy run occurs.
315 preallocated := make(map[ChunkSpec][]*peersForPieceRequests, p.NumPendingChunks)
316 p.IterPendingChunks(func(spec ChunkSpec) {
317 req := Request{pp.Integer(p.index), spec}
318 for _, peer := range peersForPiece {
319 if h := peer.HasExistingRequest; h == nil || !h(req) {
322 if !peer.canFitRequest() {
325 if !peer.canRequestPiece(p.index) {
328 preallocated[spec] = append(preallocated[spec], peer)
329 peer.addNextRequest(req)
332 pendingChunksRemaining := int(p.NumPendingChunks)
333 p.IterPendingChunks(func(chunk types.ChunkSpec) {
334 if _, ok := preallocated[chunk]; ok {
337 req := Request{pp.Integer(p.index), chunk}
338 defer func() { pendingChunksRemaining-- }()
339 sortPeersForPiece(nil)
340 for _, peer := range peersForPiece {
341 if !peer.canFitRequest() {
344 if !peer.HasPiece(p.index) {
347 if !peer.pieceAllowedFastOrDefault(p.index) {
348 // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
349 peer.nextState.Interested = true
354 peer.addNextRequest(req)
359 for chunk, prePeers := range preallocated {
360 pendingChunksRemaining--
361 req := Request{pp.Integer(p.index), chunk}
362 for _, pp := range prePeers {
365 sortPeersForPiece(&req)
366 for _, pp := range prePeers {
367 delete(pp.nextState.Requests, req)
369 for _, peer := range peersForPiece {
370 if !peer.canFitRequest() {
373 if !peer.HasPiece(p.index) {
376 if !peer.pieceAllowedFastOrDefault(p.index) {
377 // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
378 peer.nextState.Interested = true
383 peer.addNextRequest(req)
387 if pendingChunksRemaining != 0 {
388 panic(pendingChunksRemaining)