request-strategy/order.go
package request_strategy

import (
	"bytes"
	"expvar"
	"runtime"
	"sort"
	"sync"

	"github.com/anacrolix/multiless"
	"github.com/anacrolix/torrent/metainfo"
	"github.com/google/btree"

	"github.com/anacrolix/torrent/types"
)

type (
	RequestIndex  = uint32
	ChunkIndex    = uint32
	Request       = types.Request
	pieceIndex    = types.PieceIndex
	piecePriority = types.PiecePriority
	// This can be made into a type parameter later, which will be great for testing.
	ChunkSpec = types.ChunkSpec
)

type ClientPieceOrder struct{}

func equalFilterPieces(l, r []filterPiece) bool {
	if len(l) != len(r) {
		return false
	}
	for i := range l {
		lp := &l[i]
		rp := &r[i]
		if lp.Priority != rp.Priority ||
			lp.Partial != rp.Partial ||
			lp.Availability != rp.Availability ||
			lp.index != rp.index ||
			lp.t.InfoHash != rp.t.InfoHash {
			return false
		}
	}
	return true
}

type pieceSorter struct {
	swap func(i, j int)
	get  func(i int) *filterPiece
	len  int
}

func (me pieceSorter) Len() int {
	return me.len
}

func (me pieceSorter) Swap(i, j int) {
	me.swap(i, j)
}

func (me pieceSorter) Less(_i, _j int) bool {
	i := me.get(_i)
	j := me.get(_j)
	return pieceOrderLess(i.toPieceOrderInput(), j.toPieceOrderInput()).MustLess()
}

type pieceOrderInput struct {
	PieceRequestOrderState
	PieceRequestOrderKey
}

func pieceOrderLess(i, j pieceOrderInput) multiless.Computation {
	return multiless.New().Int(
		int(j.Priority), int(i.Priority),
	).Bool(
		j.Partial, i.Partial,
	).Int64(
		i.Availability, j.Availability,
	).Int(
		i.Index, j.Index,
	).Lazy(func() multiless.Computation {
		return multiless.New().Cmp(bytes.Compare(
			i.InfoHash[:],
			j.InfoHash[:],
		))
	})
}
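
// Hypothetical sketch, not part of upstream: sorting a slice of
// pieceOrderInput values directly exercises the ordering that pieceOrderLess
// encodes: higher Priority first, then Partial pieces, then lower
// Availability, then lower piece Index, with the InfoHash bytes as a final
// tiebreaker. MustLess panics on a full tie, so the inputs are assumed to be
// distinct.
func sortPieceOrderInputs(inputs []pieceOrderInput) {
	sort.Slice(inputs, func(a, b int) bool {
		return pieceOrderLess(inputs[a], inputs[b]).MustLess()
	})
}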

type requestsPeer struct {
	Peer
	nextState                  PeerNextRequestState
	requestablePiecesRemaining int
}

func (rp *requestsPeer) canFitRequest() bool {
	return int(rp.nextState.Requests.GetCardinality()) < rp.MaxRequests
}

func (rp *requestsPeer) addNextRequest(r RequestIndex) {
	if !rp.nextState.Requests.CheckedAdd(r) {
		panic("should only add once")
	}
}

type peersForPieceRequests struct {
	requestsInPiece int
	*requestsPeer
}

func (me *peersForPieceRequests) addNextRequest(r RequestIndex) {
	me.requestsPeer.addNextRequest(r)
	me.requestsInPiece++
}

type requestablePiece struct {
	index             pieceIndex
	t                 *Torrent
	alwaysReallocate  bool
	NumPendingChunks  int
	IterPendingChunks ChunksIterFunc
}

func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex {
	return p.t.ChunksPerPiece*uint32(p.index) + c
}
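
// Hypothetical helpers, not part of upstream: request indices form a flat
// space with ChunksPerPiece slots per piece, so the mapping above is
// requestIndex = pieceIndex*ChunksPerPiece + chunkIndex. With 16 chunks per
// piece, for example, chunk 5 of piece 3 maps to request index 53. The
// inverse is division and remainder.
func requestIndexToPieceIndex(t *Torrent, r RequestIndex) pieceIndex {
	return pieceIndex(r / t.ChunksPerPiece)
}

func requestIndexToChunkIndex(t *Torrent, r RequestIndex) ChunkIndex {
	return r % t.ChunksPerPiece
}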

type filterPiece struct {
	t     *Torrent
	index pieceIndex
	*Piece
}

func (fp *filterPiece) toPieceOrderInput() (ret pieceOrderInput) {
	ret.Partial = fp.Partial
	ret.InfoHash = fp.t.InfoHash
	ret.Availability = fp.Availability
	ret.Priority = fp.Priority
	ret.Index = fp.index
	return
}

var (
	sortsMu sync.Mutex
	sorts   = map[*[]filterPiece][]int{}
)

func reorderedFilterPieces(pieces []filterPiece, indices []int) (ret []filterPiece) {
	ret = make([]filterPiece, len(indices))
	for i, j := range indices {
		ret[i] = pieces[j]
	}
	return
}

var packageExpvarMap = expvar.NewMap("request-strategy")

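// getSortedFilterPieces returns the pieces sorted by request order. With
// cachePieceSorts enabled it instead memoizes the sort permutation per input
// slice, keyed by the address of the slice argument, and evicts the entry via
// a finalizer once the slice is collected.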
func getSortedFilterPieces(unsorted []filterPiece) []filterPiece {
	const cachePieceSorts = false
	if !cachePieceSorts {
		sort.Sort(pieceSorter{
			len: len(unsorted),
			swap: func(i, j int) {
				unsorted[i], unsorted[j] = unsorted[j], unsorted[i]
			},
			get: func(i int) *filterPiece {
				return &unsorted[i]
			},
		})
		return unsorted
	}
	sortsMu.Lock()
	defer sortsMu.Unlock()
	for key, order := range sorts {
		if equalFilterPieces(*key, unsorted) {
			packageExpvarMap.Add("reused filter piece ordering", 1)
			return reorderedFilterPieces(unsorted, order)
		}
	}
	indices := make([]int, len(unsorted))
	for i := 0; i < len(indices); i++ {
		indices[i] = i
	}
	sort.Sort(pieceSorter{
		len: len(unsorted),
		swap: func(i, j int) {
			indices[i], indices[j] = indices[j], indices[i]
		},
		get: func(i int) *filterPiece {
			return &unsorted[indices[i]]
		},
	})
	packageExpvarMap.Add("added filter piece ordering", 1)
	sorts[&unsorted] = indices
	runtime.SetFinalizer(&pieceOrderingFinalizer{unsorted: &unsorted}, func(me *pieceOrderingFinalizer) {
		packageExpvarMap.Add("finalized filter piece ordering", 1)
		sortsMu.Lock()
		defer sortsMu.Unlock()
		delete(sorts, me.unsorted)
	})
	return reorderedFilterPieces(unsorted, indices)
}

type pieceOrderingFinalizer struct {
	unsorted *[]filterPiece
}

// Calls f with requestable pieces in order.
func GetRequestablePieces(input Input, pro *PieceRequestOrder, f func(t *Torrent, p *Piece, pieceIndex int)) {
	// Storage capacity left for this run. A nil value means no capacity
	// limit.
	var storageLeft *int64
	if input.Capacity != nil {
		storageLeft = new(int64)
		*storageLeft = *input.Capacity
	}
	var allTorrentsUnverifiedBytes int64
	torrentUnverifiedBytes := map[metainfo.Hash]int64{}
	pro.tree.Ascend(func(i btree.Item) bool {
		_i := i.(pieceRequestOrderItem)
		t := input.Torrents[_i.key.InfoHash]
		piece := &t.Pieces[_i.key.Index]
		if left := storageLeft; left != nil {
			if *left < piece.Length {
				return true
			}
			*left -= piece.Length
		}
		if !piece.Request || piece.NumPendingChunks == 0 {
			// TODO: Clarify exactly what is verified. Stuff that's being
			// hashed should be considered unverified and hold up further
			// requests.
			return true
		}
		if t.MaxUnverifiedBytes != 0 && torrentUnverifiedBytes[t.InfoHash]+piece.Length > t.MaxUnverifiedBytes {
			return true
		}
		if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
			return true
		}
		torrentUnverifiedBytes[t.InfoHash] += piece.Length
		allTorrentsUnverifiedBytes += piece.Length
		f(&t, piece, _i.key.Index)
		return true
	})
}

type Input struct {
	// All the torrents that share the capacity below (likely a single torrent
	// if capacity is infinite, since in that case this could be run
	// separately for each Torrent).
	Torrents map[metainfo.Hash]Torrent
	// Must not be modified. Non-nil if capacity is not infinite, meaning that
	// pieces of torrents that share the same capacity key must be
	// incorporated in piece ordering.
	Capacity *int64
	// Across all the Torrents. This might be partitioned by storage capacity
	// key now.
	MaxUnverifiedBytes int64
}
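
// Hypothetical caller sketch, not part of upstream: walk a PieceRequestOrder
// for torrents sharing a capacity, collecting pieces that are still
// requestable under the limits. The 10 GiB capacity and 64 MiB
// unverified-bytes cap are arbitrary example values.
func collectRequestablePieces(
	torrents map[metainfo.Hash]Torrent,
	pro *PieceRequestOrder,
) (pieces []*Piece) {
	capacity := int64(10 << 30)
	GetRequestablePieces(
		Input{
			Torrents:           torrents,
			Capacity:           &capacity,
			MaxUnverifiedBytes: 64 << 20,
		},
		pro,
		func(t *Torrent, p *Piece, pieceIndex int) {
			pieces = append(pieces, p)
		},
	)
	return
}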

// Checks that a sorted peersForPiece slice makes sense.
func ensureValidSortedPeersForPieceRequests(peers *peersForPieceSorter) {
	if !sort.IsSorted(peers) {
		panic("not sorted")
	}
	peerMap := make(map[*peersForPieceRequests]struct{}, peers.Len())
	for _, p := range peers.peersForPiece {
		if _, ok := peerMap[p]; ok {
			panic(p)
		}
		peerMap[p] = struct{}{}
	}
}

var peersForPiecesPool sync.Pool

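// makePeersForPiece returns an empty slice for collecting candidate peers,
// reusing a pooled allocation when one is available; allocatePendingChunks
// puts it back in the pool when it's done.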
func makePeersForPiece(cap int) []*peersForPieceRequests {
	got := peersForPiecesPool.Get()
	if got == nil {
		return make([]*peersForPieceRequests, 0, cap)
	}
	return got.([]*peersForPieceRequests)[:0]
}

type peersForPieceSorter struct {
	peersForPiece []*peersForPieceRequests
	req           *RequestIndex
	p             requestablePiece
}

func (me *peersForPieceSorter) Len() int {
	return len(me.peersForPiece)
}

func (me *peersForPieceSorter) Swap(i, j int) {
	me.peersForPiece[i], me.peersForPiece[j] = me.peersForPiece[j], me.peersForPiece[i]
}

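// Less orders candidate peers for a piece. Peers on the last piece they can
// contribute to sort first, unless the piece always reallocates; when
// striping applies, peers with fewer requests already taken in this piece
// come first, otherwise peers already holding the request under consideration
// do. Remaining ties fall through to fewer requestable pieces remaining,
// higher download rate, holding the request, greater age, and finally peer
// id.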
func (me *peersForPieceSorter) Less(_i, _j int) bool {
	i := me.peersForPiece[_i]
	j := me.peersForPiece[_j]
	req := me.req
	p := &me.p
	byHasRequest := func() multiless.Computation {
		ml := multiless.New()
		if req != nil {
			iHas := i.nextState.Requests.Contains(*req)
			jHas := j.nextState.Requests.Contains(*req)
			ml = ml.Bool(jHas, iHas)
		}
		return ml
	}()
	ml := multiless.New()
	// We always "reallocate", that is, force even striping amongst peers that
	// are either on the last piece they can contribute to, or for pieces
	// marked for this behaviour. Striping prevents starving peers of
	// requests, and will always re-balance to the fastest known peers.
	if !p.alwaysReallocate {
		ml = ml.Bool(
			j.requestablePiecesRemaining == 1,
			i.requestablePiecesRemaining == 1)
	}
	if p.alwaysReallocate || j.requestablePiecesRemaining == 1 {
		ml = ml.Int(
			i.requestsInPiece,
			j.requestsInPiece)
	} else {
		ml = ml.AndThen(byHasRequest)
	}
	ml = ml.Int(
		i.requestablePiecesRemaining,
		j.requestablePiecesRemaining,
	).Float64(
		j.DownloadRate,
		i.DownloadRate,
	)
	if ml.Ok() {
		return ml.Less()
	}
	ml = ml.AndThen(byHasRequest)
	return ml.Int64(
		int64(j.Age), int64(i.Age),
		// TODO: Peer priority could probably come next.
	).Uintptr(
		i.Id.Uintptr(),
		j.Id.Uintptr(),
	).MustLess()
}

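// allocatePendingChunks distributes this piece's pending chunks over peers in
// three passes: chunks a peer has already requested are preassigned (possibly
// to several peers at once), chunks nobody has requested go to the best peer
// per the sort above, and finally each preassigned chunk is re-sorted and
// collapsed onto a single peer.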
func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
	peersForPiece := makePeersForPiece(len(peers))
	for _, peer := range peers {
		if !peer.canRequestPiece(p.index) {
			continue
		}
		if !peer.canFitRequest() {
			peer.requestablePiecesRemaining--
			continue
		}
		peersForPiece = append(peersForPiece, &peersForPieceRequests{
			requestsInPiece: 0,
			requestsPeer:    peer,
		})
	}
	defer func() {
		for _, peer := range peersForPiece {
			peer.requestablePiecesRemaining--
		}
		peersForPiecesPool.Put(peersForPiece)
	}()
	peersForPieceSorter := peersForPieceSorter{
		peersForPiece: peersForPiece,
		p:             p,
	}
	sortPeersForPiece := func(req *RequestIndex) {
		peersForPieceSorter.req = req
		sort.Sort(&peersForPieceSorter)
		// ensureValidSortedPeersForPieceRequests(&peersForPieceSorter)
	}
	// Chunks can be preassigned several times if peers haven't been able to
	// update their "actual" request state to match the "next" one before
	// another request-strategy run occurs.
	preallocated := make([][]*peersForPieceRequests, p.t.ChunksPerPiece)
	p.IterPendingChunks(func(spec ChunkIndex) {
		req := p.chunkIndexToRequestIndex(spec)
		for _, peer := range peersForPiece {
			if !peer.ExistingRequests.Contains(req) {
				continue
			}
			if !peer.canFitRequest() {
				continue
			}
			preallocated[spec] = append(preallocated[spec], peer)
			peer.addNextRequest(req)
		}
	})
	pendingChunksRemaining := p.NumPendingChunks
	p.IterPendingChunks(func(chunk ChunkIndex) {
		if len(preallocated[chunk]) != 0 {
			return
		}
		req := p.chunkIndexToRequestIndex(chunk)
		defer func() { pendingChunksRemaining-- }()
		sortPeersForPiece(nil)
		for _, peer := range peersForPiece {
			if !peer.canFitRequest() {
				continue
			}
			if !peer.PieceAllowedFast.ContainsInt(p.index) {
				// TODO: Verify that it's okay to stay uninterested if we
				// request allowed-fast pieces.
				peer.nextState.Interested = true
				if peer.Choking {
					continue
				}
			}
			peer.addNextRequest(req)
			break
		}
	})
chunk:
	for chunk, prePeers := range preallocated {
		if len(prePeers) == 0 {
			continue
		}
		pendingChunksRemaining--
		req := p.chunkIndexToRequestIndex(ChunkIndex(chunk))
		for _, pp := range prePeers {
			pp.requestsInPiece--
		}
		sortPeersForPiece(&req)
		for _, pp := range prePeers {
			pp.nextState.Requests.Remove(req)
		}
		for _, peer := range peersForPiece {
			if !peer.canFitRequest() {
				continue
			}
			if !peer.PieceAllowedFast.ContainsInt(p.index) {
				// TODO: Verify that it's okay to stay uninterested if we
				// request allowed-fast pieces.
				peer.nextState.Interested = true
				if peer.Choking {
					continue
				}
			}
			peer.addNextRequest(req)
			continue chunk
		}
	}
	if pendingChunksRemaining != 0 {
		panic(pendingChunksRemaining)
	}
}