package request_strategy

import (
	"bytes"
	"expvar"
	"runtime"
	"sort"
	"sync"

	"github.com/anacrolix/multiless"
	"github.com/anacrolix/torrent/metainfo"
	"github.com/google/btree"

	"github.com/anacrolix/torrent/types"
)

type (
	RequestIndex  = uint32
	ChunkIndex    = uint32
	Request       = types.Request
	pieceIndex    = types.PieceIndex
	piecePriority = types.PiecePriority
	// This can be made into a type-param later; it will be great for testing.
	ChunkSpec = types.ChunkSpec
)

type ClientPieceOrder struct{}
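
// equalFilterPieces reports whether two filterPiece slices would produce the
// same sort order, allowing a cached ordering to be reused (see
// getSortedFilterPieces).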
func equalFilterPieces(l, r []filterPiece) bool {
	if len(l) != len(r) {
		return false
	}
	for i := range l {
		lp, rp := &l[i], &r[i]
		if lp.Priority != rp.Priority ||
			lp.Partial != rp.Partial ||
			lp.Availability != rp.Availability ||
			lp.index != rp.index ||
			lp.t.InfoHash != rp.t.InfoHash {
			return false
		}
	}
	return true
}
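
// pieceSorter adapts closure-based accessors to sort.Interface, so the same
// comparison can order either a filterPiece slice directly or a separate
// permutation of indices into it.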
type pieceSorter struct {
	swap func(i, j int)
	get  func(i int) *filterPiece
	len  int
}

func (me pieceSorter) Len() int {
	return me.len
}

func (me pieceSorter) Swap(i, j int) {
	me.swap(i, j)
}

func (me pieceSorter) Less(_i, _j int) bool {
	i := me.get(_i)
	j := me.get(_j)
	return pieceOrderLess(i.toPieceOrderInput(), j.toPieceOrderInput()).MustLess()
}

type pieceOrderInput struct {
	PieceRequestOrderState
	PieceRequestOrderKey
}
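
// pieceOrderLess sorts by descending priority, then partial pieces first, then
// ascending availability (rarest first), with piece index and infohash as
// deterministic tie-breakers.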
func pieceOrderLess(i, j pieceOrderInput) multiless.Computation {
	return multiless.New().Int(
		int(j.Priority), int(i.Priority),
	).Bool(
		j.Partial, i.Partial,
	).Int64(
		i.Availability, j.Availability,
	).Int(
		i.Index, j.Index,
	).Lazy(func() multiless.Computation {
		return multiless.New().Cmp(bytes.Compare(
			i.InfoHash[:],
			j.InfoHash[:],
		))
	})
}
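
// requestsPeer accumulates the next request state being computed for a peer
// during a single request-strategy run.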
type requestsPeer struct {
	Peer
	nextState                  PeerNextRequestState
	requestablePiecesRemaining int
}

func (rp *requestsPeer) canFitRequest() bool {
	return int(rp.nextState.Requests.GetCardinality()) < rp.MaxRequests
}

func (rp *requestsPeer) addNextRequest(r RequestIndex) {
	if !rp.nextState.Requests.CheckedAdd(r) {
		panic("should only add once")
	}
}
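
// peersForPieceRequests pairs a peer with the number of requests it has been
// allocated from the piece currently being processed.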
type peersForPieceRequests struct {
	requestsInPiece int
	*requestsPeer
}

func (me *peersForPieceRequests) addNextRequest(r RequestIndex) {
	me.requestsPeer.addNextRequest(r)
	me.requestsInPiece++
}
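
// requestablePiece is a piece that passed filtering and is ready to have its
// pending chunks allocated to peers.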
type requestablePiece struct {
	index             pieceIndex
	t                 *Torrent
	alwaysReallocate  bool
	NumPendingChunks  uint32
	IterPendingChunks ChunksIterFunc
}

func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex {
	return p.t.ChunksPerPiece*uint32(p.index) + c
}

type filterPiece struct {
	t     *Torrent
	index pieceIndex
	*Piece
}

func (fp *filterPiece) toPieceOrderInput() (ret pieceOrderInput) {
	ret.Partial = fp.Partial
	ret.InfoHash = fp.t.InfoHash
	ret.Availability = fp.Availability
	ret.Priority = fp.Priority
	ret.Index = fp.index
	return
}
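
// sorts caches the permutation that sorting produced for a given filterPiece
// slice, keyed by the slice's address. sortsMu guards access; entries are
// removed by a finalizer registered in getSortedFilterPieces.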
var (
	sortsMu sync.Mutex
	sorts   = map[*[]filterPiece][]int{}
)

func reorderedFilterPieces(pieces []filterPiece, indices []int) (ret []filterPiece) {
	ret = make([]filterPiece, len(indices))
	for i, j := range indices {
		ret[i] = pieces[j]
	}
	return
}

var packageExpvarMap = expvar.NewMap("request-strategy")
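
// getSortedFilterPieces returns the pieces in request order. The caching
// branch below reuses a previous run's permutation when the input is
// unchanged, but it is currently disabled via the cachePieceSorts constant.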
func getSortedFilterPieces(unsorted []filterPiece) []filterPiece {
	const cachePieceSorts = false
	if !cachePieceSorts {
		sort.Sort(pieceSorter{
			len: len(unsorted),
			swap: func(i, j int) {
				unsorted[i], unsorted[j] = unsorted[j], unsorted[i]
			},
			get: func(i int) *filterPiece {
				return &unsorted[i]
			},
		})
		return unsorted
	}
	sortsMu.Lock()
	defer sortsMu.Unlock()
	for key, order := range sorts {
		if equalFilterPieces(*key, unsorted) {
			packageExpvarMap.Add("reused filter piece ordering", 1)
			return reorderedFilterPieces(unsorted, order)
		}
	}
	indices := make([]int, len(unsorted))
	for i := 0; i < len(indices); i++ {
		indices[i] = i
	}
	sort.Sort(pieceSorter{
		len: len(indices),
		swap: func(i, j int) {
			indices[i], indices[j] = indices[j], indices[i]
		},
		get: func(i int) *filterPiece {
			return &unsorted[indices[i]]
		},
	})
	packageExpvarMap.Add("added filter piece ordering", 1)
	sorts[&unsorted] = indices
	runtime.SetFinalizer(&pieceOrderingFinalizer{unsorted: &unsorted}, func(me *pieceOrderingFinalizer) {
		packageExpvarMap.Add("finalized filter piece ordering", 1)
		sortsMu.Lock()
		defer sortsMu.Unlock()
		delete(sorts, me.unsorted)
	})
	return reorderedFilterPieces(unsorted, indices)
}

type pieceOrderingFinalizer struct {
	unsorted *[]filterPiece
}

// Calls f with requestable pieces in order.
func GetRequestablePieces(input Input, pro *PieceRequestOrder, f func(t *Torrent, p *Piece, pieceIndex int)) {
	// Storage capacity left for this run, keyed by the storage capacity pointer on the storage
	// TorrentImpl. A nil value means no capacity limit.
	var storageLeft *int64
	if input.Capacity != nil {
		storageLeft = new(int64)
		*storageLeft = *input.Capacity
	}
	var allTorrentsUnverifiedBytes int64
	torrentUnverifiedBytes := map[metainfo.Hash]int64{}
	pro.tree.Ascend(func(i btree.Item) bool {
		_i := i.(pieceRequestOrderItem)
		var t Torrent = input.Torrents[_i.key.InfoHash]
		var piece *Piece = &t.Pieces[_i.key.Index]
		if left := storageLeft; left != nil {
			if *left < piece.Length {
				return true
			}
			*left -= piece.Length
		}
		if !piece.Request || piece.NumPendingChunks == 0 {
			// TODO: Clarify exactly what is verified. Stuff that's being hashed should be
			// considered unverified and hold up further requests.
			return true
		}
		if t.MaxUnverifiedBytes != 0 && torrentUnverifiedBytes[t.InfoHash]+piece.Length > t.MaxUnverifiedBytes {
			return true
		}
		if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
			return true
		}
		torrentUnverifiedBytes[t.InfoHash] += piece.Length
		allTorrentsUnverifiedBytes += piece.Length
		f(&t, piece, _i.key.Index)
		return true
	})
}

type Input struct {
	// This is all the torrents that share the capacity below (likely a single torrent if capacity
	// is infinite, since in that case you could just run this separately for each torrent).
	Torrents map[metainfo.Hash]Torrent
	// Must not be modified. Non-nil if capacity is not infinite, in which case pieces of torrents
	// that share the same capacity key must be incorporated into piece ordering together.
	Capacity *int64
	// Across all the Torrents. This might be partitioned by storage capacity key now.
	MaxUnverifiedBytes int64
}
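
// A minimal sketch of how a caller might drive piece selection (constructing
// the Input and PieceRequestOrder is assumed to happen elsewhere in the
// client):
//
//	GetRequestablePieces(input, pro, func(t *Torrent, p *Piece, pieceIndex int) {
//		// Allocate the piece's pending chunks to peers here.
//	})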

// Checks that a sorted peersForPiece slice makes sense: correctly ordered and
// free of duplicate peers.
func ensureValidSortedPeersForPieceRequests(peers *peersForPieceSorter) {
	if !sort.IsSorted(peers) {
		panic("not sorted")
	}
	peerMap := make(map[*peersForPieceRequests]struct{}, peers.Len())
	for _, p := range peers.peersForPiece {
		if _, ok := peerMap[p]; ok {
			panic(p)
		}
		peerMap[p] = struct{}{}
	}
}
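
// peersForPiecesPool recycles the candidate-peer slices built by
// makePeersForPiece, avoiding a fresh allocation for every piece processed.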
var peersForPiecesPool sync.Pool

func makePeersForPiece(cap int) []*peersForPieceRequests {
	got := peersForPiecesPool.Get()
	if got == nil {
		return make([]*peersForPieceRequests, 0, cap)
	}
	return got.([]*peersForPieceRequests)[:0]
}
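
// peersForPieceSorter orders the candidate peers for a single requestable
// piece. A non-nil req means the sort is deciding reassignment of that
// specific request; a nil req means a fresh chunk is being assigned.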
type peersForPieceSorter struct {
	peersForPiece []*peersForPieceRequests
	req           *RequestIndex
	p             requestablePiece
}

func (me *peersForPieceSorter) Len() int {
	return len(me.peersForPiece)
}

func (me *peersForPieceSorter) Swap(i, j int) {
	me.peersForPiece[i], me.peersForPiece[j] = me.peersForPiece[j], me.peersForPiece[i]
}

func (me *peersForPieceSorter) Less(_i, _j int) bool {
	i := me.peersForPiece[_i]
	j := me.peersForPiece[_j]
	req := me.req
	p := &me.p
	// Prefer peers that already have this request in their next state.
	byHasRequest := func() multiless.Computation {
		ml := multiless.New()
		if req != nil {
			iHas := i.nextState.Requests.Contains(*req)
			jHas := j.nextState.Requests.Contains(*req)
			ml = ml.Bool(jHas, iHas)
		}
		return ml
	}()
	ml := multiless.New()
	// We always "reallocate", that is force even striping amongst peers that are either on
	// the last piece they can contribute to, or for pieces marked for this behaviour.
	// Striping prevents starving peers of requests, and will always re-balance to the
	// fastest known peers.
	if !p.alwaysReallocate {
		ml = ml.Bool(
			j.requestablePiecesRemaining == 1,
			i.requestablePiecesRemaining == 1)
	}
	if p.alwaysReallocate || j.requestablePiecesRemaining == 1 {
		ml = ml.Int(
			i.requestsInPiece,
			j.requestsInPiece)
	} else {
		ml = ml.AndThen(byHasRequest)
	}
	ml = ml.Int(
		i.requestablePiecesRemaining,
		j.requestablePiecesRemaining,
	).Float64(
		j.DownloadRate,
		i.DownloadRate,
	)
	ml = ml.AndThen(byHasRequest)
	return ml.Int64(
		int64(j.Age), int64(i.Age),
		// TODO: Probably peer priority can come next.
	).Uintptr(
		i.Id.Uintptr(),
		j.Id.Uintptr(),
	).MustLess()
}
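
// allocatePendingChunks distributes a piece's pending chunks over the peers
// that can serve it: candidate peers are collected, chunks already requested
// are provisionally kept (preallocated), fresh chunks go to the best-sorted
// peer, and preallocated chunks are then re-sorted so they can migrate to
// better peers.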
func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
	peersForPiece := makePeersForPiece(len(peers))
	for _, peer := range peers {
		if !peer.canRequestPiece(p.index) {
			continue
		}
		if !peer.canFitRequest() {
			peer.requestablePiecesRemaining--
			continue
		}
		peersForPiece = append(peersForPiece, &peersForPieceRequests{
			requestsInPiece: 0,
			requestsPeer:    peer,
		})
	}
	defer func() {
		for _, peer := range peersForPiece {
			peer.requestablePiecesRemaining--
		}
		peersForPiecesPool.Put(peersForPiece)
	}()
	peersForPieceSorter := peersForPieceSorter{
		peersForPiece: peersForPiece,
		p:             p,
	}
	sortPeersForPiece := func(req *RequestIndex) {
		peersForPieceSorter.req = req
		sort.Sort(&peersForPieceSorter)
		// ensureValidSortedPeersForPieceRequests(&peersForPieceSorter)
	}
	// Chunks can be preassigned several times if peers haven't been able to update their "actual"
	// request state with the "next" one before another request-strategy run occurs.
	preallocated := make([][]*peersForPieceRequests, p.t.ChunksPerPiece)
	p.IterPendingChunks(func(spec ChunkIndex) {
		req := p.chunkIndexToRequestIndex(spec)
		for _, peer := range peersForPiece {
			if !peer.ExistingRequests.Contains(req) {
				continue
			}
			if !peer.canFitRequest() {
				continue
			}
			preallocated[spec] = append(preallocated[spec], peer)
			peer.addNextRequest(req)
		}
	})
	pendingChunksRemaining := int(p.NumPendingChunks)
	p.IterPendingChunks(func(chunk ChunkIndex) {
		if len(preallocated[chunk]) != 0 {
			return
		}
		req := p.chunkIndexToRequestIndex(chunk)
		defer func() { pendingChunksRemaining-- }()
		sortPeersForPiece(nil)
		for _, peer := range peersForPiece {
			if !peer.canFitRequest() {
				continue
			}
			if !peer.PieceAllowedFast.ContainsInt(p.index) {
				// TODO: Verify that it's okay to stay uninterested if we request allowed-fast
				// pieces.
				peer.nextState.Interested = true
				if peer.Choked {
					continue
				}
			}
			peer.addNextRequest(req)
			return
		}
	})
chunk:
	for chunk, prePeers := range preallocated {
		if len(prePeers) == 0 {
			continue
		}
		pendingChunksRemaining--
		req := p.chunkIndexToRequestIndex(ChunkIndex(chunk))
		for _, pp := range prePeers {
			pp.requestsInPiece--
		}
		sortPeersForPiece(&req)
		for _, pp := range prePeers {
			pp.nextState.Requests.Remove(req)
		}
		for _, peer := range peersForPiece {
			if !peer.canFitRequest() {
				continue
			}
			if !peer.PieceAllowedFast.ContainsInt(p.index) {
				// TODO: Verify that it's okay to stay uninterested if we request allowed-fast
				// pieces.
				peer.nextState.Interested = true
				if peer.Choked {
					continue
				}
			}
			peer.addNextRequest(req)
			continue chunk
		}
	}
	if pendingChunksRemaining != 0 {
		panic(pendingChunksRemaining)
	}
}