]> Sergey Matveev's repositories - btrtrc.git/blob - request-strategy.go
07d0cce6005eaf3e399bda60afd6cd3701a27e85
[btrtrc.git] / request-strategy.go
1 package torrent
2
3 import (
4         "math"
5         "sync"
6         "time"
7
8         "github.com/anacrolix/missinggo/v2/bitmap"
9         "github.com/anacrolix/missinggo/v2/prioritybitmap"
10
11         pp "github.com/anacrolix/torrent/peer_protocol"
12 )
13
14 type requestStrategyPiece interface {
15         numChunks() pp.Integer
16         dirtyChunks() bitmap.Bitmap
17         chunkIndexRequest(i pp.Integer) request
18 }
19
20 type requestStrategyTorrent interface {
21         numConns() int
22         numReaders() int
23         numPieces() int
24         readerPiecePriorities() (now, readahead bitmap.Bitmap)
25         ignorePieces() bitmap.Bitmap
26         pendingPieces() *prioritybitmap.PriorityBitmap
27 }
28
29 type requestStrategyConnection interface {
30         torrent() requestStrategyTorrent
31         peerPieces() bitmap.Bitmap
32         pieceRequestOrder() *prioritybitmap.PriorityBitmap
33         fastest() bool
34         stats() *ConnStats
35         totalExpectingTime() time.Duration
36         peerMaxRequests() int
37         chunksReceivedWhileExpecting() int64
38 }
39
40 type requestStrategy interface {
41         iterPendingPieces(requestStrategyConnection, func(pieceIndex) bool) bool
42         iterUndirtiedChunks(requestStrategyPiece, func(chunkSpec) bool) bool
43         nominalMaxRequests(requestStrategyConnection) int
44         shouldRequestWithoutBias(requestStrategyConnection) bool
45         piecePriority(requestStrategyConnection, pieceIndex, piecePriority, int) int
46         hooks() requestStrategyHooks
47 }
48
49 type requestStrategyHooks struct {
50         sentRequest    func(request)
51         deletedRequest func(request)
52 }
53
54 type requestStrategyCallbacks interface {
55         requestTimedOut(request)
56 }
57
58 type requestStrategyFuzzing struct {
59         requestStrategyDefaults
60 }
61
62 type requestStrategyFastest struct {
63         requestStrategyDefaults
64 }
65
66 func newRequestStrategyMaker(rs requestStrategy) requestStrategyMaker {
67         return func(requestStrategyCallbacks, sync.Locker) requestStrategy {
68                 return rs
69         }
70 }
71
72 // The fastest connection downloads strictly in order of priority, while all others adhere to their
73 // piece inclinations.
74 func RequestStrategyFastest() requestStrategyMaker {
75         return newRequestStrategyMaker(requestStrategyFastest{})
76 }
77
78 // Favour higher priority pieces with some fuzzing to reduce overlaps and wastage across
79 // connections.
80 func RequestStrategyFuzzing() requestStrategyMaker {
81         return newRequestStrategyMaker(requestStrategyFuzzing{})
82 }
83
84 func (requestStrategyFastest) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
85         if cn.torrent().numReaders() == 0 {
86                 return false
87         }
88         if cn.torrent().numConns() == 1 {
89                 return true
90         }
91         if cn.fastest() {
92                 return true
93         }
94         return false
95 }
96
97 type requestStrategyDuplicateRequestTimeout struct {
98         requestStrategyDefaults
99         // How long to avoid duplicating a pending request.
100         duplicateRequestTimeout time.Duration
101
102         callbacks requestStrategyCallbacks
103
104         // The last time we requested a chunk. Deleting the request from any connection will clear this
105         // value.
106         lastRequested map[request]*time.Timer
107         // The lock to take when running a request timeout handler.
108         timeoutLocker sync.Locker
109 }
110
111 // Generates a request strategy instance for a given torrent. callbacks are probably specific to the torrent.
112 type requestStrategyMaker func(callbacks requestStrategyCallbacks, clientLocker sync.Locker) requestStrategy
113
114 // Requests are strictly by piece priority, and not duplicated until duplicateRequestTimeout is
115 // reached.
116 func RequestStrategyDuplicateRequestTimeout(duplicateRequestTimeout time.Duration) requestStrategyMaker {
117         return func(callbacks requestStrategyCallbacks, clientLocker sync.Locker) requestStrategy {
118                 return requestStrategyDuplicateRequestTimeout{
119                         duplicateRequestTimeout: duplicateRequestTimeout,
120                         callbacks:               callbacks,
121                         lastRequested:           make(map[request]*time.Timer),
122                         timeoutLocker:           clientLocker,
123                 }
124         }
125 }
126
127 func (rs requestStrategyDuplicateRequestTimeout) hooks() requestStrategyHooks {
128         return requestStrategyHooks{
129                 deletedRequest: func(r request) {
130                         if t, ok := rs.lastRequested[r]; ok {
131                                 t.Stop()
132                                 delete(rs.lastRequested, r)
133                         }
134                 },
135                 sentRequest: rs.onSentRequest,
136         }
137 }
138
139 func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
140         for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ {
141                 if p.dirtyChunks().Get(bitmap.BitIndex(i)) {
142                         continue
143                 }
144                 r := p.chunkIndexRequest(i)
145                 if rs.wouldDuplicateRecent(r) {
146                         continue
147                 }
148                 if !f(r.chunkSpec) {
149                         return false
150                 }
151         }
152         return true
153 }
154
155 func (requestStrategyFuzzing) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
156         switch tpp {
157         case PiecePriorityNormal:
158         case PiecePriorityReadahead:
159                 prio -= int(cn.torrent().numPieces())
160         case PiecePriorityNext, PiecePriorityNow:
161                 prio -= 2 * int(cn.torrent().numPieces())
162         default:
163                 panic(tpp)
164         }
165         prio += int(piece / 3)
166         return prio
167 }
168
169 func (requestStrategyDuplicateRequestTimeout) iterPendingPieces(cn requestStrategyConnection, f func(pieceIndex) bool) bool {
170         return iterUnbiasedPieceRequestOrder(cn, f)
171 }
172 func defaultIterPendingPieces(rs requestStrategy, cn requestStrategyConnection, f func(pieceIndex) bool) bool {
173         if rs.shouldRequestWithoutBias(cn) {
174                 return iterUnbiasedPieceRequestOrder(cn, f)
175         } else {
176                 return cn.pieceRequestOrder().IterTyped(func(i int) bool {
177                         return f(pieceIndex(i))
178                 })
179         }
180 }
181 func (rs requestStrategyFuzzing) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
182         return defaultIterPendingPieces(rs, cn, cb)
183 }
184 func (rs requestStrategyFastest) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
185         return defaultIterPendingPieces(rs, cn, cb)
186 }
187
188 func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r request) {
189         rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() {
190                 rs.timeoutLocker.Lock()
191                 delete(rs.lastRequested, r)
192                 rs.timeoutLocker.Unlock()
193                 rs.callbacks.requestTimedOut(r)
194         })
195 }
196
197 // The actual value to use as the maximum outbound requests.
198 func (rs requestStrategyDuplicateRequestTimeout) nominalMaxRequests(cn requestStrategyConnection) (ret int) {
199         expectingTime := int64(cn.totalExpectingTime())
200         if expectingTime == 0 {
201                 expectingTime = math.MaxInt64
202         } else {
203                 expectingTime *= 2
204         }
205         return int(clamp(
206                 1,
207                 int64(cn.peerMaxRequests()),
208                 max(
209                         // It makes sense to always pipeline at least one connection, since latency must be
210                         // non-zero.
211                         2,
212                         // Request only as many as we expect to receive in the duplicateRequestTimeout
213                         // window. We are trying to avoid having to duplicate requests.
214                         cn.chunksReceivedWhileExpecting()*int64(rs.duplicateRequestTimeout)/expectingTime,
215                 ),
216         ))
217 }
218 func (rs requestStrategyDuplicateRequestTimeout) wouldDuplicateRecent(r request) bool {
219         // This piece has been requested on another connection, and the duplicate request timer is still
220         // running.
221         _, ok := rs.lastRequested[r]
222         return ok
223 }