Export request_strategy.GetRequestablePieces
[btrtrc.git] / request-strategy / order.go
package request_strategy

import (
        "fmt"
        "sort"
        "sync"

        "github.com/anacrolix/multiless"
        "github.com/anacrolix/torrent/storage"

        pp "github.com/anacrolix/torrent/peer_protocol"
        "github.com/anacrolix/torrent/types"
)

type (
        Request       = types.Request
        pieceIndex    = types.PieceIndex
        piecePriority = types.PiecePriority
        // This can be made into a type-param later; that will be great for testing.
        ChunkSpec = types.ChunkSpec
)

type ClientPieceOrder struct{}

type filterTorrent struct {
        *Torrent
        unverifiedBytes int64
        // Potentially shared with other torrents.
        storageLeft *int64
}

func sortFilterPieces(pieces []filterPiece) {
        sort.Slice(pieces, func(_i, _j int) bool {
                i := &pieces[_i]
                j := &pieces[_j]
                return multiless.New().Int(
                        int(j.Priority), int(i.Priority),
                ).Bool(
                        j.Partial, i.Partial,
                ).Int64(
                        i.Availability, j.Availability,
                ).Int(
                        i.index, j.index,
                ).Uintptr(
                        i.t.StableId, j.t.StableId,
                ).MustLess()
        })
}
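// A hedged reading of the multiless chain above: pieces sort by higher
// Priority first, then Partial pieces first, then lower Availability
// (rarest first), then piece index, with the torrent's StableId as the
// final stable tiebreak.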
type requestsPeer struct {
        Peer
        nextState                  PeerNextRequestState
        requestablePiecesRemaining int
}

func (rp *requestsPeer) canFitRequest() bool {
        return len(rp.nextState.Requests) < rp.MaxRequests
}

func (rp *requestsPeer) addNextRequest(r Request) {
        _, ok := rp.nextState.Requests[r]
        if ok {
                panic("should only add once")
        }
        rp.nextState.Requests[r] = struct{}{}
}

type peersForPieceRequests struct {
        requestsInPiece int
        *requestsPeer
}

func (me *peersForPieceRequests) addNextRequest(r Request) {
        me.requestsPeer.addNextRequest(r)
        me.requestsInPiece++
}

type requestablePiece struct {
        index             pieceIndex
        t                 *Torrent
        alwaysReallocate  bool
        NumPendingChunks  int
        IterPendingChunks ChunksIter
}

type filterPiece struct {
        t     *filterTorrent
        index pieceIndex
        *Piece
}

// Calls f with requestable pieces in order.
func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex int)) {
        maxPieces := 0
        for i := range input.Torrents {
                maxPieces += len(input.Torrents[i].Pieces)
        }
        pieces := make([]filterPiece, 0, maxPieces)
        // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
        // TorrentImpl. A nil value means no capacity limit.
        storageLeft := make(map[storage.TorrentCapacity]*int64)
        for _t := range input.Torrents {
                // TODO: We could do metainfo requests here.
                t := &filterTorrent{
                        Torrent:         &input.Torrents[_t],
                        unverifiedBytes: 0,
                }
                key := t.Capacity
                if key != nil {
                        if _, ok := storageLeft[key]; !ok {
                                capacity, ok := (*key)()
                                if ok {
                                        storageLeft[key] = &capacity
                                } else {
                                        storageLeft[key] = nil
                                }
                        }
                        t.storageLeft = storageLeft[key]
                }
                for i := range t.Pieces {
                        pieces = append(pieces, filterPiece{
                                t:     t,
                                index: i,
                                Piece: &t.Pieces[i],
                        })
                }
        }
        sortFilterPieces(pieces)
        var allTorrentsUnverifiedBytes int64
        for _, piece := range pieces {
                if left := piece.t.storageLeft; left != nil {
                        if *left < int64(piece.Length) {
                                continue
                        }
                        *left -= int64(piece.Length)
                }
                if !piece.Request || piece.NumPendingChunks == 0 {
                        // TODO: Clarify exactly what is verified. Stuff that's being hashed should be
                        // considered unverified and hold up further requests.
                        continue
                }
                if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes {
                        continue
                }
                if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
                        continue
                }
                piece.t.unverifiedBytes += piece.Length
                allTorrentsUnverifiedBytes += piece.Length
                f(piece.t.Torrent, piece.Piece, piece.index)
        }
}
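// A hedged usage sketch (not part of the original API surface): collect
// the piece indices that GetRequestablePieces yields, in priority order.
// The helper name is illustrative only.
func exampleCollectRequestablePieces(input Input) (picked []pieceIndex) {
        GetRequestablePieces(input, func(t *Torrent, p *Piece, i int) {
                picked = append(picked, i)
        })
        return picked
}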
type Input struct {
        Torrents           []Torrent
        MaxUnverifiedBytes int64
}
// TODO: We could do metainfo requests here.
func Run(input Input) map[PeerId]PeerNextRequestState {
        var requestPieces []requestablePiece
        GetRequestablePieces(input, func(t *Torrent, piece *Piece, pieceIndex int) {
                requestPieces = append(requestPieces, requestablePiece{
                        index:             pieceIndex,
                        t:                 t,
                        NumPendingChunks:  piece.NumPendingChunks,
                        IterPendingChunks: piece.iterPendingChunksWrapper,
                        alwaysReallocate:  piece.Priority >= types.PiecePriorityNext,
                })
        })
        torrents := input.Torrents
        allPeers := make(map[uintptr][]*requestsPeer, len(torrents))
        for _, t := range torrents {
                peers := make([]*requestsPeer, 0, len(t.Peers))
                for _, p := range t.Peers {
                        peers = append(peers, &requestsPeer{
                                Peer: p,
                                nextState: PeerNextRequestState{
                                        Requests: make(map[Request]struct{}, p.MaxRequests),
                                },
                        })
                }
                allPeers[t.StableId] = peers
        }
        for _, piece := range requestPieces {
                for _, peer := range allPeers[piece.t.StableId] {
                        if peer.canRequestPiece(piece.index) {
                                peer.requestablePiecesRemaining++
                        }
                }
        }
        for _, piece := range requestPieces {
                allocatePendingChunks(piece, allPeers[piece.t.StableId])
        }
        ret := make(map[PeerId]PeerNextRequestState)
        for _, peers := range allPeers {
                for _, rp := range peers {
                        if rp.requestablePiecesRemaining != 0 {
                                panic(rp.requestablePiecesRemaining)
                        }
                        if _, ok := ret[rp.Id]; ok {
                                panic(fmt.Sprintf("duplicate peer id: %v", rp.Id))
                        }
                        ret[rp.Id] = rp.nextState
                }
        }
        return ret
}
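// A hedged consumption sketch: apply the result of Run by walking each
// peer's next state. Names here are illustrative; the original package
// leaves applying this state to the caller.
func exampleApplyRun(input Input) {
        for peerId, next := range Run(input) {
                _ = peerId
                _ = next.Interested // whether this peer should be told we're interested
                for req := range next.Requests {
                        _ = req // each chunk request this peer should have outstanding
                }
        }
}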
// Checks that a sorted peersForPiece slice makes sense.
func ensureValidSortedPeersForPieceRequests(peers *peersForPieceSorter) {
        if !sort.IsSorted(peers) {
                panic("not sorted")
        }
        peerMap := make(map[*peersForPieceRequests]struct{}, peers.Len())
        for _, p := range peers.peersForPiece {
                if _, ok := peerMap[p]; ok {
                        panic(p)
                }
                peerMap[p] = struct{}{}
        }
}

var peersForPiecesPool sync.Pool

func makePeersForPiece(capacity int) []*peersForPieceRequests {
        got := peersForPiecesPool.Get()
        if got == nil {
                return make([]*peersForPieceRequests, 0, capacity)
        }
        return got.([]*peersForPieceRequests)[:0]
}
type peersForPieceSorter struct {
        peersForPiece []*peersForPieceRequests
        req           *Request
        p             requestablePiece
}

func (me *peersForPieceSorter) Len() int {
        return len(me.peersForPiece)
}

func (me *peersForPieceSorter) Swap(i, j int) {
        me.peersForPiece[i], me.peersForPiece[j] = me.peersForPiece[j], me.peersForPiece[i]
}

func (me *peersForPieceSorter) Less(_i, _j int) bool {
        i := me.peersForPiece[_i]
        j := me.peersForPiece[_j]
        req := me.req
        p := &me.p
        byHasRequest := func() multiless.Computation {
                ml := multiless.New()
                if req != nil {
                        _, iHas := i.nextState.Requests[*req]
                        _, jHas := j.nextState.Requests[*req]
                        ml = ml.Bool(jHas, iHas)
                }
                return ml
        }()
        ml := multiless.New()
        // We always "reallocate", that is force even striping amongst peers, when a peer is on
        // the last piece it can contribute to, or when the piece is marked for this behaviour.
        // Striping prevents starving peers of requests, and will always re-balance to the
        // fastest known peers.
        if !p.alwaysReallocate {
                ml = ml.Bool(
                        j.requestablePiecesRemaining == 1,
                        i.requestablePiecesRemaining == 1)
        }
        if p.alwaysReallocate || j.requestablePiecesRemaining == 1 {
                ml = ml.Int(
                        i.requestsInPiece,
                        j.requestsInPiece)
        } else {
                ml = ml.AndThen(byHasRequest)
        }
        ml = ml.Int(
                i.requestablePiecesRemaining,
                j.requestablePiecesRemaining,
        ).Float64(
                j.DownloadRate,
                i.DownloadRate,
        )
        if ml.Ok() {
                return ml.Less()
        }
        ml = ml.AndThen(byHasRequest)
        return ml.Int64(
                int64(j.Age), int64(i.Age),
                // TODO: Probably peer priority can come next.
        ).Uintptr(
                i.Id.Uintptr(),
                j.Id.Uintptr(),
        ).MustLess()
}
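// A hedged summary of the ordering above: when re-balancing a specific
// chunk, prefer peers that already hold that request; when striping,
// prefer peers with fewer requests in this piece; then prefer peers with
// fewer requestable pieces remaining, then faster peers by DownloadRate,
// then peers with greater Age, with peer Id as the final tiebreak.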
func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
        peersForPiece := makePeersForPiece(len(peers))
        for _, peer := range peers {
                if !peer.canRequestPiece(p.index) {
                        continue
                }
                if !peer.canFitRequest() {
                        peer.requestablePiecesRemaining--
                        continue
                }
                peersForPiece = append(peersForPiece, &peersForPieceRequests{
                        requestsInPiece: 0,
                        requestsPeer:    peer,
                })
        }
        defer func() {
                for _, peer := range peersForPiece {
                        peer.requestablePiecesRemaining--
                }
                peersForPiecesPool.Put(peersForPiece)
        }()
        peersForPieceSorter := peersForPieceSorter{
                peersForPiece: peersForPiece,
                p:             p,
        }
        sortPeersForPiece := func(req *Request) {
                peersForPieceSorter.req = req
                sort.Sort(&peersForPieceSorter)
                //ensureValidSortedPeersForPieceRequests(&peersForPieceSorter)
        }
        // Chunks can be preassigned several times, if peers haven't been able to update their
        // "actual" request state to match the "next" state from a previous run before another
        // request strategy run occurs.
        preallocated := make(map[ChunkSpec][]*peersForPieceRequests, p.NumPendingChunks)
        p.IterPendingChunks(func(spec ChunkSpec) {
                req := Request{pp.Integer(p.index), spec}
                for _, peer := range peersForPiece {
                        if h := peer.HasExistingRequest; h == nil || !h(req) {
                                continue
                        }
                        if !peer.canFitRequest() {
                                continue
                        }
                        preallocated[spec] = append(preallocated[spec], peer)
                        peer.addNextRequest(req)
                }
        })
        pendingChunksRemaining := p.NumPendingChunks
        p.IterPendingChunks(func(chunk types.ChunkSpec) {
                if _, ok := preallocated[chunk]; ok {
                        return
                }
                req := Request{pp.Integer(p.index), chunk}
                defer func() { pendingChunksRemaining-- }()
                sortPeersForPiece(nil)
                for _, peer := range peersForPiece {
                        if !peer.canFitRequest() {
                                continue
                        }
                        if !peer.pieceAllowedFastOrDefault(p.index) {
                                // TODO: Verify that it's okay to stay uninterested if we request allowed fast pieces.
                                peer.nextState.Interested = true
                                if peer.Choking {
                                        continue
                                }
                        }
                        peer.addNextRequest(req)
                        break
                }
        })
chunk:
        for chunk, prePeers := range preallocated {
                pendingChunksRemaining--
                req := Request{pp.Integer(p.index), chunk}
                for _, prePeer := range prePeers {
                        prePeer.requestsInPiece--
                }
                sortPeersForPiece(&req)
                for _, prePeer := range prePeers {
                        delete(prePeer.nextState.Requests, req)
                }
                for _, peer := range peersForPiece {
                        if !peer.canFitRequest() {
                                continue
                        }
                        if !peer.pieceAllowedFastOrDefault(p.index) {
                                // TODO: Verify that it's okay to stay uninterested if we request allowed fast pieces.
                                peer.nextState.Interested = true
                                if peer.Choking {
                                        continue
                                }
                        }
                        peer.addNextRequest(req)
                        continue chunk
                }
        }
        if pendingChunksRemaining != 0 {
                panic(pendingChunksRemaining)
        }
}
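// A hedged summary of allocatePendingChunks (not in the original file):
// pass 1 preassigns each pending chunk to every capable peer that already
// has that request outstanding; pass 2 assigns each remaining chunk to
// the single best-sorted peer; pass 3 revisits each preassigned chunk,
// sorting with a preference for peers that already hold the request, then
// clears the tentative assignments and awards the chunk to the first
// eligible peer in that order, which is often, but not always, one of the
// original holders.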