8 "github.com/anacrolix/missinggo/iter"
9 "github.com/anacrolix/missinggo/v2/bitmap"
10 "github.com/anacrolix/missinggo/v2/prioritybitmap"
12 pp "github.com/anacrolix/torrent/peer_protocol"
15 type requestStrategyPiece interface {
16 numChunks() pp.Integer
17 dirtyChunks() bitmap.Bitmap
18 chunkIndexRequest(i pp.Integer) request
21 type requestStrategyTorrent interface {
25 readerPiecePriorities() (now, readahead bitmap.Bitmap)
26 ignorePieces() bitmap.Bitmap
27 pendingPieces() *prioritybitmap.PriorityBitmap
30 type requestStrategyConnection interface {
31 torrent() requestStrategyTorrent
32 peerPieces() bitmap.Bitmap
33 pieceRequestOrder() *prioritybitmap.PriorityBitmap
36 totalExpectingTime() time.Duration
38 chunksReceivedWhileExpecting() int64
41 type requestStrategyDefaults struct{}
43 func (requestStrategyDefaults) hooks() requestStrategyHooks {
44 return requestStrategyHooks{
45 sentRequest: func(request) {},
46 deletedRequest: func(request) {},
50 type requestStrategy interface {
51 iterPendingPieces(requestStrategyConnection, func(pieceIndex) bool) bool
52 iterUndirtiedChunks(requestStrategyPiece, func(chunkSpec) bool) bool
53 nominalMaxRequests(requestStrategyConnection) int
54 shouldRequestWithoutBias(requestStrategyConnection) bool
55 piecePriority(requestStrategyConnection, pieceIndex, piecePriority, int) int
56 hooks() requestStrategyHooks
59 type requestStrategyHooks struct {
60 sentRequest func(request)
61 deletedRequest func(request)
64 type requestStrategyCallbacks interface {
65 requestTimedOut(request)
68 // Favour higher priority pieces with some fuzzing to reduce overlaps and wastage across
70 type requestStrategyFuzzing struct {
71 requestStrategyDefaults
74 // The fastest connection downloads strictly in order of priority, while all others adhere to their
75 // piece inclinations.
76 type requestStrategyFastest struct {
77 requestStrategyDefaults
80 func newRequestStrategyMaker(rs requestStrategy) RequestStrategyMaker {
81 return func(requestStrategyCallbacks, sync.Locker) requestStrategy {
86 func RequestStrategyFastest() RequestStrategyMaker {
87 return newRequestStrategyMaker(requestStrategyFastest{})
90 func RequestStrategyFuzzing() RequestStrategyMaker {
91 return newRequestStrategyMaker(requestStrategyFuzzing{})
94 func (requestStrategyFastest) ShouldRequestWithoutBias(cn requestStrategyConnection) bool {
95 if cn.torrent().numReaders() == 0 {
98 if cn.torrent().numConns() == 1 {
107 // Requests are strictly by piece priority, and not duplicated until duplicateRequestTimeout is
109 type requestStrategyDuplicateRequestTimeout struct {
110 // How long to avoid duplicating a pending request.
111 duplicateRequestTimeout time.Duration
113 callbacks requestStrategyCallbacks
115 // The last time we requested a chunk. Deleting the request from any connection will clear this
117 lastRequested map[request]*time.Timer
118 // The lock to take when running a request timeout handler.
119 timeoutLocker sync.Locker
122 type RequestStrategyMaker func(callbacks requestStrategyCallbacks, clientLocker sync.Locker) requestStrategy
124 func RequestStrategyDuplicateRequestTimeout(duplicateRequestTimeout time.Duration) RequestStrategyMaker {
125 return func(callbacks requestStrategyCallbacks, clientLocker sync.Locker) requestStrategy {
126 return requestStrategyDuplicateRequestTimeout{
127 duplicateRequestTimeout: duplicateRequestTimeout,
128 callbacks: callbacks,
129 lastRequested: make(map[request]*time.Timer),
130 timeoutLocker: clientLocker,
135 func (rs requestStrategyDuplicateRequestTimeout) hooks() requestStrategyHooks {
136 return requestStrategyHooks{
137 deletedRequest: func(r request) {
138 if t, ok := rs.lastRequested[r]; ok {
140 delete(rs.lastRequested, r)
143 sentRequest: rs.onSentRequest,
147 func defaultPiecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
151 func (requestStrategyFastest) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
152 return defaultPiecePriority(cn, piece, tpp, prio)
155 func (requestStrategyDuplicateRequestTimeout) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
156 return defaultPiecePriority(cn, piece, tpp, prio)
159 func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
160 for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ {
161 if p.dirtyChunks().Get(bitmap.BitIndex(i)) {
164 r := p.chunkIndexRequest(i)
165 if rs.wouldDuplicateRecent(r) {
175 func defaultIterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
176 chunkIndices := p.dirtyChunks().Copy()
177 chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks()))
178 return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
179 ci, err := chunkIndices.RB.Select(uint32(i))
183 return f(p.chunkIndexRequest(pp.Integer(ci)).chunkSpec)
187 func (rs requestStrategyFuzzing) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
188 return defaultIterUndirtiedChunks(p, f)
191 func (rs requestStrategyFastest) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
192 return defaultIterUndirtiedChunks(p, f)
195 func (requestStrategyFuzzing) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
197 case PiecePriorityNormal:
198 case PiecePriorityReadahead:
199 prio -= int(cn.torrent().numPieces())
200 case PiecePriorityNext, PiecePriorityNow:
201 prio -= 2 * int(cn.torrent().numPieces())
205 prio += int(piece / 3)
209 func (requestStrategyDuplicateRequestTimeout) iterPendingPieces(cn requestStrategyConnection, f func(pieceIndex) bool) bool {
210 return iterUnbiasedPieceRequestOrder(cn, f)
212 func defaultIterPendingPieces(rs requestStrategy, cn requestStrategyConnection, f func(pieceIndex) bool) bool {
213 if rs.shouldRequestWithoutBias(cn) {
214 return iterUnbiasedPieceRequestOrder(cn, f)
216 return cn.pieceRequestOrder().IterTyped(func(i int) bool {
217 return f(pieceIndex(i))
221 func (rs requestStrategyFuzzing) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
222 return defaultIterPendingPieces(rs, cn, cb)
224 func (rs requestStrategyFastest) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
225 return defaultIterPendingPieces(rs, cn, cb)
228 func defaultShouldRequestWithoutBias(cn requestStrategyConnection) bool {
232 func (requestStrategyFastest) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
233 if cn.torrent().numReaders() == 0 {
236 if cn.torrent().numConns() == 1 {
245 func (requestStrategyFuzzing) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
246 return defaultShouldRequestWithoutBias(cn)
249 func (requestStrategyDuplicateRequestTimeout) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
250 return defaultShouldRequestWithoutBias(cn)
253 func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r request) {
254 rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() {
255 rs.timeoutLocker.Lock()
256 delete(rs.lastRequested, r)
257 rs.timeoutLocker.Unlock()
258 rs.callbacks.requestTimedOut(r)
262 // The actual value to use as the maximum outbound requests.
263 func (rs requestStrategyDuplicateRequestTimeout) nominalMaxRequests(cn requestStrategyConnection) (ret int) {
264 expectingTime := int64(cn.totalExpectingTime())
265 if expectingTime == 0 {
266 expectingTime = math.MaxInt64
272 int64(cn.peerMaxRequests()),
274 // It makes sense to always pipeline at least one connection, since latency must be
277 // Request only as many as we expect to receive in the duplicateRequestTimeout
278 // window. We are trying to avoid having to duplicate requests.
279 cn.chunksReceivedWhileExpecting()*int64(rs.duplicateRequestTimeout)/expectingTime,
283 func defaultNominalMaxRequests(cn requestStrategyConnection) int {
286 cn.stats().ChunksReadUseful.Int64()-(cn.stats().ChunksRead.Int64()-cn.stats().ChunksReadUseful.Int64())))
288 func (rs requestStrategyFuzzing) nominalMaxRequests(cn requestStrategyConnection) int {
289 return defaultNominalMaxRequests(cn)
291 func (rs requestStrategyFastest) nominalMaxRequests(cn requestStrategyConnection) int {
292 return defaultNominalMaxRequests(cn)
295 func (rs requestStrategyDuplicateRequestTimeout) wouldDuplicateRecent(r request) bool {
296 // This piece has been requested on another connection, and the duplicate request timer is still
298 _, ok := rs.lastRequested[r]