Optimize peersForPieces allocations for make and sorting
[btrtrc.git] / request-strategy / order.go
package request_strategy

import (
	"fmt"
	"sort"
	"sync"

	"github.com/anacrolix/multiless"

	pp "github.com/anacrolix/torrent/peer_protocol"
	"github.com/anacrolix/torrent/types"
)

type (
	Request       = types.Request
	pieceIndex    = types.PieceIndex
	piecePriority = types.PiecePriority
	// This can be made into a type-param later, will be great for testing.
	ChunkSpec = types.ChunkSpec
)

type ClientPieceOrder struct{}

type filterTorrent struct {
	*Torrent
	unverifiedBytes int64
	// Potentially shared with other torrents.
	storageLeft *int64
}

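// sortFilterPieces orders pieces for consideration: higher priority first,
// then partially-downloaded pieces, then lower availability, with piece index
// and torrent StableId as final tie-breakers.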
func sortFilterPieces(pieces []filterPiece) {
	sort.Slice(pieces, func(_i, _j int) bool {
		i := &pieces[_i]
		j := &pieces[_j]
		return multiless.New().Int(
			int(j.Priority), int(i.Priority),
		).Bool(
			j.Partial, i.Partial,
		).Int64(
			i.Availability, j.Availability,
		).Int(
			i.index, j.index,
		).Uintptr(
			i.t.StableId, j.t.StableId,
		).MustLess()
	})
}

type requestsPeer struct {
	Peer
	nextState                  PeerNextRequestState
	requestablePiecesRemaining int
}

func (rp *requestsPeer) canFitRequest() bool {
	return len(rp.nextState.Requests) < rp.MaxRequests
}

func (rp *requestsPeer) addNextRequest(r Request) {
	_, ok := rp.nextState.Requests[r]
	if ok {
		panic("should only add once")
	}
	rp.nextState.Requests[r] = struct{}{}
}

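// peersForPieceRequests wraps a requestsPeer with the number of requests the
// peer has picked up within the piece currently being allocated.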
type peersForPieceRequests struct {
	requestsInPiece int
	*requestsPeer
}

func (me *peersForPieceRequests) addNextRequest(r Request) {
	me.requestsPeer.addNextRequest(r)
	me.requestsInPiece++
}

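// requestablePiece is a piece that survived filtering in getRequestablePieces
// and will have its pending chunks allocated to peers.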
type requestablePiece struct {
	index             pieceIndex
	t                 *Torrent
	alwaysReallocate  bool
	NumPendingChunks  int
	IterPendingChunks ChunksIter
}

type filterPiece struct {
	t     *filterTorrent
	index pieceIndex
	*Piece
}

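// getRequestablePieces returns, in request order, the pieces that are marked
// for request and still have pending chunks, subject to remaining storage
// capacity and the per-torrent and overall unverified-byte limits.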
func getRequestablePieces(input Input) (ret []requestablePiece) {
	maxPieces := 0
	for i := range input.Torrents {
		maxPieces += len(input.Torrents[i].Pieces)
	}
	pieces := make([]filterPiece, 0, maxPieces)
	ret = make([]requestablePiece, 0, maxPieces)
	// Storage capacity left for this run, keyed by the storage capacity pointer on the storage
	// TorrentImpl.
	storageLeft := make(map[*func() *int64]*int64)
	for _t := range input.Torrents {
		// TODO: We could do metainfo requests here.
		t := &filterTorrent{
			Torrent:         &input.Torrents[_t],
			unverifiedBytes: 0,
		}
		key := t.Capacity
		if key != nil {
			if _, ok := storageLeft[key]; !ok {
				storageLeft[key] = (*key)()
			}
			t.storageLeft = storageLeft[key]
		}
		for i := range t.Pieces {
			pieces = append(pieces, filterPiece{
				t:     t,
				index: i,
				Piece: &t.Pieces[i],
			})
		}
	}
	sortFilterPieces(pieces)
	var allTorrentsUnverifiedBytes int64
	for _, piece := range pieces {
		if left := piece.t.storageLeft; left != nil {
			if *left < int64(piece.Length) {
				continue
			}
			*left -= int64(piece.Length)
		}
		if !piece.Request || piece.NumPendingChunks == 0 {
			// TODO: Clarify exactly what is verified. Stuff that's being hashed should be
			// considered unverified and hold up further requests.
			continue
		}
		if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes {
			continue
		}
		if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
			continue
		}
		piece.t.unverifiedBytes += piece.Length
		allTorrentsUnverifiedBytes += piece.Length
		ret = append(ret, requestablePiece{
			index:             piece.index,
			t:                 piece.t.Torrent,
			NumPendingChunks:  piece.NumPendingChunks,
			IterPendingChunks: piece.iterPendingChunksWrapper,
			alwaysReallocate:  piece.Priority >= types.PiecePriorityNext,
		})
	}
	return
}

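// Input is the complete snapshot that Run operates on: every torrent (with its
// peers) plus an overall cap on unverified bytes across all torrents (0 means
// no cap).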
type Input struct {
	Torrents           []Torrent
	MaxUnverifiedBytes int64
}

// TODO: We could do metainfo requests here.
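// Run computes the next request state (outstanding requests and interest) for
// every peer across all the torrents in input. A minimal usage sketch,
// assuming the caller has assembled `torrents` ([]Torrent, with Peers
// populated) elsewhere; the 64 MiB cap is just an example value:
//
//	next := Run(Input{
//		Torrents:           torrents,
//		MaxUnverifiedBytes: 64 << 20,
//	})
//	for peerId, state := range next {
//		// Apply state.Interested and state.Requests to the peer connection
//		// identified by peerId.
//		_ = peerId
//		_ = state
//	}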
func Run(input Input) map[PeerId]PeerNextRequestState {
	requestPieces := getRequestablePieces(input)
	torrents := input.Torrents
	allPeers := make(map[uintptr][]*requestsPeer, len(torrents))
	for _, t := range torrents {
		peers := make([]*requestsPeer, 0, len(t.Peers))
		for _, p := range t.Peers {
			peers = append(peers, &requestsPeer{
				Peer: p,
				nextState: PeerNextRequestState{
					Requests: make(map[Request]struct{}, p.MaxRequests),
				},
			})
		}
		allPeers[t.StableId] = peers
	}
	for _, piece := range requestPieces {
		for _, peer := range allPeers[piece.t.StableId] {
			if peer.canRequestPiece(piece.index) {
				peer.requestablePiecesRemaining++
			}
		}
	}
	for _, piece := range requestPieces {
		allocatePendingChunks(piece, allPeers[piece.t.StableId])
	}
	ret := make(map[PeerId]PeerNextRequestState)
	for _, peers := range allPeers {
		for _, rp := range peers {
			if rp.requestablePiecesRemaining != 0 {
				panic(rp.requestablePiecesRemaining)
			}
			if _, ok := ret[rp.Id]; ok {
				panic(fmt.Sprintf("duplicate peer id: %v", rp.Id))
			}
			ret[rp.Id] = rp.nextState
		}
	}
	return ret
}

// Checks that a sorted peersForPiece slice makes sense.
func ensureValidSortedPeersForPieceRequests(peers peersForPieceSorter) {
	if !sort.IsSorted(peers) {
		panic("not sorted")
	}
	peerMap := make(map[*peersForPieceRequests]struct{}, peers.Len())
	for _, p := range peers.peersForPiece {
		if _, ok := peerMap[p]; ok {
			panic(p)
		}
		peerMap[p] = struct{}{}
	}
}

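// peersForPiecesPool recycles the []*peersForPieceRequests slices used by
// allocatePendingChunks so a fresh slice isn't allocated for every piece.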
var peersForPiecesPool sync.Pool

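// makePeersForPiece returns an empty slice for a piece's candidate peers,
// reusing one from the pool when available; cap is only used when a new slice
// has to be allocated.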
func makePeersForPiece(cap int) []*peersForPieceRequests {
	got := peersForPiecesPool.Get()
	if got == nil {
		return make([]*peersForPieceRequests, 0, cap)
	}
	return got.([]*peersForPieceRequests)[:0]
}

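// peersForPieceSorter sorts the candidate peers for a single piece. When req
// is non-nil, peers that already hold that exact request in their next state
// sort earlier (see byHasRequest in Less).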
type peersForPieceSorter struct {
	peersForPiece []*peersForPieceRequests
	req           *Request
	p             requestablePiece
}

func (me peersForPieceSorter) Len() int {
	return len(me.peersForPiece)
}

func (me peersForPieceSorter) Swap(i, j int) {
	me.peersForPiece[i], me.peersForPiece[j] = me.peersForPiece[j], me.peersForPiece[i]
}

func (me peersForPieceSorter) Less(_i, _j int) bool {
	i := me.peersForPiece[_i]
	j := me.peersForPiece[_j]
	req := me.req
	p := me.p
	byHasRequest := func() multiless.Computation {
		ml := multiless.New()
		if req != nil {
			_, iHas := i.nextState.Requests[*req]
			_, jHas := j.nextState.Requests[*req]
			ml = ml.Bool(jHas, iHas)
		}
		return ml
	}()
	ml := multiless.New()
	// We always "reallocate", that is, force even striping amongst peers that are either on
	// the last piece they can contribute to, or for pieces marked for this behaviour.
	// Striping prevents starving peers of requests, and will always re-balance to the
	// fastest known peers.
	if !p.alwaysReallocate {
		ml = ml.Bool(
			j.requestablePiecesRemaining == 1,
			i.requestablePiecesRemaining == 1)
	}
	if p.alwaysReallocate || j.requestablePiecesRemaining == 1 {
		ml = ml.Int(
			i.requestsInPiece,
			j.requestsInPiece)
	} else {
		ml = ml.AndThen(byHasRequest)
	}
	ml = ml.Int(
		i.requestablePiecesRemaining,
		j.requestablePiecesRemaining,
	).Float64(
		j.DownloadRate,
		i.DownloadRate,
	)
	ml = ml.AndThen(byHasRequest)
	return ml.Int64(
		int64(j.Age), int64(i.Age),
		// TODO: Probably peer priority can come next
	).Uintptr(
		i.Id.Uintptr(),
		j.Id.Uintptr(),
	).MustLess()
}

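// allocatePendingChunks hands out p's pending chunks: chunks that some peer
// already has in flight are first preassigned to those peers, the remaining
// chunks go to the best candidate peer after re-sorting, and finally each
// preassigned chunk is re-awarded to a single best peer.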
func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
	peersForPiece := makePeersForPiece(len(peers))
	for _, peer := range peers {
		peersForPiece = append(peersForPiece, &peersForPieceRequests{
			requestsInPiece: 0,
			requestsPeer:    peer,
		})
	}
	defer func() {
		for _, peer := range peersForPiece {
			if peer.canRequestPiece(p.index) {
				peer.requestablePiecesRemaining--
			}
		}
		peersForPiecesPool.Put(peersForPiece)
	}()
	peersForPieceSorter := peersForPieceSorter{
		peersForPiece: peersForPiece,
		p:             p,
	}
	sortPeersForPiece := func(req *Request) {
		peersForPieceSorter.req = req
		sort.Sort(&peersForPieceSorter)
		//ensureValidSortedPeersForPieceRequests(peersForPieceSorter)
	}
	// Chunks can be preassigned to several peers, if peers haven't been able to update their
	// "actual" request state with the "next" one before another request strategy run occurs.
	preallocated := make(map[ChunkSpec][]*peersForPieceRequests, p.NumPendingChunks)
	p.IterPendingChunks(func(spec ChunkSpec) {
		req := Request{pp.Integer(p.index), spec}
		for _, peer := range peersForPiece {
			if h := peer.HasExistingRequest; h == nil || !h(req) {
				continue
			}
			if !peer.canFitRequest() {
				continue
			}
			if !peer.canRequestPiece(p.index) {
				continue
			}
			preallocated[spec] = append(preallocated[spec], peer)
			peer.addNextRequest(req)
		}
	})
	pendingChunksRemaining := int(p.NumPendingChunks)
	p.IterPendingChunks(func(chunk types.ChunkSpec) {
		if _, ok := preallocated[chunk]; ok {
			return
		}
		req := Request{pp.Integer(p.index), chunk}
		defer func() { pendingChunksRemaining-- }()
		sortPeersForPiece(nil)
		for _, peer := range peersForPiece {
			if !peer.canFitRequest() {
				continue
			}
			if !peer.HasPiece(p.index) {
				continue
			}
			if !peer.pieceAllowedFastOrDefault(p.index) {
				// TODO: Verify that it's okay to stay uninterested if we request allowed fast pieces.
				peer.nextState.Interested = true
				if peer.Choking {
					continue
				}
			}
			peer.addNextRequest(req)
			break
		}
	})
chunk:
	for chunk, prePeers := range preallocated {
		pendingChunksRemaining--
		req := Request{pp.Integer(p.index), chunk}
		for _, pp := range prePeers {
			pp.requestsInPiece--
		}
		sortPeersForPiece(&req)
		for _, pp := range prePeers {
			delete(pp.nextState.Requests, req)
		}
		for _, peer := range peersForPiece {
			if !peer.canFitRequest() {
				continue
			}
			if !peer.HasPiece(p.index) {
				continue
			}
			if !peer.pieceAllowedFastOrDefault(p.index) {
				// TODO: Verify that it's okay to stay uninterested if we request allowed fast pieces.
				peer.nextState.Interested = true
				if peer.Choking {
					continue
				}
			}
			peer.addNextRequest(req)
			continue chunk
		}
	}
	if pendingChunksRemaining != 0 {
		panic(pendingChunksRemaining)
	}
}