1 package request_strategy
10 "github.com/anacrolix/multiless"
11 "github.com/anacrolix/torrent/metainfo"
13 "github.com/anacrolix/torrent/types"
// Short aliases into the shared types package. (NOTE(review): the enclosing
// `type (...)` declaration opens on a line elided from this view.)
19 Request = types.Request
20 pieceIndex = types.PieceIndex
21 piecePriority = types.PiecePriority
22 // This can be made into a type-param later, will be great for testing.
23 ChunkSpec = types.ChunkSpec
// ClientPieceOrder is an empty (stateless) struct; no methods on it are
// visible in this view.
26 type ClientPieceOrder struct{}
// equalFilterPieces reports whether l and r are equivalent for the purpose of
// reusing a cached sort order: corresponding elements must agree on every
// field the sort consults (Priority, Partial, Availability, piece index) and
// belong to torrents with the same InfoHash. (NOTE(review): the length check,
// loop header and return statements are on lines elided from this view.)
28 func equalFilterPieces(l, r []filterPiece) bool {
// Any mismatch in an ordering-relevant field makes the slices unequal.
35 if lp.Priority != rp.Priority ||
36 lp.Partial != rp.Partial ||
37 lp.Availability != rp.Availability ||
38 lp.index != rp.index ||
39 lp.t.InfoHash != rp.t.InfoHash {
// sortFilterPieces sorts indices (positions into pieces) rather than moving
// the pieces themselves, building a multiless comparison chain per pair.
// (NOTE(review): several comparator terms between these lines are elided from
// this view, so the full key ordering cannot be stated here.)
46 func sortFilterPieces(pieces []filterPiece, indices []int) {
47 sort.Slice(indices, func(_i, _j int) bool {
48 i := &pieces[indices[_i]]
49 j := &pieces[indices[_j]]
50 return multiless.New().Int(
// j before i: higher Priority sorts earlier (descending).
51 int(j.Priority), int(i.Priority),
// i before j: Availability compared ascending — the combinator call that
// consumes these arguments is on an elided line; confirm against the full file.
55 i.Availability, j.Availability,
// The byte comparison is wrapped in Lazy so it only runs when every earlier
// key ties (it is comparatively expensive).
58 ).Lazy(func() multiless.Computation {
59 return multiless.New().Cmp(bytes.Compare(
// requestsPeer tracks a peer's tentative request state while a strategy run
// is in progress. (Additional fields, including the embedded Peer and
// MaxRequests referenced by canFitRequest, are on elided lines.)
67 type requestsPeer struct {
// The request set this peer would hold if the current run were applied.
69 nextState PeerNextRequestState
// How many requestable pieces this peer can still contribute to in this run;
// decremented as pieces are allocated (see allocatePendingChunks).
70 requestablePiecesRemaining int
// canFitRequest reports whether the peer's tentative request set is still
// below its MaxRequests budget.
73 func (rp *requestsPeer) canFitRequest() bool {
74 return int(rp.nextState.Requests.GetCardinality()) < rp.MaxRequests
// addNextRequest records r in the peer's next request state. Adding the same
// request index twice is a programming error and panics (CheckedAdd returns
// false when the element was already present).
77 func (rp *requestsPeer) addNextRequest(r RequestIndex) {
78 if !rp.nextState.Requests.CheckedAdd(r) {
79 panic("should only add once")
// peersForPieceRequests wraps a requestsPeer for per-piece allocation
// bookkeeping. (Its fields, including the embedded *requestsPeer used by
// addNextRequest below, are on elided lines.)
83 type peersForPieceRequests struct {
// addNextRequest delegates to the embedded requestsPeer; any additional
// per-piece bookkeeping is on lines elided from this view.
88 func (me *peersForPieceRequests) addNextRequest(r RequestIndex) {
89 me.requestsPeer.addNextRequest(r)
// requestablePiece is a piece that survived filtering and will have its
// pending chunks allocated to peers. (Most fields — t, index, NumPendingChunks
// etc., referenced by methods and allocatePendingChunks — are on elided lines.)
93 type requestablePiece struct {
// Visits each pending (still-needed) chunk index of this piece.
98 IterPendingChunks ChunksIterFunc
// chunkIndexToRequestIndex maps a chunk offset within this piece to the
// torrent-global request index: pieceIndex*ChunksPerPiece + chunkOffset.
101 func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex {
102 return p.t.ChunksPerPiece*uint32(p.index) + c
// filterPiece pairs a piece with its owning torrent for filtering and
// sorting. (Fields — t, index, Priority, Availability, etc., used by
// equalFilterPieces and sortFilterPieces — are on elided lines.)
105 type filterPiece struct {
// Cache of computed sort permutations, keyed by the address of the slice
// header that was sorted. Guarded by sortsMu (locking visible in
// getSortedFilterPieces; the mutex declaration itself is on an elided line).
113 sorts = map[*[]filterPiece][]int{}
// reorderedFilterPieces returns pieces permuted by indices — presumably
// ret[i] = pieces[j] for each (i, j) pair; the loop body is on an elided
// line, so confirm against the full file.
116 func reorderedFilterPieces(pieces []filterPiece, indices []int) (ret []filterPiece) {
117 ret = make([]filterPiece, len(indices))
118 for i, j := range indices {
// Package-level expvar map publishing instrumentation counters under the
// "request-strategy" key (ordering cache reuse/addition/finalization counts).
124 var packageExpvarMap = expvar.NewMap("request-strategy")
// getSortedFilterPieces returns unsorted in sorted order. It first scans the
// sorts cache for an equivalent piece list (equalFilterPieces) and reuses
// that permutation; on a miss it sorts a copy via an index permutation,
// caches the permutation keyed by &unsorted, and registers a finalizer to
// evict the entry later. (NOTE(review): the sortsMu.Lock() calls, loop
// bodies, index initialization and closing braces are on elided lines.)
126 func getSortedFilterPieces(unsorted []filterPiece) []filterPiece {
128 defer sortsMu.Unlock()
// Linear scan over cached orderings; a hit avoids re-sorting.
129 for key, order := range sorts {
130 if equalFilterPieces(*key, unsorted) {
131 packageExpvarMap.Add("reused filter piece ordering", 1)
132 return reorderedFilterPieces(unsorted, order)
// Cache miss: sort a copy through an index permutation so the caller's
// slice order is left untouched.
135 sorted := append(make([]filterPiece, 0, len(unsorted)), unsorted...)
136 indices := make([]int, len(sorted))
137 for i := 0; i < len(indices); i++ {
140 sortFilterPieces(sorted, indices)
141 packageExpvarMap.Add("added filter piece ordering", 1)
// NOTE(review): the cache key is the address of the local parameter
// `unsorted`; the finalizer below deletes that entry once the wrapper object
// is collected — verify the intended cache lifetime against the full file.
142 sorts[&unsorted] = indices
143 runtime.SetFinalizer(&pieceOrderingFinalizer{unsorted: &unsorted}, func(me *pieceOrderingFinalizer) {
144 packageExpvarMap.Add("finalized filter piece ordering", 1)
146 defer sortsMu.Unlock()
147 delete(sorts, me.unsorted)
149 return reorderedFilterPieces(unsorted, indices)
// pieceOrderingFinalizer exists only to carry the sorts-cache key so a
// runtime finalizer (set in getSortedFilterPieces) can evict that entry.
152 type pieceOrderingFinalizer struct {
153 unsorted *[]filterPiece
156 // Calls f with requestable pieces in order.
// GetRequestablePieces flattens every torrent's pieces into filterPiece
// values, sorts them (reusing cached orderings), then walks them in that
// order, skipping pieces that exceed the storage-capacity or
// unverified-bytes budgets before invoking f. (NOTE(review): continue
// statements, struct-literal fields and closing braces are on elided lines.)
157 func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex int)) {
// Pre-size the flattened slice with the total piece count across torrents.
159 for i := range input.Torrents {
160 maxPieces += len(input.Torrents[i].Pieces)
162 pieces := make([]filterPiece, 0, maxPieces)
163 // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
164 // TorrentImpl. A nil value means no capacity limit.
165 var storageLeft *int64
166 if input.Capacity != nil {
// Copy the capacity so the decrements below don't mutate the caller's value.
167 storageLeft = new(int64)
168 *storageLeft = *input.Capacity
170 for _t := range input.Torrents {
171 // TODO: We could do metainfo requests here.
172 t := &input.Torrents[_t]
173 for i := range t.Pieces {
174 pieces = append(pieces, filterPiece{
175 t: &input.Torrents[_t],
181 pieces = getSortedFilterPieces(pieces)
182 var allTorrentsUnverifiedBytes int64
// Unverified bytes accumulated per torrent in this run, keyed by infohash.
183 torrentUnverifiedBytes := map[metainfo.Hash]int64{}
184 for _, piece := range pieces {
// Enforce remaining storage capacity; a piece that doesn't fit is skipped
// (the skip itself is on an elided line).
185 if left := storageLeft; left != nil {
186 if *left < piece.Length {
189 *left -= piece.Length
// Skip pieces that aren't wanted or have nothing left to request.
191 if !piece.Request || piece.NumPendingChunks == 0 {
192 // TODO: Clarify exactly what is verified. Stuff that's being hashed should be
193 // considered unverified and hold up further requests.
// Per-torrent and global unverified-byte ceilings; zero means unlimited.
196 if piece.t.MaxUnverifiedBytes != 0 && torrentUnverifiedBytes[piece.t.InfoHash]+piece.Length > piece.t.MaxUnverifiedBytes {
199 if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
202 torrentUnverifiedBytes[piece.t.InfoHash] += piece.Length
203 allTorrentsUnverifiedBytes += piece.Length
204 f(piece.t, piece.Piece, piece.index)
// Input fields visible in this view (the struct header, Torrents and
// Capacity fields are on elided lines).
210 // This is all torrents that share the same capacity below (or likely a single torrent if there
211 // is infinite capacity, since you could just run it separately for each Torrent if that's the
214 // Must not be modified. Non-nil if capacity is not infinite, meaning that pieces of torrents
215 // that share the same capacity key must be incorporated in piece ordering.
217 // Across all the Torrents. This might be partitioned by storage capacity key now.
// Global cap on unverified bytes per run; zero disables the limit (see the
// checks in GetRequestablePieces).
218 MaxUnverifiedBytes int64
221 // Checks that a sorted peersForPiece slice makes sense.
// Debug-only invariant check (its call site in allocatePendingChunks is
// commented out): the slice must satisfy sort.IsSorted and contain no
// duplicate peer pointers. (The failure/panic lines are elided here.)
222 func ensureValidSortedPeersForPieceRequests(peers *peersForPieceSorter) {
223 if !sort.IsSorted(peers) {
226 peerMap := make(map[*peersForPieceRequests]struct{}, peers.Len())
227 for _, p := range peers.peersForPiece {
228 if _, ok := peerMap[p]; ok {
231 peerMap[p] = struct{}{}
// Pool of []*peersForPieceRequests backing arrays, reused across
// allocatePendingChunks runs (Get in makePeersForPiece, Put after use) to
// reduce allocation churn.
235 var peersForPiecesPool sync.Pool
// makePeersForPiece returns an empty slice with capacity for cap peers,
// reusing a pooled backing array when available. (The nil check that selects
// between the two returns is on an elided line.)
237 func makePeersForPiece(cap int) []*peersForPieceRequests {
238 got := peersForPiecesPool.Get()
// Pool was empty: allocate fresh storage.
240 return make([]*peersForPieceRequests, 0, cap)
// Reuse the pooled array, truncated to zero length.
242 return got.([]*peersForPieceRequests)[:0]
// peersForPieceSorter implements sort.Interface over candidate peers for a
// single piece. (Comparison-context fields — at least req and p, referenced
// by Less — are on elided lines.)
245 type peersForPieceSorter struct {
246 peersForPiece []*peersForPieceRequests
// Len implements sort.Interface.
251 func (me *peersForPieceSorter) Len() int {
252 return len(me.peersForPiece)
// Swap implements sort.Interface.
255 func (me *peersForPieceSorter) Swap(i, j int) {
256 me.peersForPiece[i], me.peersForPiece[j] = me.peersForPiece[j], me.peersForPiece[i]
// Less implements sort.Interface: peers that should receive the request sort
// earlier. Visible keys: whether a peer already holds the request (when
// me.req is set), whether a peer is on its last requestable piece (striping),
// remaining requestable pieces, and Age. (NOTE(review): several comparator
// terms, the me.req/me.p field accesses, and closing braces are on elided
// lines; argument order into multiless determines direction — confirm
// against the multiless package docs.)
259 func (me *peersForPieceSorter) Less(_i, _j int) bool {
260 i := me.peersForPiece[_i]
261 j := me.peersForPiece[_j]
// Shared comparison term: prefer (per multiless.Bool argument order) the
// peer whose tentative state already contains the request.
264 byHasRequest := func() multiless.Computation {
265 ml := multiless.New()
267 iHas := i.nextState.Requests.Contains(*req)
268 jHas := j.nextState.Requests.Contains(*req)
269 ml = ml.Bool(jHas, iHas)
273 ml := multiless.New()
274 // We always "reallocate", that is force even striping amongst peers that are either on
// the last piece they can contribute to, or for pieces marked for this behaviour.
276 // Striping prevents starving peers of requests, and will always re-balance to the
277 // fastest known peers.
278 if !p.alwaysReallocate {
// Peers on their last requestable piece won't get another chance this run.
280 j.requestablePiecesRemaining == 1,
281 i.requestablePiecesRemaining == 1)
283 if p.alwaysReallocate || j.requestablePiecesRemaining == 1 {
288 ml = ml.AndThen(byHasRequest)
291 i.requestablePiecesRemaining,
292 j.requestablePiecesRemaining,
300 ml = ml.AndThen(byHasRequest)
// Age compared with j before i (descending by the visible argument order).
302 int64(j.Age), int64(i.Age),
303 // TODO: Probably peer priority can come next
// allocatePendingChunks distributes piece p's pending chunks among peers:
//  1. filter peers down to those that can request this piece, maintaining
//     requestablePiecesRemaining for striping;
//  2. "preallocate" each pending chunk to every peer whose existing request
//     state already contains it (keeps in-flight requests sticky);
//  3. first pass: chunks nobody preallocated go to the best-sorted peer;
//  4. second pass: each preallocated chunk is removed from all holders and
//     re-awarded to the best-sorted candidate.
// Finishes by asserting every pending chunk was accounted for.
// (NOTE(review): continue statements, closing braces, and any enclosing
// defer around the cleanup loop are on elided lines.)
310 func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
311 peersForPiece := makePeersForPiece(len(peers))
312 for _, peer := range peers {
313 if !peer.canRequestPiece(p.index) {
// A peer already at its request budget still consumes this piece for the
// purposes of its remaining-pieces count.
316 if !peer.canFitRequest() {
317 peer.requestablePiecesRemaining--
320 peersForPiece = append(peersForPiece, &peersForPieceRequests{
// Cleanup: decrement remaining counts for participating peers and return the
// slice to the pool. (Presumably wrapped in a defer on an elided line —
// confirm against the full file.)
326 for _, peer := range peersForPiece {
327 peer.requestablePiecesRemaining--
329 peersForPiecesPool.Put(peersForPiece)
331 peersForPieceSorter := peersForPieceSorter{
332 peersForPiece: peersForPiece,
// Re-sorts candidates, optionally biased toward a specific request index
// (nil means no specific request).
335 sortPeersForPiece := func(req *RequestIndex) {
336 peersForPieceSorter.req = req
337 sort.Sort(&peersForPieceSorter)
338 // ensureValidSortedPeersForPieceRequests(&peersForPieceSorter)
340 // Chunks can be preassigned several times, if peers haven't been able to update their "actual"
341 // with "next" request state before another request strategy run occurs.
342 preallocated := make([][]*peersForPieceRequests, p.t.ChunksPerPiece)
343 p.IterPendingChunks(func(spec ChunkIndex) {
344 req := p.chunkIndexToRequestIndex(spec)
345 for _, peer := range peersForPiece {
// Only peers whose actual (existing) requests include req preallocate it.
346 if !peer.ExistingRequests.Contains(req) {
349 if !peer.canFitRequest() {
352 preallocated[spec] = append(preallocated[spec], peer)
353 peer.addNextRequest(req)
// First pass over pending chunks: assign unclaimed ones to the best peer.
356 pendingChunksRemaining := int(p.NumPendingChunks)
357 p.IterPendingChunks(func(chunk ChunkIndex) {
// Preallocated chunks are settled in the second pass below.
358 if len(preallocated[chunk]) != 0 {
361 req := p.chunkIndexToRequestIndex(chunk)
// Decrements when this callback returns, whether or not a peer took the chunk.
362 defer func() { pendingChunksRemaining-- }()
363 sortPeersForPiece(nil)
364 for _, peer := range peersForPiece {
365 if !peer.canFitRequest() {
// Pieces not in the peer's allowed-fast set require expressing interest.
368 if !peer.PieceAllowedFast.ContainsInt(p.index) {
369 // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
370 peer.nextState.Interested = true
375 peer.addNextRequest(req)
// Second pass: strip each preallocated chunk from its holders, then re-award
// it to the best-sorted candidate for that specific request.
380 for chunk, prePeers := range preallocated {
381 if len(prePeers) == 0 {
384 pendingChunksRemaining--
385 req := p.chunkIndexToRequestIndex(ChunkIndex(chunk))
386 for _, pp := range prePeers {
389 sortPeersForPiece(&req)
390 for _, pp := range prePeers {
391 pp.nextState.Requests.Remove(req)
393 for _, peer := range peersForPiece {
394 if !peer.canFitRequest() {
397 if !peer.PieceAllowedFast.ContainsInt(p.index) {
398 // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
399 peer.nextState.Interested = true
404 peer.addNextRequest(req)
// Invariant: both passes together must have accounted for every pending chunk.
408 if pendingChunksRemaining != 0 {
409 panic(pendingChunksRemaining)