]> Sergey Matveev's repositories - btrtrc.git/blob - request_strategy.go
Move all the request strategy stuff into its file
[btrtrc.git] / request_strategy.go
1 package torrent
2
3 import (
4         "math"
5         "sync"
6         "time"
7
8         "github.com/anacrolix/missinggo/iter"
9         "github.com/anacrolix/missinggo/v2/bitmap"
10         "github.com/anacrolix/missinggo/v2/prioritybitmap"
11
12         pp "github.com/anacrolix/torrent/peer_protocol"
13 )
14
15 type requestStrategyPiece interface {
16         numChunks() pp.Integer
17         dirtyChunks() bitmap.Bitmap
18         chunkIndexRequest(i pp.Integer) request
19 }
20
21 type requestStrategyTorrent interface {
22         numConns() int
23         numReaders() int
24         numPieces() int
25         readerPiecePriorities() (now, readahead bitmap.Bitmap)
26         ignorePieces() bitmap.Bitmap
27         pendingPieces() *prioritybitmap.PriorityBitmap
28 }
29
30 type requestStrategyConnection interface {
31         torrent() requestStrategyTorrent
32         peerPieces() bitmap.Bitmap
33         pieceRequestOrder() *prioritybitmap.PriorityBitmap
34         fastest() bool
35         stats() *ConnStats
36         totalExpectingTime() time.Duration
37         peerMaxRequests() int
38         chunksReceivedWhileExpecting() int64
39 }
40
41 type requestStrategyDefaults struct{}
42
43 func (requestStrategyDefaults) hooks() requestStrategyHooks {
44         return requestStrategyHooks{
45                 sentRequest:    func(request) {},
46                 deletedRequest: func(request) {},
47         }
48 }
49
50 type requestStrategy interface {
51         iterPendingPieces(requestStrategyConnection, func(pieceIndex) bool) bool
52         iterUndirtiedChunks(requestStrategyPiece, func(chunkSpec) bool) bool
53         nominalMaxRequests(requestStrategyConnection) int
54         shouldRequestWithoutBias(requestStrategyConnection) bool
55         piecePriority(requestStrategyConnection, pieceIndex, piecePriority, int) int
56         hooks() requestStrategyHooks
57 }
58
59 type requestStrategyHooks struct {
60         sentRequest    func(request)
61         deletedRequest func(request)
62 }
63
64 type requestStrategyCallbacks interface {
65         requestTimedOut(request)
66 }
67
68 // Favour higher priority pieces with some fuzzing to reduce overlaps and wastage across
69 // connections.
70 type requestStrategyFuzzing struct {
71         requestStrategyDefaults
72 }
73
74 // The fastest connection downloads strictly in order of priority, while all others adhere to their
75 // piece inclinations.
76 type requestStrategyFastest struct {
77         requestStrategyDefaults
78 }
79
80 func newRequestStrategyMaker(rs requestStrategy) RequestStrategyMaker {
81         return func(requestStrategyCallbacks, sync.Locker) requestStrategy {
82                 return rs
83         }
84 }
85
86 func RequestStrategyFastest() RequestStrategyMaker {
87         return newRequestStrategyMaker(requestStrategyFastest{})
88 }
89
90 func RequestStrategyFuzzing() RequestStrategyMaker {
91         return newRequestStrategyMaker(requestStrategyFuzzing{})
92 }
93
94 func (requestStrategyFastest) ShouldRequestWithoutBias(cn requestStrategyConnection) bool {
95         if cn.torrent().numReaders() == 0 {
96                 return false
97         }
98         if cn.torrent().numConns() == 1 {
99                 return true
100         }
101         if cn.fastest() {
102                 return true
103         }
104         return false
105 }
106
107 // Requests are strictly by piece priority, and not duplicated until duplicateRequestTimeout is
108 // reached.
109 type requestStrategyDuplicateRequestTimeout struct {
110         // How long to avoid duplicating a pending request.
111         duplicateRequestTimeout time.Duration
112
113         callbacks requestStrategyCallbacks
114
115         // The last time we requested a chunk. Deleting the request from any connection will clear this
116         // value.
117         lastRequested map[request]*time.Timer
118         // The lock to take when running a request timeout handler.
119         timeoutLocker sync.Locker
120 }
121
122 type RequestStrategyMaker func(callbacks requestStrategyCallbacks, clientLocker sync.Locker) requestStrategy
123
124 func RequestStrategyDuplicateRequestTimeout(duplicateRequestTimeout time.Duration) RequestStrategyMaker {
125         return func(callbacks requestStrategyCallbacks, clientLocker sync.Locker) requestStrategy {
126                 return requestStrategyDuplicateRequestTimeout{
127                         duplicateRequestTimeout: duplicateRequestTimeout,
128                         callbacks:               callbacks,
129                         lastRequested:           make(map[request]*time.Timer),
130                         timeoutLocker:           clientLocker,
131                 }
132         }
133 }
134
135 func (rs requestStrategyDuplicateRequestTimeout) hooks() requestStrategyHooks {
136         return requestStrategyHooks{
137                 deletedRequest: func(r request) {
138                         if t, ok := rs.lastRequested[r]; ok {
139                                 t.Stop()
140                                 delete(rs.lastRequested, r)
141                         }
142                 },
143                 sentRequest: rs.onSentRequest,
144         }
145 }
146
147 func defaultPiecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
148         return prio
149 }
150
151 func (requestStrategyFastest) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
152         return defaultPiecePriority(cn, piece, tpp, prio)
153 }
154
155 func (requestStrategyDuplicateRequestTimeout) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
156         return defaultPiecePriority(cn, piece, tpp, prio)
157 }
158
159 func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
160         for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ {
161                 if p.dirtyChunks().Get(bitmap.BitIndex(i)) {
162                         continue
163                 }
164                 r := p.chunkIndexRequest(i)
165                 if rs.wouldDuplicateRecent(r) {
166                         continue
167                 }
168                 if !f(r.chunkSpec) {
169                         return false
170                 }
171         }
172         return true
173 }
174
175 func defaultIterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
176         chunkIndices := p.dirtyChunks().Copy()
177         chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks()))
178         return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
179                 ci, err := chunkIndices.RB.Select(uint32(i))
180                 if err != nil {
181                         panic(err)
182                 }
183                 return f(p.chunkIndexRequest(pp.Integer(ci)).chunkSpec)
184         })
185 }
186
187 func (rs requestStrategyFuzzing) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
188         return defaultIterUndirtiedChunks(p, f)
189 }
190
191 func (rs requestStrategyFastest) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
192         return defaultIterUndirtiedChunks(p, f)
193 }
194
195 func (requestStrategyFuzzing) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
196         switch tpp {
197         case PiecePriorityNormal:
198         case PiecePriorityReadahead:
199                 prio -= int(cn.torrent().numPieces())
200         case PiecePriorityNext, PiecePriorityNow:
201                 prio -= 2 * int(cn.torrent().numPieces())
202         default:
203                 panic(tpp)
204         }
205         prio += int(piece / 3)
206         return prio
207 }
208
209 func (requestStrategyDuplicateRequestTimeout) iterPendingPieces(cn requestStrategyConnection, f func(pieceIndex) bool) bool {
210         return iterUnbiasedPieceRequestOrder(cn, f)
211 }
212 func defaultIterPendingPieces(rs requestStrategy, cn requestStrategyConnection, f func(pieceIndex) bool) bool {
213         if rs.shouldRequestWithoutBias(cn) {
214                 return iterUnbiasedPieceRequestOrder(cn, f)
215         } else {
216                 return cn.pieceRequestOrder().IterTyped(func(i int) bool {
217                         return f(pieceIndex(i))
218                 })
219         }
220 }
221 func (rs requestStrategyFuzzing) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
222         return defaultIterPendingPieces(rs, cn, cb)
223 }
224 func (rs requestStrategyFastest) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
225         return defaultIterPendingPieces(rs, cn, cb)
226 }
227
228 func defaultShouldRequestWithoutBias(cn requestStrategyConnection) bool {
229         return false
230 }
231
232 func (requestStrategyFastest) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
233         if cn.torrent().numReaders() == 0 {
234                 return false
235         }
236         if cn.torrent().numConns() == 1 {
237                 return true
238         }
239         if cn.fastest() {
240                 return true
241         }
242         return false
243 }
244
245 func (requestStrategyFuzzing) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
246         return defaultShouldRequestWithoutBias(cn)
247 }
248
249 func (requestStrategyDuplicateRequestTimeout) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
250         return defaultShouldRequestWithoutBias(cn)
251 }
252
253 func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r request) {
254         rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() {
255                 rs.timeoutLocker.Lock()
256                 delete(rs.lastRequested, r)
257                 rs.timeoutLocker.Unlock()
258                 rs.callbacks.requestTimedOut(r)
259         })
260 }
261
262 // The actual value to use as the maximum outbound requests.
263 func (rs requestStrategyDuplicateRequestTimeout) nominalMaxRequests(cn requestStrategyConnection) (ret int) {
264         expectingTime := int64(cn.totalExpectingTime())
265         if expectingTime == 0 {
266                 expectingTime = math.MaxInt64
267         } else {
268                 expectingTime *= 2
269         }
270         return int(clamp(
271                 1,
272                 int64(cn.peerMaxRequests()),
273                 max(
274                         // It makes sense to always pipeline at least one connection, since latency must be
275                         // non-zero.
276                         2,
277                         // Request only as many as we expect to receive in the duplicateRequestTimeout
278                         // window. We are trying to avoid having to duplicate requests.
279                         cn.chunksReceivedWhileExpecting()*int64(rs.duplicateRequestTimeout)/expectingTime,
280                 ),
281         ))
282 }
283 func defaultNominalMaxRequests(cn requestStrategyConnection) int {
284         return int(
285                 max(64,
286                         cn.stats().ChunksReadUseful.Int64()-(cn.stats().ChunksRead.Int64()-cn.stats().ChunksReadUseful.Int64())))
287 }
288 func (rs requestStrategyFuzzing) nominalMaxRequests(cn requestStrategyConnection) int {
289         return defaultNominalMaxRequests(cn)
290 }
291 func (rs requestStrategyFastest) nominalMaxRequests(cn requestStrategyConnection) int {
292         return defaultNominalMaxRequests(cn)
293 }
294
295 func (rs requestStrategyDuplicateRequestTimeout) wouldDuplicateRecent(r request) bool {
296         // This piece has been requested on another connection, and the duplicate request timer is still
297         // running.
298         _, ok := rs.lastRequested[r]
299         return ok
300 }