package request_strategy

import (
	"fmt"
	"sort"
	"sync"

	"github.com/anacrolix/multiless"
	"github.com/anacrolix/torrent/storage"

	pp "github.com/anacrolix/torrent/peer_protocol"
	"github.com/anacrolix/torrent/types"
)

type (
	Request       = types.Request
	pieceIndex    = types.PieceIndex
	piecePriority = types.PiecePriority
	// This can be made into a type-param later, will be great for testing.
	ChunkSpec = types.ChunkSpec
)

type ClientPieceOrder struct{}

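// filterTorrent wraps a Torrent with the per-run state used while filtering its pieces.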
type filterTorrent struct {
	*Torrent
	unverifiedBytes int64
	// Potentially shared with other torrents.
	storageLeft *int64
}

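// sortFilterPieces orders pieces for allocation: highest priority first, then partial
// pieces, then lowest availability, with piece index and torrent as tiebreakers.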
func sortFilterPieces(pieces []filterPiece) {
	sort.Slice(pieces, func(_i, _j int) bool {
		i := &pieces[_i]
		j := &pieces[_j]
		return multiless.New().Int(
			int(j.Priority), int(i.Priority),
		).Bool(
			j.Partial, i.Partial,
		).Int64(
			i.Availability, j.Availability,
		).Int(
			i.index, j.index,
		).Uintptr(
			i.t.StableId, j.t.StableId,
		).MustLess()
	})
}

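// requestsPeer accumulates a peer's next request state, and tracks how many requestable
// pieces it can still be allocated chunks from, during a run.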
type requestsPeer struct {
	Peer
	nextState                  PeerNextRequestState
	requestablePiecesRemaining int
}

func (rp *requestsPeer) canFitRequest() bool {
	return len(rp.nextState.Requests) < rp.MaxRequests
}

func (rp *requestsPeer) addNextRequest(r Request) {
	_, ok := rp.nextState.Requests[r]
	if ok {
		panic("should only add once")
	}
	rp.nextState.Requests[r] = struct{}{}
}

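// peersForPieceRequests wraps a requestsPeer with the number of requests it has been
// allocated in the piece currently being processed.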
type peersForPieceRequests struct {
	requestsInPiece int
	*requestsPeer
}

func (me *peersForPieceRequests) addNextRequest(r Request) {
	me.requestsPeer.addNextRequest(r)
	me.requestsInPiece++
}

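// requestablePiece is a piece that passed filtering and will have its pending chunks
// allocated to peers.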
type requestablePiece struct {
	index             pieceIndex
	t                 *Torrent
	alwaysReallocate  bool
	NumPendingChunks  int
	IterPendingChunks ChunksIter
}

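// filterPiece pairs a Piece with its torrent and index for sorting and filtering.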
type filterPiece struct {
	t     *filterTorrent
	index pieceIndex
	*Piece
}

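// getRequestablePieces returns, in allocation order, the pieces that still have pending
// chunks and fit within the storage capacity and unverified-byte limits.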
func getRequestablePieces(input Input) (ret []requestablePiece) {
	maxPieces := 0
	for i := range input.Torrents {
		maxPieces += len(input.Torrents[i].Pieces)
	}
	pieces := make([]filterPiece, 0, maxPieces)
	ret = make([]requestablePiece, 0, maxPieces)
	// Storage capacity left for this run, keyed by the storage capacity pointer on the storage
	// TorrentImpl. A nil value means no capacity limit.
	storageLeft := make(map[storage.TorrentCapacity]*int64)
	for _t := range input.Torrents {
		// TODO: We could do metainfo requests here.
		t := &filterTorrent{
			Torrent:         &input.Torrents[_t],
			unverifiedBytes: 0,
		}
		key := t.Capacity
		if key != nil {
			if _, ok := storageLeft[key]; !ok {
				capacity, ok := (*key)()
				if ok {
					storageLeft[key] = &capacity
				} else {
					storageLeft[key] = nil
				}
			}
			t.storageLeft = storageLeft[key]
		}
		for i := range t.Pieces {
			pieces = append(pieces, filterPiece{
				t:     t,
				index: i,
				Piece: &t.Pieces[i],
			})
		}
	}
	sortFilterPieces(pieces)
	var allTorrentsUnverifiedBytes int64
	for _, piece := range pieces {
		if left := piece.t.storageLeft; left != nil {
			if *left < int64(piece.Length) {
				continue
			}
			*left -= int64(piece.Length)
		}
		if !piece.Request || piece.NumPendingChunks == 0 {
			// TODO: Clarify exactly what is verified. Stuff that's being hashed should be
			// considered unverified and hold up further requests.
			continue
		}
		if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes {
			continue
		}
		if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
			continue
		}
		piece.t.unverifiedBytes += piece.Length
		allTorrentsUnverifiedBytes += piece.Length
		ret = append(ret, requestablePiece{
			index:             piece.index,
			t:                 piece.t.Torrent,
			NumPendingChunks:  piece.NumPendingChunks,
			IterPendingChunks: piece.iterPendingChunksWrapper,
			alwaysReallocate:  piece.Priority >= types.PiecePriorityNext,
		})
	}
	return
}

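// Input is the state passed to Run: the torrents (and their peers) to allocate requests
// for, and a limit on unverified bytes across all torrents (zero means no limit).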
type Input struct {
	Torrents           []Torrent
	MaxUnverifiedBytes int64
}

// TODO: We could do metainfo requests here.
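// Run allocates pending chunks of the requestable pieces to each torrent's peers and
// returns the resulting next request state for every peer, keyed by peer id.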
func Run(input Input) map[PeerId]PeerNextRequestState {
	requestPieces := getRequestablePieces(input)
	torrents := input.Torrents
	allPeers := make(map[uintptr][]*requestsPeer, len(torrents))
	for _, t := range torrents {
		peers := make([]*requestsPeer, 0, len(t.Peers))
		for _, p := range t.Peers {
			peers = append(peers, &requestsPeer{
				Peer: p,
				nextState: PeerNextRequestState{
					Requests: make(map[Request]struct{}, p.MaxRequests),
				},
			})
		}
		allPeers[t.StableId] = peers
	}
	for _, piece := range requestPieces {
		for _, peer := range allPeers[piece.t.StableId] {
			if peer.canRequestPiece(piece.index) {
				peer.requestablePiecesRemaining++
			}
		}
	}
	for _, piece := range requestPieces {
		allocatePendingChunks(piece, allPeers[piece.t.StableId])
	}
	ret := make(map[PeerId]PeerNextRequestState)
	for _, peers := range allPeers {
		for _, rp := range peers {
			if rp.requestablePiecesRemaining != 0 {
				panic(rp.requestablePiecesRemaining)
			}
			if _, ok := ret[rp.Id]; ok {
				panic(fmt.Sprintf("duplicate peer id: %v", rp.Id))
			}
			ret[rp.Id] = rp.nextState
		}
	}
	return ret
}

// Checks that a sorted peersForPiece slice makes sense.
func ensureValidSortedPeersForPieceRequests(peers *peersForPieceSorter) {
	if !sort.IsSorted(peers) {
		panic("not sorted")
	}
	peerMap := make(map[*peersForPieceRequests]struct{}, peers.Len())
	for _, p := range peers.peersForPiece {
		if _, ok := peerMap[p]; ok {
			panic(p)
		}
		peerMap[p] = struct{}{}
	}
}

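// peersForPiecesPool recycles the candidate-peer slices used by allocatePendingChunks to
// reduce allocations.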
var peersForPiecesPool sync.Pool

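// makePeersForPiece returns an empty slice for candidate peers, reusing a pooled slice if
// one is available; cap is only a hint for fresh allocations.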
func makePeersForPiece(cap int) []*peersForPieceRequests {
	got := peersForPiecesPool.Get()
	if got == nil {
		return make([]*peersForPieceRequests, 0, cap)
	}
	return got.([]*peersForPieceRequests)[:0]
}

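// peersForPieceSorter sorts the candidate peers for piece p, optionally with respect to a
// particular request req.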
type peersForPieceSorter struct {
	peersForPiece []*peersForPieceRequests
	req           *Request
	p             requestablePiece
}

func (me *peersForPieceSorter) Len() int {
	return len(me.peersForPiece)
}

func (me *peersForPieceSorter) Swap(i, j int) {
	me.peersForPiece[i], me.peersForPiece[j] = me.peersForPiece[j], me.peersForPiece[i]
}

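// Less orders candidate peers from most to least preferred for the next request: peers on
// their last requestable piece are preferred and striped evenly by requests already held
// in this piece (all peers are striped for always-reallocate pieces); otherwise a peer
// that already holds the request keeps it. Ties fall back to fewer requestable pieces
// remaining, higher download rate, longer age, and finally peer id.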
func (me *peersForPieceSorter) Less(_i, _j int) bool {
	i := me.peersForPiece[_i]
	j := me.peersForPiece[_j]
	req := me.req
	p := &me.p
	byHasRequest := func() multiless.Computation {
		ml := multiless.New()
		if req != nil {
			_, iHas := i.nextState.Requests[*req]
			_, jHas := j.nextState.Requests[*req]
			ml = ml.Bool(jHas, iHas)
		}
		return ml
	}()
	ml := multiless.New()
	// We always "reallocate", that is force even striping amongst peers that are either on
	// the last piece they can contribute to, or for pieces marked for this behaviour.
	// Striping prevents starving peers of requests, and will always re-balance to the
	// fastest known peers.
	if !p.alwaysReallocate {
		ml = ml.Bool(
			j.requestablePiecesRemaining == 1,
			i.requestablePiecesRemaining == 1)
	}
	if p.alwaysReallocate || j.requestablePiecesRemaining == 1 {
		ml = ml.Int(
			i.requestsInPiece,
			j.requestsInPiece)
	} else {
		ml = ml.AndThen(byHasRequest)
	}
	ml = ml.Int(
		i.requestablePiecesRemaining,
		j.requestablePiecesRemaining,
	).Float64(
		j.DownloadRate,
		i.DownloadRate,
	)
	if ml.Ok() {
		return ml.Less()
	}
	ml = ml.AndThen(byHasRequest)
	return ml.Int64(
		int64(j.Age), int64(i.Age),
		// TODO: Probably peer priority can come next
	).Uintptr(
		i.Id.Uintptr(),
		j.Id.Uintptr(),
	).MustLess()
}

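// allocatePendingChunks assigns each pending chunk of p to the best candidate among peers,
// updating each peer's next request state and requestablePiecesRemaining count.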
func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
	peersForPiece := makePeersForPiece(len(peers))
	for _, peer := range peers {
		if !peer.canRequestPiece(p.index) {
			continue
		}
		if !peer.canFitRequest() {
			peer.requestablePiecesRemaining--
			continue
		}
		peersForPiece = append(peersForPiece, &peersForPieceRequests{
			requestsInPiece: 0,
			requestsPeer:    peer,
		})
	}
	defer func() {
		for _, peer := range peersForPiece {
			if peer.canRequestPiece(p.index) {
				peer.requestablePiecesRemaining--
			}
		}
		peersForPiecesPool.Put(peersForPiece)
	}()
	peersForPieceSorter := peersForPieceSorter{
		peersForPiece: peersForPiece,
		p:             p,
	}
	sortPeersForPiece := func(req *Request) {
		peersForPieceSorter.req = req
		sort.Sort(&peersForPieceSorter)
		//ensureValidSortedPeersForPieceRequests(&peersForPieceSorter)
	}
	// Chunks can be preassigned several times, if peers haven't been able to update their
	// "actual" request state with the "next" one before another request strategy run occurs.
	preallocated := make(map[ChunkSpec][]*peersForPieceRequests, p.NumPendingChunks)
	p.IterPendingChunks(func(spec ChunkSpec) {
		req := Request{pp.Integer(p.index), spec}
		for _, peer := range peersForPiece {
			if h := peer.HasExistingRequest; h == nil || !h(req) {
				continue
			}
			if !peer.canFitRequest() {
				continue
			}
			if !peer.canRequestPiece(p.index) {
				continue
			}
			preallocated[spec] = append(preallocated[spec], peer)
			peer.addNextRequest(req)
		}
	})
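	// Assign the chunks that weren't preallocated above, choosing the best peer for each.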
	pendingChunksRemaining := int(p.NumPendingChunks)
	p.IterPendingChunks(func(chunk types.ChunkSpec) {
		if _, ok := preallocated[chunk]; ok {
			return
		}
		req := Request{pp.Integer(p.index), chunk}
		defer func() { pendingChunksRemaining-- }()
		sortPeersForPiece(nil)
		for _, peer := range peersForPiece {
			if !peer.canFitRequest() {
				continue
			}
			if !peer.HasPiece(p.index) {
				continue
			}
			if !peer.pieceAllowedFastOrDefault(p.index) {
				// TODO: Verify that it's okay to stay uninterested if we request allowed fast pieces.
				peer.nextState.Interested = true
				if peer.Choking {
					continue
				}
			}
			peer.addNextRequest(req)
			break
		}
	})
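	// Reassign each preallocated chunk: drop it from the peers that were holding it, re-sort
	// with the request in mind, then give it to the best remaining candidate.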
chunk:
	for chunk, prePeers := range preallocated {
		pendingChunksRemaining--
		req := Request{pp.Integer(p.index), chunk}
		for _, prePeer := range prePeers {
			prePeer.requestsInPiece--
		}
		sortPeersForPiece(&req)
		for _, prePeer := range prePeers {
			delete(prePeer.nextState.Requests, req)
		}
		for _, peer := range peersForPiece {
			if !peer.canFitRequest() {
				continue
			}
			if !peer.HasPiece(p.index) {
				continue
			}
			if !peer.pieceAllowedFastOrDefault(p.index) {
				// TODO: Verify that it's okay to stay uninterested if we request allowed fast pieces.
				peer.nextState.Interested = true
				if peer.Choking {
					continue
				}
			}
			peer.addNextRequest(req)
			continue chunk
		}
	}
	if pendingChunksRemaining != 0 {
		panic(pendingChunksRemaining)
	}
}