1 package request_strategy
8 "github.com/anacrolix/multiless"
9 "github.com/anacrolix/torrent/storage"
11 "github.com/anacrolix/torrent/types"
17 Request = types.Request
18 pieceIndex = types.PieceIndex
19 piecePriority = types.PiecePriority
20 // This can be made into a type-param later, will be great for testing.
21 ChunkSpec = types.ChunkSpec
// ClientPieceOrder is declared with no fields; no methods on it are visible in this listing.
24 type ClientPieceOrder struct{}
// filterTorrent carries per-torrent state used while filtering and ordering
// pieces for requesting. (Partial listing: most struct fields are elided.)
26 type filterTorrent struct {
29 // Potentially shared with other torrents.
// sortFilterPieces orders pieces for requesting. Visible terms: higher
// Priority first (note the swapped i/j operands), then lower Availability
// first, with a lazily-evaluated bytes.Compare as a later tie-breaker.
// NOTE(review): several comparator terms are elided from this listing —
// confirm the full ordering against the complete source.
33 func sortFilterPieces(pieces []filterPiece) {
34 sort.Slice(pieces, func(_i, _j int) bool {
37 return multiless.New().Int(
// Reversed operands: larger Priority sorts earlier.
38 int(j.Priority), int(i.Priority),
// Natural order: lower (rarer) Availability sorts earlier.
42 i.Availability, j.Availability,
// Lazy: only computed when the earlier terms tie.
45 ).Lazy(func() multiless.Computation {
46 return multiless.New().Cmp(bytes.Compare(
// requestsPeer tracks a peer's pending request allocation state during a
// request-strategy run. (Partial listing: some fields are elided.)
54 type requestsPeer struct {
// The request state to be committed for this peer once allocation completes.
56 nextState PeerNextRequestState
// How many requestable pieces this peer can still take part in; consulted
// by the peersForPieceSorter ordering below.
57 requestablePiecesRemaining int
// canFitRequest reports whether this peer has room for another request, i.e.
// the cardinality of its pending request bitmap is still below MaxRequests.
60 func (rp *requestsPeer) canFitRequest() bool {
61 return int(rp.nextState.Requests.GetCardinality()) < rp.MaxRequests
// addNextRequest records r in the peer's next request state. Adding the same
// request index twice is a programming error, hence the panic.
64 func (rp *requestsPeer) addNextRequest(r RequestIndex) {
// CheckedAdd returns false when r was already present in the bitmap.
65 if !rp.nextState.Requests.CheckedAdd(r) {
66 panic("should only add once")
// peersForPieceRequests wraps requestsPeer with per-piece allocation
// bookkeeping. (Partial listing: the struct body is elided.)
70 type peersForPieceRequests struct {
// addNextRequest delegates to the embedded requestsPeer; any additional
// per-piece bookkeeping is elided from this listing.
75 func (me *peersForPieceRequests) addNextRequest(r RequestIndex) {
76 me.requestsPeer.addNextRequest(r)
// requestablePiece describes a piece that passed filtering and whose pending
// chunks can be allocated to peers. (Partial listing: some fields elided.)
80 type requestablePiece struct {
// Iterates the piece's pending chunk indices.
85 IterPendingChunks ChunksIterFunc
// chunkIndexToRequestIndex maps a chunk index within this piece to the
// torrent-global request index: pieceIndex*ChunksPerPiece + chunkIndex.
88 func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex {
89 return p.t.ChunksPerPiece*uint32(p.index) + c
// filterPiece is the per-piece view used while sorting and filtering in
// GetRequestablePieces. (Partial listing: the struct body is elided.)
92 type filterPiece struct {
98 // Calls f with requestable pieces in order.
// GetRequestablePieces gathers every piece across input.Torrents, sorts them
// via sortFilterPieces, then applies per-run storage-capacity and
// unverified-byte limits before invoking f for each piece that should be
// requested. NOTE(review): this listing elides many lines (loop bodies,
// continues, closing braces) — commentary below covers only the visible code.
99 func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex int)) {
// Pre-size the pieces slice with the total piece count over all torrents.
101 for i := range input.Torrents {
102 maxPieces += len(input.Torrents[i].Pieces)
104 pieces := make([]filterPiece, 0, maxPieces)
105 // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
106 // TorrentImpl. A nil value means no capacity limit.
107 storageLeft := make(map[storage.TorrentCapacity]*int64)
108 for _t := range input.Torrents {
109 // TODO: We could do metainfo requests here.
111 Torrent: &input.Torrents[_t],
// Query each distinct capacity function once per run and cache the result.
116 if _, ok := storageLeft[key]; !ok {
117 capacity, ok := (*key)()
119 storageLeft[key] = &capacity
// Capacity function reported no limit.
121 storageLeft[key] = nil
// All torrents sharing the same capacity key share one remaining counter.
124 t.storageLeft = storageLeft[key]
126 for i := range t.Pieces {
127 pieces = append(pieces, filterPiece{
// Order pieces before applying the per-run limits below, so the limits
// favour the best-ranked pieces.
134 sortFilterPieces(pieces)
135 var allTorrentsUnverifiedBytes int64
136 for _, piece := range pieces {
// Skip pieces that no longer fit in the shared remaining storage capacity;
// otherwise consume it.
137 if left := piece.t.storageLeft; left != nil {
138 if *left < int64(piece.Length) {
141 *left -= int64(piece.Length)
// Pieces not marked for request, or with nothing pending, are skipped.
143 if !piece.Request || piece.NumPendingChunks == 0 {
144 // TODO: Clarify exactly what is verified. Stuff that's being hashed should be
145 // considered unverified and hold up further requests.
// Per-torrent unverified-bytes cap (0 means unlimited).
148 if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes {
// Client-wide unverified-bytes cap across all torrents (0 means unlimited).
151 if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
154 piece.t.unverifiedBytes += piece.Length
155 allTorrentsUnverifiedBytes += piece.Length
// Piece passed all limits: report it as requestable.
156 f(piece.t.Torrent, piece.Piece, piece.index)
// Maximum unverified bytes across all torrents for one run; 0 means no limit
// (see the check in GetRequestablePieces).
163 MaxUnverifiedBytes int64
166 // Checks that a sorted peersForPiece slice makes sense.
// Invariant check used from a (commented-out) call site below: the slice must
// satisfy its own sort.Interface ordering and contain no duplicate peer
// pointers. NOTE(review): the failure branches are elided from this listing —
// presumably they panic; confirm against the full source.
167 func ensureValidSortedPeersForPieceRequests(peers *peersForPieceSorter) {
168 if !sort.IsSorted(peers) {
// Duplicate detection via a set of peer pointers.
171 peerMap := make(map[*peersForPieceRequests]struct{}, peers.Len())
172 for _, p := range peers.peersForPiece {
173 if _, ok := peerMap[p]; ok {
176 peerMap[p] = struct{}{}
// Pool of []*peersForPieceRequests backing arrays, reused across
// allocatePendingChunks runs (Get in makePeersForPiece, Put on completion)
// to reduce per-piece allocations.
180 var peersForPiecesPool sync.Pool
// makePeersForPiece returns an empty slice for collecting candidate peers,
// reusing a pooled backing array when one is available.
182 func makePeersForPiece(cap int) []*peersForPieceRequests {
183 got := peersForPiecesPool.Get()
// Pool miss (condition elided from listing, presumably got == nil):
// allocate fresh with the requested capacity.
185 return make([]*peersForPieceRequests, 0, cap)
// Pool hit: truncate to zero length, keeping the existing capacity.
187 return got.([]*peersForPieceRequests)[:0]
// peersForPieceSorter adapts []*peersForPieceRequests to sort.Interface. Its
// ordering (see Less) can be made relative to a specific request via a req
// field that is elided from this listing.
190 type peersForPieceSorter struct {
191 peersForPiece []*peersForPieceRequests
// Len implements sort.Interface.
196 func (me *peersForPieceSorter) Len() int {
197 return len(me.peersForPiece)
// Swap implements sort.Interface.
200 func (me *peersForPieceSorter) Swap(i, j int) {
201 me.peersForPiece[i], me.peersForPiece[j] = me.peersForPiece[j], me.peersForPiece[i]
// Less implements sort.Interface, ranking peers for receiving a request.
// Visible terms: peers already holding the target request sort earlier (to
// minimize request churn), striping behaviour kicks in for peers on their
// last requestable piece or pieces marked alwaysReallocate, and larger Age
// sorts earlier (reversed operands). NOTE(review): several comparator terms
// are elided from this listing — confirm the full ordering in the source.
204 func (me *peersForPieceSorter) Less(_i, _j int) bool {
205 i := me.peersForPiece[_i]
206 j := me.peersForPiece[_j]
// Shared comparator term: whether each peer's next request state already
// contains the target request (reversed operands, so holders sort earlier).
209 byHasRequest := func() multiless.Computation {
210 ml := multiless.New()
212 iHas := i.nextState.Requests.Contains(*req)
213 jHas := j.nextState.Requests.Contains(*req)
214 ml = ml.Bool(jHas, iHas)
218 ml := multiless.New()
219 // We always "reallocate", that is force even striping amongst peers that are either on
220 // the last piece they can contribute to, or for pieces marked for this behaviour.
221 // Striping prevents starving peers of requests, and will always re-balance to the
222 // fastest known peers.
223 if !p.alwaysReallocate {
225 j.requestablePiecesRemaining == 1,
226 i.requestablePiecesRemaining == 1)
228 if p.alwaysReallocate || j.requestablePiecesRemaining == 1 {
233 ml = ml.AndThen(byHasRequest)
236 i.requestablePiecesRemaining,
237 j.requestablePiecesRemaining,
245 ml = ml.AndThen(byHasRequest)
// Reversed operands: greater Age sorts earlier (comparison call elided).
247 int64(j.Age), int64(i.Age),
248 // TODO: Probably peer priority can come next
// allocatePendingChunks distributes piece p's pending chunks among the given
// peers, updating each peer's nextState. Chunks a peer already requested are
// first tentatively kept ("preallocated"), then confirmed or reassigned after
// re-sorting; chunks with no existing requests go to the best-ranked peers
// that can fit them. NOTE(review): this listing elides many lines (continues,
// closing braces, field assignments) — commentary covers only visible code.
255 func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
// Collect the peers eligible to request this piece.
256 peersForPiece := makePeersForPiece(len(peers))
257 for _, peer := range peers {
258 if !peer.canRequestPiece(p.index) {
// A peer with no request capacity still consumes one of its remaining
// requestable pieces (skip presumably follows via an elided continue).
261 if !peer.canFitRequest() {
262 peer.requestablePiecesRemaining--
265 peersForPiece = append(peersForPiece, &peersForPieceRequests{
// Cleanup (enclosing defer elided from listing): decrement each candidate's
// remaining-piece count and return the slice to the pool when done.
271 for _, peer := range peersForPiece {
272 peer.requestablePiecesRemaining--
274 peersForPiecesPool.Put(peersForPiece)
276 peersForPieceSorter := peersForPieceSorter{
277 peersForPiece: peersForPiece,
// Re-sorts the candidate peers, optionally relative to a specific request.
280 sortPeersForPiece := func(req *RequestIndex) {
281 peersForPieceSorter.req = req
282 sort.Sort(&peersForPieceSorter)
283 // ensureValidSortedPeersForPieceRequests(&peersForPieceSorter)
285 // Chunks can be preassigned several times, if peers haven't been able to update their "actual"
286 // with "next" request state before another request strategy run occurs.
287 preallocated := make([][]*peersForPieceRequests, p.t.ChunksPerPiece)
// Pass 1: tentatively keep each chunk with every peer that already has an
// existing request for it and still has capacity.
288 p.IterPendingChunks(func(spec ChunkIndex) {
289 req := p.chunkIndexToRequestIndex(spec)
290 for _, peer := range peersForPiece {
291 if !peer.ExistingRequests.Contains(req) {
294 if !peer.canFitRequest() {
297 preallocated[spec] = append(preallocated[spec], peer)
298 peer.addNextRequest(req)
// Pass 2: assign chunks that had no existing requests to the best peers.
301 pendingChunksRemaining := int(p.NumPendingChunks)
302 p.IterPendingChunks(func(chunk ChunkIndex) {
// Preallocated chunks are handled in pass 3 below.
303 if len(preallocated[chunk]) != 0 {
306 req := p.chunkIndexToRequestIndex(chunk)
307 defer func() { pendingChunksRemaining-- }()
// Sort with no target request: pure peer-quality ordering.
308 sortPeersForPiece(nil)
309 for _, peer := range peersForPiece {
310 if !peer.canFitRequest() {
313 if !peer.PieceAllowedFast.ContainsInt(p.index) {
314 // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
315 peer.nextState.Interested = true
320 peer.addNextRequest(req)
// Pass 3: for each preallocated chunk, re-sort relative to that request,
// drop the tentative assignments, then re-add to the winning peer(s).
325 for chunk, prePeers := range preallocated {
326 if len(prePeers) == 0 {
329 pendingChunksRemaining--
330 req := p.chunkIndexToRequestIndex(ChunkIndex(chunk))
331 for _, pp := range prePeers {
334 sortPeersForPiece(&req)
// Remove the tentative requests before reassigning.
335 for _, pp := range prePeers {
336 pp.nextState.Requests.Remove(req)
338 for _, peer := range peersForPiece {
339 if !peer.canFitRequest() {
342 if !peer.PieceAllowedFast.ContainsInt(p.index) {
343 // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
344 peer.nextState.Interested = true
349 peer.addNextRequest(req)
// Invariant: every pending chunk must have been accounted for above.
353 if pendingChunksRemaining != 0 {
354 panic(pendingChunksRemaining)