package request_strategy

import (
        "bytes"
        "sort"
        "sync"

        "github.com/anacrolix/multiless"
        "github.com/anacrolix/torrent/storage"

        "github.com/anacrolix/torrent/types"
)

type (
        RequestIndex  = uint32
        ChunkIndex    = uint32
        Request       = types.Request
        pieceIndex    = types.PieceIndex
        piecePriority = types.PiecePriority
        // This can be made into a type-param later; that will be great for testing.
        ChunkSpec = types.ChunkSpec
)

type ClientPieceOrder struct{}

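// filterTorrent wraps a Torrent with the per-run accounting used while
// filtering and ordering its pieces.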
type filterTorrent struct {
        *Torrent
        unverifiedBytes int64
        // Potentially shared with other torrents.
        storageLeft *int64
}

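// sortFilterPieces orders pieces best-first: higher priority, then partially
// downloaded pieces, then lower availability, then lower piece index, and
// finally ascending torrent infohash as a stable tiebreaker.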
func sortFilterPieces(pieces []filterPiece) {
        sort.Slice(pieces, func(_i, _j int) bool {
                i := &pieces[_i]
                j := &pieces[_j]
                return multiless.New().Int(
                        int(j.Priority), int(i.Priority),
                ).Bool(
                        j.Partial, i.Partial,
                ).Int64(
                        i.Availability, j.Availability,
                ).Int(
                        i.index, j.index,
                ).Lazy(func() multiless.Computation {
                        return multiless.New().Cmp(bytes.Compare(
                                i.t.InfoHash[:],
                                j.t.InfoHash[:],
                        ))
                }).MustLess()
        })
}

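// requestsPeer couples a peer with the next request state being accumulated
// for it during a request-strategy run.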
type requestsPeer struct {
        Peer
        nextState                  PeerNextRequestState
        requestablePiecesRemaining int
}

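// canFitRequest reports whether the peer has room for another request under
// its MaxRequests limit.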
func (rp *requestsPeer) canFitRequest() bool {
        return int(rp.nextState.Requests.GetCardinality()) < rp.MaxRequests
}

func (rp *requestsPeer) addNextRequest(r RequestIndex) {
        if !rp.nextState.Requests.CheckedAdd(r) {
                panic("should only add once")
        }
}

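// peersForPieceRequests decorates a requestsPeer with the number of requests
// allocated to it within the piece currently being processed.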
type peersForPieceRequests struct {
        requestsInPiece int
        *requestsPeer
}

func (me *peersForPieceRequests) addNextRequest(r RequestIndex) {
        me.requestsPeer.addNextRequest(r)
        me.requestsInPiece++
}

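// requestablePiece is a piece that passed filtering and will have its pending
// chunks allocated amongst peers.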
type requestablePiece struct {
        index             pieceIndex
        t                 *Torrent
        alwaysReallocate  bool
        NumPendingChunks  int
        IterPendingChunks ChunksIterFunc
}

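// chunkIndexToRequestIndex maps a chunk within this piece to its
// torrent-global request index. For example, with ChunksPerPiece == 16,
// chunk 2 of piece 3 maps to request index 16*3+2 == 50.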
func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex {
        return p.t.ChunksPerPiece*uint32(p.index) + c
}

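// filterPiece pairs a Piece with its torrent and index so pieces from several
// torrents can be sorted together.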
type filterPiece struct {
        t     *filterTorrent
        index pieceIndex
        *Piece
}

// GetRequestablePieces calls f with each requestable piece, in order of
// desirability.
func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex int)) {
        maxPieces := 0
        for i := range input.Torrents {
                maxPieces += len(input.Torrents[i].Pieces)
        }
        pieces := make([]filterPiece, 0, maxPieces)
        // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
        // TorrentImpl. A nil value means no capacity limit.
        storageLeft := make(map[storage.TorrentCapacity]*int64)
        for _t := range input.Torrents {
                // TODO: We could do metainfo requests here.
                t := &filterTorrent{
                        Torrent:         &input.Torrents[_t],
                        unverifiedBytes: 0,
                }
                key := t.Capacity
                if key != nil {
                        if _, ok := storageLeft[key]; !ok {
                                capacity, ok := (*key)()
                                if ok {
                                        storageLeft[key] = &capacity
                                } else {
                                        storageLeft[key] = nil
                                }
                        }
                        t.storageLeft = storageLeft[key]
                }
                for i := range t.Pieces {
                        pieces = append(pieces, filterPiece{
                                t:     t,
                                index: i,
                                Piece: &t.Pieces[i],
                        })
                }
        }
        sortFilterPieces(pieces)
        var allTorrentsUnverifiedBytes int64
        for _, piece := range pieces {
                if left := piece.t.storageLeft; left != nil {
                        if *left < int64(piece.Length) {
                                continue
                        }
                        *left -= int64(piece.Length)
                }
                if !piece.Request || piece.NumPendingChunks == 0 {
                        // TODO: Clarify exactly what is verified. Stuff that's being hashed should be
                        // considered unverified and hold up further requests.
                        continue
                }
                if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes {
                        continue
                }
                if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
                        continue
                }
                piece.t.unverifiedBytes += piece.Length
                allTorrentsUnverifiedBytes += piece.Length
                f(piece.t.Torrent, piece.Piece, piece.index)
        }
}

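// Input provides the torrents and global limits for a request-strategy run.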
type Input struct {
        Torrents           []Torrent
        MaxUnverifiedBytes int64
}

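// The function below is illustrative only, not part of the strategy: a
// minimal sketch of how a caller might drive GetRequestablePieces, assuming
// the Torrent values are otherwise fully populated. Both torrents share one
// capacity func (storage.TorrentCapacity is a pointer to a func reporting
// remaining bytes, as dereferenced in GetRequestablePieces above), so they
// draw from a single storage budget within the run.
func exampleGetRequestablePieces(a, b Torrent) []pieceIndex {
        remaining := func() (int64, bool) { return 10 << 20, true } // 10 MiB left, known.
        a.Capacity = &remaining
        b.Capacity = &remaining
        var order []pieceIndex
        GetRequestablePieces(
                Input{
                        Torrents:           []Torrent{a, b},
                        MaxUnverifiedBytes: 1 << 26, // 0 would mean no global limit.
                },
                func(t *Torrent, p *Piece, i int) {
                        // Pieces arrive best-first, per sortFilterPieces.
                        order = append(order, i)
                },
        )
        return order
}
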
// ensureValidSortedPeersForPieceRequests checks that a sorted peersForPiece
// slice is in order and contains no duplicate peers.
func ensureValidSortedPeersForPieceRequests(peers *peersForPieceSorter) {
        if !sort.IsSorted(peers) {
                panic("not sorted")
        }
        peerMap := make(map[*peersForPieceRequests]struct{}, peers.Len())
        for _, p := range peers.peersForPiece {
                if _, ok := peerMap[p]; ok {
                        panic(p)
                }
                peerMap[p] = struct{}{}
        }
}

var peersForPiecesPool sync.Pool

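// makePeersForPiece returns an empty slice for accumulating candidate peers,
// reusing a pooled backing array when one is available. The cap argument is
// only honoured when a fresh slice has to be allocated.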
func makePeersForPiece(cap int) []*peersForPieceRequests {
        got := peersForPiecesPool.Get()
        if got == nil {
                return make([]*peersForPieceRequests, 0, cap)
        }
        return got.([]*peersForPieceRequests)[:0]
}

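// peersForPieceSorter sorts the candidate peers for a single requestable
// piece. When req is non-nil, peers already holding that request index are
// preferred, which keeps existing requests sticky across strategy runs.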
type peersForPieceSorter struct {
        peersForPiece []*peersForPieceRequests
        req           *RequestIndex
        p             requestablePiece
}

func (me *peersForPieceSorter) Len() int {
        return len(me.peersForPiece)
}

func (me *peersForPieceSorter) Swap(i, j int) {
        me.peersForPiece[i], me.peersForPiece[j] = me.peersForPiece[j], me.peersForPiece[i]
}

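// Less orders candidate peers for the piece, best first. Striping applies
// when the piece always reallocates or a peer is on its last requestable
// piece; otherwise peers that already have the request under consideration
// come first. Remaining ties fall through to fewer requestable pieces
// remaining, higher download rate, greater age, and finally peer ID.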
func (me *peersForPieceSorter) Less(_i, _j int) bool {
        i := me.peersForPiece[_i]
        j := me.peersForPiece[_j]
        req := me.req
        p := &me.p
        byHasRequest := func() multiless.Computation {
                ml := multiless.New()
                if req != nil {
                        iHas := i.nextState.Requests.Contains(*req)
                        jHas := j.nextState.Requests.Contains(*req)
                        ml = ml.Bool(jHas, iHas)
                }
                return ml
        }()
        ml := multiless.New()
        // We always "reallocate", that is, force even striping amongst peers, when a peer is on
        // the last piece it can contribute to, or when the piece is marked for this behaviour.
        // Striping prevents starving peers of requests, and always re-balances towards the
        // fastest known peers.
        if !p.alwaysReallocate {
                ml = ml.Bool(
                        j.requestablePiecesRemaining == 1,
                        i.requestablePiecesRemaining == 1)
        }
        if p.alwaysReallocate || j.requestablePiecesRemaining == 1 {
                ml = ml.Int(
                        i.requestsInPiece,
                        j.requestsInPiece)
        } else {
                ml = ml.AndThen(byHasRequest)
        }
        ml = ml.Int(
                i.requestablePiecesRemaining,
                j.requestablePiecesRemaining,
        ).Float64(
                j.DownloadRate,
                i.DownloadRate,
        )
        if ml.Ok() {
                return ml.Less()
        }
        ml = ml.AndThen(byHasRequest)
        return ml.Int64(
                int64(j.Age), int64(i.Age),
                // TODO: Probably peer priority can come next.
        ).Uintptr(
                i.Id.Uintptr(),
                j.Id.Uintptr(),
        ).MustLess()
}

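// allocatePendingChunks assigns a piece's pending chunks to peers' next
// request states: peers that already hold a request are preallocated it,
// chunks with no preallocation go to the best-sorted peer that can fit them,
// and each preallocated chunk is then resolved to a single winner.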
func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
        peersForPiece := makePeersForPiece(len(peers))
        for _, peer := range peers {
                if !peer.canRequestPiece(p.index) {
                        continue
                }
                if !peer.canFitRequest() {
                        peer.requestablePiecesRemaining--
                        continue
                }
                peersForPiece = append(peersForPiece, &peersForPieceRequests{
                        requestsInPiece: 0,
                        requestsPeer:    peer,
                })
        }
        defer func() {
                for _, peer := range peersForPiece {
                        peer.requestablePiecesRemaining--
                }
                peersForPiecesPool.Put(peersForPiece)
        }()
        peersForPieceSorter := peersForPieceSorter{
                peersForPiece: peersForPiece,
                p:             p,
        }
        sortPeersForPiece := func(req *RequestIndex) {
                peersForPieceSorter.req = req
                sort.Sort(&peersForPieceSorter)
                // ensureValidSortedPeersForPieceRequests(&peersForPieceSorter)
        }
        // Chunks can be preassigned to several peers, if those peers haven't yet had their
        // "actual" request state updated from the "next" state computed by a previous request
        // strategy run.
        preallocated := make([][]*peersForPieceRequests, p.t.ChunksPerPiece)
        p.IterPendingChunks(func(spec ChunkIndex) {
                req := p.chunkIndexToRequestIndex(spec)
                for _, peer := range peersForPiece {
                        if !peer.ExistingRequests.Contains(req) {
                                continue
                        }
                        if !peer.canFitRequest() {
                                continue
                        }
                        preallocated[spec] = append(preallocated[spec], peer)
                        peer.addNextRequest(req)
                }
        })
        pendingChunksRemaining := p.NumPendingChunks
        p.IterPendingChunks(func(chunk ChunkIndex) {
                if len(preallocated[chunk]) != 0 {
                        return
                }
                req := p.chunkIndexToRequestIndex(chunk)
                defer func() { pendingChunksRemaining-- }()
                sortPeersForPiece(nil)
                for _, peer := range peersForPiece {
                        if !peer.canFitRequest() {
                                continue
                        }
                        if !peer.PieceAllowedFast.ContainsInt(p.index) {
                                // TODO: Verify that it's okay to stay uninterested if we request
                                // allowed-fast pieces.
                                peer.nextState.Interested = true
                                if peer.Choking {
                                        continue
                                }
                        }
                        peer.addNextRequest(req)
                        break
                }
        })
chunk:
        for chunk, prePeers := range preallocated {
                if len(prePeers) == 0 {
                        continue
                }
                pendingChunksRemaining--
                req := p.chunkIndexToRequestIndex(ChunkIndex(chunk))
                for _, pp := range prePeers {
                        pp.requestsInPiece--
                }
                sortPeersForPiece(&req)
                for _, pp := range prePeers {
                        pp.nextState.Requests.Remove(req)
                }
                for _, peer := range peersForPiece {
                        if !peer.canFitRequest() {
                                continue
                        }
                        if !peer.PieceAllowedFast.ContainsInt(p.index) {
                                // TODO: Verify that it's okay to stay uninterested if we request
                                // allowed-fast pieces.
                                peer.nextState.Interested = true
                                if peer.Choking {
                                        continue
                                }
                        }
                        peer.addNextRequest(req)
                        continue chunk
                }
        }
        if pendingChunksRemaining != 0 {
                panic(pendingChunksRemaining)
        }
}