Sergey Matveev's repositories - btrtrc.git/blob - requesting.go
Implement piece request ordering with retained state
[btrtrc.git] / requesting.go
1 package torrent
2
3 import (
4         "container/heap"
5         "context"
6         "encoding/gob"
7         "math/rand"
8         "reflect"
9         "runtime/pprof"
10         "time"
11         "unsafe"
12
13         "github.com/anacrolix/log"
14         "github.com/anacrolix/multiless"
15
16         request_strategy "github.com/anacrolix/torrent/request-strategy"
17 )
18
19 // Returns what is necessary to run request_strategy.GetRequestablePieces for primaryTorrent.
20 func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input request_strategy.Input) {
21         input.MaxUnverifiedBytes = cl.config.MaxUnverifiedBytes
22         if !primaryTorrent.haveInfo() {
23                 return
24         }
25         if capFunc := primaryTorrent.storage.Capacity; capFunc != nil {
26                 if cap, ok := (*capFunc)(); ok {
27                         input.Capacity = &cap
28                 }
29         }
30         if false {
31                 if input.Capacity == nil {
32                         input.Torrents = []request_strategy.Torrent{primaryTorrent.requestStrategyTorrentInput()}
33                         return
34                 }
35         }
36         input.Torrents = make([]request_strategy.Torrent, 0, len(cl.torrents))
37         for _, t := range cl.torrents {
38                 if !t.haveInfo() {
39                         // This would be removed if metadata is handled here. Determining chunks per piece
40                         // requires the info. If we have no info, we have no pieces too, so the end result is
41                         // the same.
42                         continue
43                 }
44                 if t.storage.Capacity != primaryTorrent.storage.Capacity {
45                         continue
46                 }
47                 input.Torrents = append(input.Torrents, t.requestStrategyTorrentInput())
48         }
49         return
50 }
51
52 func (t *Torrent) getRequestStrategyInput() request_strategy.Input {
53         return t.cl.getRequestStrategyInput(t)
54 }
55
56 func (t *Torrent) requestStrategyTorrentInput() request_strategy.Torrent {
57         rst := request_strategy.Torrent{
58                 InfoHash:       t.infoHash,
59                 ChunksPerPiece: t.chunksPerRegularPiece(),
60         }
61         rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
62         for i := range t.pieces {
63                 rst.Pieces = append(rst.Pieces, t.makeRequestStrategyPiece(i))
64         }
65         return rst
66 }
67
68 func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRequestOrderState {
69         return request_strategy.PieceRequestOrderState{
70                 Priority:     t.piece(i).purePriority(),
71                 Partial:      t.piecePartiallyDownloaded(i),
72                 Availability: t.piece(i).availability,
73         }
74 }
75
76 func (t *Torrent) makeRequestStrategyPiece(i int) request_strategy.Piece {
77         p := &t.pieces[i]
78         return request_strategy.Piece{
79                 Request:           !t.ignorePieceForRequests(i),
80                 Priority:          p.purePriority(),
81                 Partial:           t.piecePartiallyDownloaded(i),
82                 Availability:      p.availability,
83                 Length:            int64(p.length()),
84                 NumPendingChunks:  int(t.pieceNumPendingChunks(i)),
85                 IterPendingChunks: &p.undirtiedChunksIter,
86         }
87 }
88
89 func init() {
90         gob.Register(peerId{})
91 }
92
93 type peerId struct {
94         *Peer
95         ptr uintptr
96 }
97
98 func (p peerId) Uintptr() uintptr {
99         return p.ptr
100 }
101
102 func (p peerId) GobEncode() (b []byte, _ error) {
103         *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
104                 Data: uintptr(unsafe.Pointer(&p.ptr)),
105                 Len:  int(unsafe.Sizeof(p.ptr)),
106                 Cap:  int(unsafe.Sizeof(p.ptr)),
107         }
108         return
109 }
110
111 func (p *peerId) GobDecode(b []byte) error {
112         if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
113                 panic(len(b))
114         }
115         ptr := unsafe.Pointer(&b[0])
116         p.ptr = *(*uintptr)(ptr)
117         log.Printf("%p", ptr)
118         dst := reflect.SliceHeader{
119                 Data: uintptr(unsafe.Pointer(&p.Peer)),
120                 Len:  int(unsafe.Sizeof(p.Peer)),
121                 Cap:  int(unsafe.Sizeof(p.Peer)),
122         }
123         copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
124         return nil
125 }
126
127 type (
128         RequestIndex   = request_strategy.RequestIndex
129         chunkIndexType = request_strategy.ChunkIndex
130 )
131
132 type peerRequests struct {
133         requestIndexes       []RequestIndex
134         peer                 *Peer
135         torrentStrategyInput *request_strategy.Torrent
136 }
137
138 func (p *peerRequests) Len() int {
139         return len(p.requestIndexes)
140 }
141
142 func (p *peerRequests) Less(i, j int) bool {
143         leftRequest := p.requestIndexes[i]
144         rightRequest := p.requestIndexes[j]
145         t := p.peer.t
146         leftPieceIndex := leftRequest / p.torrentStrategyInput.ChunksPerPiece
147         rightPieceIndex := rightRequest / p.torrentStrategyInput.ChunksPerPiece
148         leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
149         rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
150         pending := func(index RequestIndex, current bool) int {
151                 ret := t.pendingRequests.Get(index)
152                 if current {
153                         ret--
154                 }
155                 // See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be
156                 // resolved.
157                 if ret < 0 {
158                         panic(ret)
159                 }
160                 return ret
161         }
162         ml := multiless.New()
163         // Push requests that can't be served right now to the end. But we don't throw them away unless
164         // there's a better alternative. This is for when we're using the fast extension and get choked
165         // but our requests could still be good when we get unchoked.
166         if p.peer.peerChoking {
167                 ml = ml.Bool(
168                         !p.peer.peerAllowedFast.Contains(leftPieceIndex),
169                         !p.peer.peerAllowedFast.Contains(rightPieceIndex),
170                 )
171         }
172         ml = ml.Int(
173                 pending(leftRequest, leftCurrent),
174                 pending(rightRequest, rightCurrent))
175         ml = ml.Bool(!leftCurrent, !rightCurrent)
176         ml = ml.Int(
177                 -int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
178                 -int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority),
179         )
180         ml = ml.Int(
181                 int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
182                 int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
183         ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
184         ml = ml.Uint32(leftRequest, rightRequest)
185         return ml.MustLess()
186 }
187
188 func (p *peerRequests) Swap(i, j int) {
189         p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
190 }
191
192 func (p *peerRequests) Push(x interface{}) {
193         p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
194 }
195
196 func (p *peerRequests) Pop() interface{} {
197         last := len(p.requestIndexes) - 1
198         x := p.requestIndexes[last]
199         p.requestIndexes = p.requestIndexes[:last]
200         return x
201 }
202
203 type desiredRequestState struct {
204         Requests   []RequestIndex
205         Interested bool
206 }
207
208 func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
209         input := p.t.getRequestStrategyInput()
210         requestHeap := peerRequests{
211                 peer: p,
212         }
213         for i := range input.Torrents {
214                 t := &input.Torrents[i]
215                 if t.InfoHash == p.t.infoHash {
216                         requestHeap.torrentStrategyInput = t
217                         break
218                 }
219         }
220         request_strategy.GetRequestablePieces(
221                 input,
222                 p.t.cl.pieceRequestOrder[p.t.storage.Capacity],
223                 func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
224                         if t.InfoHash != p.t.infoHash {
225                                 return
226                         }
227                         if !p.peerHasPiece(pieceIndex) {
228                                 return
229                         }
230                         allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
231                         rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
232                                 r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
233                                 // if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
234                                 //      return
235                                 // }
236                                 if !allowedFast {
237                                         // We must signal interest to request this
238                                         desired.Interested = true
239                                         // We can make or will allow sustaining a request here if we're not choked, or
240                                         // have made the request previously (presumably while unchoked), and haven't had
241                                         // the peer respond yet (and the request was retained because we are using the
242                                         // fast extension).
243                                         if p.peerChoking && !p.actualRequestState.Requests.Contains(r) {
244                                                 // We can't request this right now.
245                                                 return
246                                         }
247                                 }
248                                 requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
249                         })
250                 },
251         )
252         p.t.assertPendingRequests()
253         heap.Init(&requestHeap)
254         for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() {
255                 requestIndex := heap.Pop(&requestHeap).(RequestIndex)
256                 desired.Requests = append(desired.Requests, requestIndex)
257         }
258         return
259 }
260
261 func (p *Peer) maybeUpdateActualRequestState() bool {
262         if p.needRequestUpdate == "" {
263                 return true
264         }
265         var more bool
266         pprof.Do(
267                 context.Background(),
268                 pprof.Labels("update request", p.needRequestUpdate),
269                 func(_ context.Context) {
270                         next := p.getDesiredRequestState()
271                         more = p.applyRequestState(next)
272                 },
273         )
274         return more
275 }
276
277 // Transmit/action the request state to the peer.
278 func (p *Peer) applyRequestState(next desiredRequestState) bool {
279         current := &p.actualRequestState
280         if !p.setInterested(next.Interested) {
281                 return false
282         }
283         more := true
284         cancel := current.Requests.Clone()
285         for _, ri := range next.Requests {
286                 cancel.Remove(ri)
287         }
288         cancel.Iterate(func(req uint32) bool {
289                 more = p.cancel(req)
290                 return more
291         })
292         if !more {
293                 return false
294         }
295         shuffled := false
296         lastPending := 0
297         for i := 0; i < len(next.Requests); i++ {
298                 req := next.Requests[i]
299                 if p.cancelledRequests.Contains(req) {
300                         // Waiting for a reject or piece message, which will suitably trigger us to update our
301                         // requests, so we can skip this one with no additional consideration.
302                         continue
303                 }
304                 // The cardinality of our desired requests shouldn't exceed the max requests since it's used
305                 // in the calculation of the requests. However, if we cancelled requests and they haven't
306                 // been rejected or serviced yet with the fast extension enabled, we can end up with more
307                 // extra outstanding requests. We could subtract the number of outstanding cancels from the
308                 // next request cardinality, but peers might not like that.
309                 if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
310                         // log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
311                         //      next.Requests.GetCardinality(),
312                         //      p.cancelledRequests.GetCardinality(),
313                         //      current.Requests.GetCardinality(),
314                         //      p.nominalMaxRequests(),
315                         // )
316                         break
317                 }
318                 otherPending := p.t.pendingRequests.Get(next.Requests[0])
319                 if p.actualRequestState.Requests.Contains(next.Requests[0]) {
320                         otherPending--
321                 }
322                 if otherPending < lastPending {
323                         // Pending should only rise. It's supposed to be the strongest ordering criteria. If it
324                         // doesn't, our shuffling condition could be wrong.
325                         panic(lastPending)
326                 }
327                 // If the request has already been requested by another peer, shuffle this and the rest of
328                 // the requests (since according to the increasing condition, the rest of the indices
329                 // already have an outstanding request with another peer).
330                 if !shuffled && otherPending > 0 {
331                         shuffleReqs := next.Requests[i:]
332                         rand.Shuffle(len(shuffleReqs), func(i, j int) {
333                                 shuffleReqs[i], shuffleReqs[j] = shuffleReqs[j], shuffleReqs[i]
334                         })
335                         // log.Printf("shuffled reqs [%v:%v]", i, len(next.Requests))
336                         shuffled = true
337                         // Repeat this index
338                         i--
339                         continue
340                 }
341
342                 more = p.mustRequest(req)
343                 if !more {
344                         break
345                 }
346         }
347         p.updateRequestsTimer.Stop()
348         if more {
349                 p.needRequestUpdate = ""
350                 if !current.Requests.IsEmpty() {
351                         p.updateRequestsTimer.Reset(3 * time.Second)
352                 }
353         }
354         return more
355 }