[btrtrc.git] / request-strategy / order.go
package request_strategy

import (
	"bytes"
	"fmt"
	"sort"
	"sync"

	"github.com/anacrolix/multiless"
	"github.com/anacrolix/torrent/metainfo"
	"github.com/anacrolix/torrent/storage"

	"github.com/anacrolix/torrent/types"
)

type (
	RequestIndex  = uint32
	ChunkIndex    = uint32
	Request       = types.Request
	pieceIndex    = types.PieceIndex
	piecePriority = types.PiecePriority
	// This can be made into a type-param later, which will be great for testing.
	ChunkSpec = types.ChunkSpec
)

type ClientPieceOrder struct{}

type filterTorrent struct {
	*Torrent
	unverifiedBytes int64
	// Potentially shared with other torrents.
	storageLeft *int64
}

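// sortFilterPieces orders pieces for allocation: higher priority first, then partially-downloaded
// pieces, then lower availability, then piece index, with the torrent infohash as the final
// tie-break.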
func sortFilterPieces(pieces []filterPiece) {
	sort.Slice(pieces, func(_i, _j int) bool {
		i := &pieces[_i]
		j := &pieces[_j]
		return multiless.New().Int(
			int(j.Priority), int(i.Priority),
		).Bool(
			j.Partial, i.Partial,
		).Int64(
			i.Availability, j.Availability,
		).Int(
			i.index, j.index,
		).Lazy(func() multiless.Computation {
			return multiless.New().Cmp(bytes.Compare(
				i.t.InfoHash[:],
				j.t.InfoHash[:],
			))
		}).MustLess()
	})
}

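// requestsPeer accumulates the next request state being built for a single peer during a run.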
type requestsPeer struct {
	Peer
	nextState                  PeerNextRequestState
	requestablePiecesRemaining int
}

func (rp *requestsPeer) canFitRequest() bool {
	return int(rp.nextState.Requests.GetCardinality()) < rp.MaxRequests
}

func (rp *requestsPeer) addNextRequest(r RequestIndex) {
	if !rp.nextState.Requests.CheckedAdd(r) {
		panic("should only add once")
	}
}

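// peersForPieceRequests tracks how many requests a candidate peer has been allocated in the piece
// currently being processed.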
type peersForPieceRequests struct {
	requestsInPiece int
	*requestsPeer
}

func (me *peersForPieceRequests) addNextRequest(r RequestIndex) {
	me.requestsPeer.addNextRequest(r)
	me.requestsInPiece++
}

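// requestablePiece is a piece that passed the filters in GetRequestablePieces and whose pending
// chunks are to be allocated to peers.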
type requestablePiece struct {
	index             pieceIndex
	t                 *Torrent
	alwaysReallocate  bool
	NumPendingChunks  int
	IterPendingChunks ChunksIterFunc
}

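// chunkIndexToRequestIndex converts a chunk index within this piece to a torrent-wide request
// index.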
func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex {
	return p.t.ChunksPerPiece*uint32(p.index) + c
}

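// filterPiece pairs a piece with its torrent and index so that pieces from all torrents can be
// sorted together.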
type filterPiece struct {
	t     *filterTorrent
	index pieceIndex
	*Piece
}

// Calls f with requestable pieces in order.
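// Pieces are filtered against remaining storage capacity and the per-torrent and global
// unverified-byte limits before f is called.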
func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex int)) {
	maxPieces := 0
	for i := range input.Torrents {
		maxPieces += len(input.Torrents[i].Pieces)
	}
	pieces := make([]filterPiece, 0, maxPieces)
	// Storage capacity left for this run, keyed by the storage capacity pointer on the storage
	// TorrentImpl. A nil value means no capacity limit.
	storageLeft := make(map[storage.TorrentCapacity]*int64)
	for _t := range input.Torrents {
		// TODO: We could do metainfo requests here.
		t := &filterTorrent{
			Torrent:         &input.Torrents[_t],
			unverifiedBytes: 0,
		}
		key := t.Capacity
		if key != nil {
			if _, ok := storageLeft[key]; !ok {
				capacity, ok := (*key)()
				if ok {
					storageLeft[key] = &capacity
				} else {
					storageLeft[key] = nil
				}
			}
			t.storageLeft = storageLeft[key]
		}
		for i := range t.Pieces {
			pieces = append(pieces, filterPiece{
				t:     t,
				index: i,
				Piece: &t.Pieces[i],
			})
		}
	}
	sortFilterPieces(pieces)
	var allTorrentsUnverifiedBytes int64
	for _, piece := range pieces {
		if left := piece.t.storageLeft; left != nil {
			if *left < int64(piece.Length) {
				continue
			}
			*left -= int64(piece.Length)
		}
		if !piece.Request || piece.NumPendingChunks == 0 {
			// TODO: Clarify exactly what is verified. Stuff that's being hashed should be
			// considered unverified and hold up further requests.
			continue
		}
		if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes {
			continue
		}
		if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
			continue
		}
		piece.t.unverifiedBytes += piece.Length
		allTorrentsUnverifiedBytes += piece.Length
		f(piece.t.Torrent, piece.Piece, piece.index)
	}
	return
}

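// Input is the state a request-strategy run operates on: the torrents to request from, and a
// limit on unverified bytes across all of them (zero means no limit).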
type Input struct {
	Torrents           []Torrent
	MaxUnverifiedBytes int64
}

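// Run computes the next request state for every peer across all torrents in input: it collects
// the requestable pieces, then allocates their pending chunks among each torrent's peers.
//
// A minimal usage sketch (the surrounding caller here is hypothetical): apply each returned
// state to the connection identified by its PeerId, e.g.
//
//	for peerId, next := range request_strategy.Run(input) {
//		// Update the connection identified by peerId so that its interest and outstanding
//		// requests match next.Interested and next.Requests.
//	}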
// TODO: We could do metainfo requests here.
func Run(input Input) map[PeerId]PeerNextRequestState {
	var requestPieces []requestablePiece
	GetRequestablePieces(input, func(t *Torrent, piece *Piece, pieceIndex int) {
		requestPieces = append(requestPieces, requestablePiece{
			index:             pieceIndex,
			t:                 t,
			NumPendingChunks:  piece.NumPendingChunks,
			IterPendingChunks: piece.iterPendingChunksWrapper,
			alwaysReallocate:  piece.Priority >= types.PiecePriorityNext,
		})
	})
	torrents := input.Torrents
	allPeers := make(map[metainfo.Hash][]*requestsPeer, len(torrents))
	for _, t := range torrents {
		peers := make([]*requestsPeer, 0, len(t.Peers))
		for _, p := range t.Peers {
			peers = append(peers, &requestsPeer{
				Peer: p,
			})
		}
		allPeers[t.InfoHash] = peers
	}
	for _, piece := range requestPieces {
		for _, peer := range allPeers[piece.t.InfoHash] {
			if peer.canRequestPiece(piece.index) {
				peer.requestablePiecesRemaining++
			}
		}
	}
	for _, piece := range requestPieces {
		allocatePendingChunks(piece, allPeers[piece.t.InfoHash])
	}
	ret := make(map[PeerId]PeerNextRequestState)
	for _, peers := range allPeers {
		for _, rp := range peers {
			if rp.requestablePiecesRemaining != 0 {
				panic(rp.requestablePiecesRemaining)
			}
			if _, ok := ret[rp.Id]; ok {
				panic(fmt.Sprintf("duplicate peer id: %v", rp.Id))
			}
			ret[rp.Id] = rp.nextState
		}
	}
	return ret
}

// Checks that a sorted peersForPiece slice makes sense.
func ensureValidSortedPeersForPieceRequests(peers *peersForPieceSorter) {
	if !sort.IsSorted(peers) {
		panic("not sorted")
	}
	peerMap := make(map[*peersForPieceRequests]struct{}, peers.Len())
	for _, p := range peers.peersForPiece {
		if _, ok := peerMap[p]; ok {
			panic(p)
		}
		peerMap[p] = struct{}{}
	}
}

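// peersForPiecesPool recycles the candidate-peer slices handed out by makePeersForPiece, saving
// an allocation per piece per run.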
var peersForPiecesPool sync.Pool

func makePeersForPiece(cap int) []*peersForPieceRequests {
	got := peersForPiecesPool.Get()
	if got == nil {
		return make([]*peersForPieceRequests, 0, cap)
	}
	return got.([]*peersForPieceRequests)[:0]
}

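// peersForPieceSorter sorts the candidate peers for a single piece. req, when non-nil, is a
// request index being reassigned, so peers already holding it can be preferred.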
type peersForPieceSorter struct {
	peersForPiece []*peersForPieceRequests
	req           *RequestIndex
	p             requestablePiece
}

func (me *peersForPieceSorter) Len() int {
	return len(me.peersForPiece)
}

func (me *peersForPieceSorter) Swap(i, j int) {
	me.peersForPiece[i], me.peersForPiece[j] = me.peersForPiece[j], me.peersForPiece[i]
}

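// Less ranks peer _i before peer _j if it is the better candidate for the next request: peers on
// their last requestable piece (or every peer, for alwaysReallocate pieces) are striped by how
// many requests they already hold in this piece, otherwise a peer that already holds req is
// preferred. Remaining ties go to fewer requestable pieces remaining, higher download rate,
// greater age, and finally peer id.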
func (me *peersForPieceSorter) Less(_i, _j int) bool {
	i := me.peersForPiece[_i]
	j := me.peersForPiece[_j]
	req := me.req
	p := &me.p
	byHasRequest := func() multiless.Computation {
		ml := multiless.New()
		if req != nil {
			iHas := i.nextState.Requests.Contains(*req)
			jHas := j.nextState.Requests.Contains(*req)
			ml = ml.Bool(jHas, iHas)
		}
		return ml
	}()
	ml := multiless.New()
	// We always "reallocate", that is, force even striping amongst peers that are either on
	// the last piece they can contribute to, or for pieces marked for this behaviour.
	// Striping prevents starving peers of requests, and will always re-balance to the
	// fastest known peers.
	if !p.alwaysReallocate {
		ml = ml.Bool(
			j.requestablePiecesRemaining == 1,
			i.requestablePiecesRemaining == 1)
	}
	if p.alwaysReallocate || j.requestablePiecesRemaining == 1 {
		ml = ml.Int(
			i.requestsInPiece,
			j.requestsInPiece)
	} else {
		ml = ml.AndThen(byHasRequest)
	}
	ml = ml.Int(
		i.requestablePiecesRemaining,
		j.requestablePiecesRemaining,
	).Float64(
		j.DownloadRate,
		i.DownloadRate,
	)
	if ml.Ok() {
		return ml.Less()
	}
	ml = ml.AndThen(byHasRequest)
	return ml.Int64(
		int64(j.Age), int64(i.Age),
		// TODO: Probably peer priority can come next
	).Uintptr(
		i.Id.Uintptr(),
		j.Id.Uintptr(),
	).MustLess()
}

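// allocatePendingChunks distributes the pending chunks of p across peers. Chunks a peer already
// has outstanding are preassigned (possibly to several peers), remaining chunks go to the
// best-ranked peer that can fit them, and finally each preassigned chunk is re-sorted and given
// to the single best candidate.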
func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
	peersForPiece := makePeersForPiece(len(peers))
	for _, peer := range peers {
		if !peer.canRequestPiece(p.index) {
			continue
		}
		if !peer.canFitRequest() {
			peer.requestablePiecesRemaining--
			continue
		}
		peersForPiece = append(peersForPiece, &peersForPieceRequests{
			requestsInPiece: 0,
			requestsPeer:    peer,
		})
	}
	defer func() {
		for _, peer := range peersForPiece {
			peer.requestablePiecesRemaining--
		}
		peersForPiecesPool.Put(peersForPiece)
	}()
	peersForPieceSorter := peersForPieceSorter{
		peersForPiece: peersForPiece,
		p:             p,
	}
	sortPeersForPiece := func(req *RequestIndex) {
		peersForPieceSorter.req = req
		sort.Sort(&peersForPieceSorter)
		//ensureValidSortedPeersForPieceRequests(&peersForPieceSorter)
	}
	// Chunks can be preassigned several times, if peers haven't been able to update their "actual"
	// request state with the "next" one before another request strategy run occurs.
	preallocated := make([][]*peersForPieceRequests, p.t.ChunksPerPiece)
	p.IterPendingChunks(func(spec ChunkIndex) {
		req := p.chunkIndexToRequestIndex(spec)
		for _, peer := range peersForPiece {
			if !peer.ExistingRequests.Contains(req) {
				continue
			}
			if !peer.canFitRequest() {
				continue
			}
			preallocated[spec] = append(preallocated[spec], peer)
			peer.addNextRequest(req)
		}
	})
	pendingChunksRemaining := int(p.NumPendingChunks)
	p.IterPendingChunks(func(chunk ChunkIndex) {
		if len(preallocated[chunk]) != 0 {
			return
		}
		req := p.chunkIndexToRequestIndex(chunk)
		defer func() { pendingChunksRemaining-- }()
		sortPeersForPiece(nil)
		for _, peer := range peersForPiece {
			if !peer.canFitRequest() {
				continue
			}
			if !peer.PieceAllowedFast.ContainsInt(p.index) {
				// TODO: Verify that it's okay to stay uninterested if we request allowed fast pieces.
				peer.nextState.Interested = true
				if peer.Choking {
					continue
				}
			}
			peer.addNextRequest(req)
			break
		}
	})
chunk:
	for chunk, prePeers := range preallocated {
		if len(prePeers) == 0 {
			continue
		}
		pendingChunksRemaining--
		req := p.chunkIndexToRequestIndex(ChunkIndex(chunk))
		for _, pp := range prePeers {
			pp.requestsInPiece--
		}
		sortPeersForPiece(&req)
		for _, pp := range prePeers {
			pp.nextState.Requests.Remove(req)
		}
		for _, peer := range peersForPiece {
			if !peer.canFitRequest() {
				continue
			}
			if !peer.PieceAllowedFast.ContainsInt(p.index) {
				// TODO: Verify that it's okay to stay uninterested if we request allowed fast pieces.
				peer.nextState.Interested = true
				if peer.Choking {
					continue
				}
			}
			peer.addNextRequest(req)
			continue chunk
		}
	}
	if pendingChunksRemaining != 0 {
		panic(pendingChunksRemaining)
	}
}