]> Sergey Matveev's repositories - btrtrc.git/blob - request-strategy/order.go
Cache piece request orderings
[btrtrc.git] / request-strategy / order.go
1 package request_strategy
2
3 import (
4         "bytes"
5         "expvar"
6         "runtime"
7         "sort"
8         "sync"
9
10         "github.com/anacrolix/multiless"
11         "github.com/anacrolix/torrent/metainfo"
12
13         "github.com/anacrolix/torrent/types"
14 )
15
16 type (
17         RequestIndex  = uint32
18         ChunkIndex    = uint32
19         Request       = types.Request
20         pieceIndex    = types.PieceIndex
21         piecePriority = types.PiecePriority
22         // This can be made into a type-param later, will be great for testing.
23         ChunkSpec = types.ChunkSpec
24 )
25
26 type ClientPieceOrder struct{}
27
28 func equalFilterPieces(l, r []filterPiece) bool {
29         if len(l) != len(r) {
30                 return false
31         }
32         for i := range l {
33                 lp := &l[i]
34                 rp := &r[i]
35                 if lp.Priority != rp.Priority ||
36                         lp.Partial != rp.Partial ||
37                         lp.Availability != rp.Availability ||
38                         lp.index != rp.index ||
39                         lp.t.InfoHash != rp.t.InfoHash {
40                         return false
41                 }
42         }
43         return true
44 }
45
46 func sortFilterPieces(pieces []filterPiece, indices []int) {
47         sort.Slice(indices, func(_i, _j int) bool {
48                 i := &pieces[_i]
49                 j := &pieces[_j]
50                 return multiless.New().Int(
51                         int(j.Priority), int(i.Priority),
52                 ).Bool(
53                         j.Partial, i.Partial,
54                 ).Int64(
55                         i.Availability, j.Availability,
56                 ).Int(
57                         i.index, j.index,
58                 ).Lazy(func() multiless.Computation {
59                         return multiless.New().Cmp(bytes.Compare(
60                                 i.t.InfoHash[:],
61                                 j.t.InfoHash[:],
62                         ))
63                 }).MustLess()
64         })
65 }
66
67 type requestsPeer struct {
68         Peer
69         nextState                  PeerNextRequestState
70         requestablePiecesRemaining int
71 }
72
73 func (rp *requestsPeer) canFitRequest() bool {
74         return int(rp.nextState.Requests.GetCardinality()) < rp.MaxRequests
75 }
76
77 func (rp *requestsPeer) addNextRequest(r RequestIndex) {
78         if !rp.nextState.Requests.CheckedAdd(r) {
79                 panic("should only add once")
80         }
81 }
82
83 type peersForPieceRequests struct {
84         requestsInPiece int
85         *requestsPeer
86 }
87
88 func (me *peersForPieceRequests) addNextRequest(r RequestIndex) {
89         me.requestsPeer.addNextRequest(r)
90         me.requestsInPiece++
91 }
92
93 type requestablePiece struct {
94         index             pieceIndex
95         t                 *Torrent
96         alwaysReallocate  bool
97         NumPendingChunks  int
98         IterPendingChunks ChunksIterFunc
99 }
100
101 func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex {
102         return p.t.ChunksPerPiece*uint32(p.index) + c
103 }
104
105 type filterPiece struct {
106         t     *Torrent
107         index pieceIndex
108         *Piece
109 }
110
111 var (
112         sortsMu sync.Mutex
113         sorts   = map[*[]filterPiece][]int{}
114 )
115
116 func reorderedFilterPieces(pieces []filterPiece, indices []int) (ret []filterPiece) {
117         ret = make([]filterPiece, len(indices))
118         for i, j := range indices {
119                 ret[i] = pieces[j]
120         }
121         return
122 }
123
124 var packageExpvarMap = expvar.NewMap("request-strategy")
125
126 func getSortedFilterPieces(unsorted []filterPiece) []filterPiece {
127         sortsMu.Lock()
128         defer sortsMu.Unlock()
129         for key, order := range sorts {
130                 if equalFilterPieces(*key, unsorted) {
131                         packageExpvarMap.Add("reused filter piece ordering", 1)
132                         return reorderedFilterPieces(unsorted, order)
133                 }
134         }
135         sorted := append(make([]filterPiece, 0, len(unsorted)), unsorted...)
136         indices := make([]int, len(sorted))
137         for i := 0; i < len(indices); i++ {
138                 indices[i] = i
139         }
140         sortFilterPieces(sorted, indices)
141         packageExpvarMap.Add("added filter piece ordering", 1)
142         sorts[&unsorted] = indices
143         runtime.SetFinalizer(&pieceOrderingFinalizer{unsorted: &unsorted}, func(me *pieceOrderingFinalizer) {
144                 packageExpvarMap.Add("finalized filter piece ordering", 1)
145                 sortsMu.Lock()
146                 defer sortsMu.Unlock()
147                 delete(sorts, me.unsorted)
148         })
149         return reorderedFilterPieces(unsorted, indices)
150 }
151
152 type pieceOrderingFinalizer struct {
153         unsorted *[]filterPiece
154 }
155
156 // Calls f with requestable pieces in order.
157 func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex int)) {
158         maxPieces := 0
159         for i := range input.Torrents {
160                 maxPieces += len(input.Torrents[i].Pieces)
161         }
162         pieces := make([]filterPiece, 0, maxPieces)
163         // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
164         // TorrentImpl. A nil value means no capacity limit.
165         var storageLeft *int64
166         if input.Capacity != nil {
167                 storageLeft = new(int64)
168                 *storageLeft = *input.Capacity
169         }
170         for _t := range input.Torrents {
171                 // TODO: We could do metainfo requests here.
172                 t := &input.Torrents[_t]
173                 for i := range t.Pieces {
174                         pieces = append(pieces, filterPiece{
175                                 t:     &input.Torrents[_t],
176                                 index: i,
177                                 Piece: &t.Pieces[i],
178                         })
179                 }
180         }
181         pieces = getSortedFilterPieces(pieces)
182         var allTorrentsUnverifiedBytes int64
183         torrentUnverifiedBytes := map[metainfo.Hash]int64{}
184         for _, piece := range pieces {
185                 if left := storageLeft; left != nil {
186                         if *left < piece.Length {
187                                 continue
188                         }
189                         *left -= piece.Length
190                 }
191                 if !piece.Request || piece.NumPendingChunks == 0 {
192                         // TODO: Clarify exactly what is verified. Stuff that's being hashed should be
193                         // considered unverified and hold up further requests.
194                         continue
195                 }
196                 if piece.t.MaxUnverifiedBytes != 0 && torrentUnverifiedBytes[piece.t.InfoHash]+piece.Length > piece.t.MaxUnverifiedBytes {
197                         continue
198                 }
199                 if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
200                         continue
201                 }
202                 torrentUnverifiedBytes[piece.t.InfoHash] += piece.Length
203                 allTorrentsUnverifiedBytes += piece.Length
204                 f(piece.t, piece.Piece, piece.index)
205         }
206         return
207 }
208
209 type Input struct {
210         // This is all torrents that share the same capacity below (or likely a single torrent if there
211         // is infinite capacity, since you could just run it separately for each Torrent if that's the
212         // case).
213         Torrents []Torrent
214         // Must not be modified. Non-nil if capacity is not infinite, meaning that pieces of torrents
215         // that share the same capacity key must be incorporated in piece ordering.
216         Capacity *int64
217         // Across all the Torrents. This might be partitioned by storage capacity key now.
218         MaxUnverifiedBytes int64
219 }
220
221 // Checks that a sorted peersForPiece slice makes sense.
222 func ensureValidSortedPeersForPieceRequests(peers *peersForPieceSorter) {
223         if !sort.IsSorted(peers) {
224                 panic("not sorted")
225         }
226         peerMap := make(map[*peersForPieceRequests]struct{}, peers.Len())
227         for _, p := range peers.peersForPiece {
228                 if _, ok := peerMap[p]; ok {
229                         panic(p)
230                 }
231                 peerMap[p] = struct{}{}
232         }
233 }
234
235 var peersForPiecesPool sync.Pool
236
237 func makePeersForPiece(cap int) []*peersForPieceRequests {
238         got := peersForPiecesPool.Get()
239         if got == nil {
240                 return make([]*peersForPieceRequests, 0, cap)
241         }
242         return got.([]*peersForPieceRequests)[:0]
243 }
244
245 type peersForPieceSorter struct {
246         peersForPiece []*peersForPieceRequests
247         req           *RequestIndex
248         p             requestablePiece
249 }
250
251 func (me *peersForPieceSorter) Len() int {
252         return len(me.peersForPiece)
253 }
254
255 func (me *peersForPieceSorter) Swap(i, j int) {
256         me.peersForPiece[i], me.peersForPiece[j] = me.peersForPiece[j], me.peersForPiece[i]
257 }
258
259 func (me *peersForPieceSorter) Less(_i, _j int) bool {
260         i := me.peersForPiece[_i]
261         j := me.peersForPiece[_j]
262         req := me.req
263         p := &me.p
264         byHasRequest := func() multiless.Computation {
265                 ml := multiless.New()
266                 if req != nil {
267                         iHas := i.nextState.Requests.Contains(*req)
268                         jHas := j.nextState.Requests.Contains(*req)
269                         ml = ml.Bool(jHas, iHas)
270                 }
271                 return ml
272         }()
273         ml := multiless.New()
274         // We always "reallocate", that is force even striping amongst peers that are either on
275         // the last piece they can contribute too, or for pieces marked for this behaviour.
276         // Striping prevents starving peers of requests, and will always re-balance to the
277         // fastest known peers.
278         if !p.alwaysReallocate {
279                 ml = ml.Bool(
280                         j.requestablePiecesRemaining == 1,
281                         i.requestablePiecesRemaining == 1)
282         }
283         if p.alwaysReallocate || j.requestablePiecesRemaining == 1 {
284                 ml = ml.Int(
285                         i.requestsInPiece,
286                         j.requestsInPiece)
287         } else {
288                 ml = ml.AndThen(byHasRequest)
289         }
290         ml = ml.Int(
291                 i.requestablePiecesRemaining,
292                 j.requestablePiecesRemaining,
293         ).Float64(
294                 j.DownloadRate,
295                 i.DownloadRate,
296         )
297         if ml.Ok() {
298                 return ml.Less()
299         }
300         ml = ml.AndThen(byHasRequest)
301         return ml.Int64(
302                 int64(j.Age), int64(i.Age),
303                 // TODO: Probably peer priority can come next
304         ).Uintptr(
305                 i.Id.Uintptr(),
306                 j.Id.Uintptr(),
307         ).MustLess()
308 }
309
310 func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
311         peersForPiece := makePeersForPiece(len(peers))
312         for _, peer := range peers {
313                 if !peer.canRequestPiece(p.index) {
314                         continue
315                 }
316                 if !peer.canFitRequest() {
317                         peer.requestablePiecesRemaining--
318                         continue
319                 }
320                 peersForPiece = append(peersForPiece, &peersForPieceRequests{
321                         requestsInPiece: 0,
322                         requestsPeer:    peer,
323                 })
324         }
325         defer func() {
326                 for _, peer := range peersForPiece {
327                         peer.requestablePiecesRemaining--
328                 }
329                 peersForPiecesPool.Put(peersForPiece)
330         }()
331         peersForPieceSorter := peersForPieceSorter{
332                 peersForPiece: peersForPiece,
333                 p:             p,
334         }
335         sortPeersForPiece := func(req *RequestIndex) {
336                 peersForPieceSorter.req = req
337                 sort.Sort(&peersForPieceSorter)
338                 // ensureValidSortedPeersForPieceRequests(&peersForPieceSorter)
339         }
340         // Chunks can be preassigned several times, if peers haven't been able to update their "actual"
341         // with "next" request state before another request strategy run occurs.
342         preallocated := make([][]*peersForPieceRequests, p.t.ChunksPerPiece)
343         p.IterPendingChunks(func(spec ChunkIndex) {
344                 req := p.chunkIndexToRequestIndex(spec)
345                 for _, peer := range peersForPiece {
346                         if !peer.ExistingRequests.Contains(req) {
347                                 continue
348                         }
349                         if !peer.canFitRequest() {
350                                 continue
351                         }
352                         preallocated[spec] = append(preallocated[spec], peer)
353                         peer.addNextRequest(req)
354                 }
355         })
356         pendingChunksRemaining := int(p.NumPendingChunks)
357         p.IterPendingChunks(func(chunk ChunkIndex) {
358                 if len(preallocated[chunk]) != 0 {
359                         return
360                 }
361                 req := p.chunkIndexToRequestIndex(chunk)
362                 defer func() { pendingChunksRemaining-- }()
363                 sortPeersForPiece(nil)
364                 for _, peer := range peersForPiece {
365                         if !peer.canFitRequest() {
366                                 continue
367                         }
368                         if !peer.PieceAllowedFast.ContainsInt(p.index) {
369                                 // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
370                                 peer.nextState.Interested = true
371                                 if peer.Choking {
372                                         continue
373                                 }
374                         }
375                         peer.addNextRequest(req)
376                         break
377                 }
378         })
379 chunk:
380         for chunk, prePeers := range preallocated {
381                 if len(prePeers) == 0 {
382                         continue
383                 }
384                 pendingChunksRemaining--
385                 req := p.chunkIndexToRequestIndex(ChunkIndex(chunk))
386                 for _, pp := range prePeers {
387                         pp.requestsInPiece--
388                 }
389                 sortPeersForPiece(&req)
390                 for _, pp := range prePeers {
391                         pp.nextState.Requests.Remove(req)
392                 }
393                 for _, peer := range peersForPiece {
394                         if !peer.canFitRequest() {
395                                 continue
396                         }
397                         if !peer.PieceAllowedFast.ContainsInt(p.index) {
398                                 // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
399                                 peer.nextState.Interested = true
400                                 if peer.Choking {
401                                         continue
402                                 }
403                         }
404                         peer.addNextRequest(req)
405                         continue chunk
406                 }
407         }
408         if pendingChunksRemaining != 0 {
409                 panic(pendingChunksRemaining)
410         }
411 }