Sergey Matveev's repositories - btrtrc.git/blob - requesting.go
Partition piece request strategy by storage capacity key
[btrtrc.git] / requesting.go
1 package torrent
2
3 import (
4         "container/heap"
5         "context"
6         "encoding/gob"
7         "math/rand"
8         "reflect"
9         "runtime/pprof"
10         "time"
11         "unsafe"
12
13         "github.com/anacrolix/log"
14         "github.com/anacrolix/multiless"
15
16         request_strategy "github.com/anacrolix/torrent/request-strategy"
17 )
18
19 // Returns what is necessary to run request_strategy.GetRequestablePieces for primaryTorrent.
20 func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input request_strategy.Input) {
21         input.MaxUnverifiedBytes = cl.config.MaxUnverifiedBytes
22         if !primaryTorrent.haveInfo() {
23                 return
24         }
25         if capFunc := primaryTorrent.storage.Capacity; capFunc != nil {
26                 if cap, ok := (*capFunc)(); ok {
27                         input.Capacity = &cap
28                 }
29         }
30         if input.Capacity == nil {
31                 input.Torrents = []request_strategy.Torrent{primaryTorrent.requestStrategyTorrentInput()}
32                 return
33         }
34         input.Torrents = make([]request_strategy.Torrent, 0, len(cl.torrents))
35         for _, t := range cl.torrents {
36                 if !t.haveInfo() {
37                         // This would be removed if metadata is handled here. Determining chunks per piece
38                         // requires the info. If we have no info, we have no pieces too, so the end result is
39                         // the same.
40                         continue
41                 }
42                 if t.storage.Capacity != primaryTorrent.storage.Capacity {
43                         continue
44                 }
45                 input.Torrents = append(input.Torrents, t.requestStrategyTorrentInput())
46         }
47         return
48 }
49
50 func (t *Torrent) getRequestStrategyInput() request_strategy.Input {
51         return t.cl.getRequestStrategyInput(t)
52 }
53
54 func (t *Torrent) requestStrategyTorrentInput() request_strategy.Torrent {
55         rst := request_strategy.Torrent{
56                 InfoHash:       t.infoHash,
57                 ChunksPerPiece: t.chunksPerRegularPiece(),
58         }
59         rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
60         for i := range t.pieces {
61                 p := &t.pieces[i]
62                 rst.Pieces = append(rst.Pieces, request_strategy.Piece{
63                         Request:           !t.ignorePieceForRequests(i),
64                         Priority:          p.purePriority(),
65                         Partial:           t.piecePartiallyDownloaded(i),
66                         Availability:      p.availability,
67                         Length:            int64(p.length()),
68                         NumPendingChunks:  int(t.pieceNumPendingChunks(i)),
69                         IterPendingChunks: &p.undirtiedChunksIter,
70                 })
71         }
72         return rst
73 }
74
75 func init() {
76         gob.Register(peerId{})
77 }
78
// peerId identifies a peer by pointer. It carries the *Peer itself together with an integer copy
// of an address in ptr; ptr is what Uintptr returns and what the gob encoding transports.
// NOTE(review): ptr presumably mirrors the address held in Peer — confirm where values are built.
type peerId struct {
        *Peer
        // ptr is the raw value exposed by Uintptr and serialised by GobEncode/GobDecode.
        ptr uintptr
}
83
84 func (p peerId) Uintptr() uintptr {
85         return p.ptr
86 }
87
88 func (p peerId) GobEncode() (b []byte, _ error) {
89         *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
90                 Data: uintptr(unsafe.Pointer(&p.ptr)),
91                 Len:  int(unsafe.Sizeof(p.ptr)),
92                 Cap:  int(unsafe.Sizeof(p.ptr)),
93         }
94         return
95 }
96
97 func (p *peerId) GobDecode(b []byte) error {
98         if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
99                 panic(len(b))
100         }
101         ptr := unsafe.Pointer(&b[0])
102         p.ptr = *(*uintptr)(ptr)
103         log.Printf("%p", ptr)
104         dst := reflect.SliceHeader{
105                 Data: uintptr(unsafe.Pointer(&p.Peer)),
106                 Len:  int(unsafe.Sizeof(p.Peer)),
107                 Cap:  int(unsafe.Sizeof(p.Peer)),
108         }
109         copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
110         return nil
111 }
112
113 type (
114         RequestIndex   = request_strategy.RequestIndex
115         chunkIndexType = request_strategy.ChunkIndex
116 )
117
// peerRequests is a heap (see its Len/Less/Swap/Push/Pop methods) of candidate request indexes
// for a single peer.
type peerRequests struct {
        // requestIndexes holds the heap elements.
        requestIndexes       []RequestIndex
        // peer is the peer whose request state and torrent are consulted when ordering.
        peer                 *Peer
        // torrentStrategyInput supplies ChunksPerPiece and the per-piece priority/availability
        // used by Less.
        torrentStrategyInput *request_strategy.Torrent
}
123
124 func (p *peerRequests) Len() int {
125         return len(p.requestIndexes)
126 }
127
128 func (p *peerRequests) Less(i, j int) bool {
129         leftRequest := p.requestIndexes[i]
130         rightRequest := p.requestIndexes[j]
131         t := p.peer.t
132         leftPieceIndex := leftRequest / p.torrentStrategyInput.ChunksPerPiece
133         rightPieceIndex := rightRequest / p.torrentStrategyInput.ChunksPerPiece
134         leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
135         rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
136         pending := func(index RequestIndex, current bool) int {
137                 ret := t.pendingRequests.Get(index)
138                 if current {
139                         ret--
140                 }
141                 // See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be
142                 // resolved.
143                 if ret < 0 {
144                         panic(ret)
145                 }
146                 return ret
147         }
148         ml := multiless.New()
149         // Push requests that can't be served right now to the end. But we don't throw them away unless
150         // there's a better alternative. This is for when we're using the fast extension and get choked
151         // but our requests could still be good when we get unchoked.
152         if p.peer.peerChoking {
153                 ml = ml.Bool(
154                         !p.peer.peerAllowedFast.Contains(leftPieceIndex),
155                         !p.peer.peerAllowedFast.Contains(rightPieceIndex),
156                 )
157         }
158         ml = ml.Int(
159                 pending(leftRequest, leftCurrent),
160                 pending(rightRequest, rightCurrent))
161         ml = ml.Bool(!leftCurrent, !rightCurrent)
162         ml = ml.Int(
163                 -int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
164                 -int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority),
165         )
166         ml = ml.Int(
167                 int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
168                 int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
169         ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
170         ml = ml.Uint32(leftRequest, rightRequest)
171         return ml.MustLess()
172 }
173
174 func (p *peerRequests) Swap(i, j int) {
175         p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
176 }
177
178 func (p *peerRequests) Push(x interface{}) {
179         p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
180 }
181
182 func (p *peerRequests) Pop() interface{} {
183         last := len(p.requestIndexes) - 1
184         x := p.requestIndexes[last]
185         p.requestIndexes = p.requestIndexes[:last]
186         return x
187 }
188
// desiredRequestState is the request state computed for a peer by getDesiredRequestState and
// handed to applyRequestState.
type desiredRequestState struct {
        // Requests are the request indexes wanted from the peer, in the order they were popped
        // from the request heap.
        Requests   []RequestIndex
        // Interested is passed to the peer's setInterested when the state is applied.
        Interested bool
}
193
194 func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
195         input := p.t.getRequestStrategyInput()
196         requestHeap := peerRequests{
197                 peer: p,
198         }
199         for i := range input.Torrents {
200                 t := &input.Torrents[i]
201                 if t.InfoHash == p.t.infoHash {
202                         requestHeap.torrentStrategyInput = t
203                         break
204                 }
205         }
206         request_strategy.GetRequestablePieces(
207                 input,
208                 func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
209                         if t.InfoHash != p.t.infoHash {
210                                 return
211                         }
212                         if !p.peerHasPiece(pieceIndex) {
213                                 return
214                         }
215                         allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
216                         rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
217                                 r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
218                                 // if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
219                                 //      return
220                                 // }
221                                 if !allowedFast {
222                                         // We must signal interest to request this
223                                         desired.Interested = true
224                                         // We can make or will allow sustaining a request here if we're not choked, or
225                                         // have made the request previously (presumably while unchoked), and haven't had
226                                         // the peer respond yet (and the request was retained because we are using the
227                                         // fast extension).
228                                         if p.peerChoking && !p.actualRequestState.Requests.Contains(r) {
229                                                 // We can't request this right now.
230                                                 return
231                                         }
232                                 }
233                                 requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
234                         })
235                 },
236         )
237         p.t.assertPendingRequests()
238         heap.Init(&requestHeap)
239         for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() {
240                 requestIndex := heap.Pop(&requestHeap).(RequestIndex)
241                 desired.Requests = append(desired.Requests, requestIndex)
242         }
243         return
244 }
245
246 func (p *Peer) maybeUpdateActualRequestState() bool {
247         if p.needRequestUpdate == "" {
248                 return true
249         }
250         var more bool
251         pprof.Do(
252                 context.Background(),
253                 pprof.Labels("update request", p.needRequestUpdate),
254                 func(_ context.Context) {
255                         next := p.getDesiredRequestState()
256                         more = p.applyRequestState(next)
257                 },
258         )
259         return more
260 }
261
262 // Transmit/action the request state to the peer.
263 func (p *Peer) applyRequestState(next desiredRequestState) bool {
264         current := &p.actualRequestState
265         if !p.setInterested(next.Interested) {
266                 return false
267         }
268         more := true
269         cancel := current.Requests.Clone()
270         for _, ri := range next.Requests {
271                 cancel.Remove(ri)
272         }
273         cancel.Iterate(func(req uint32) bool {
274                 more = p.cancel(req)
275                 return more
276         })
277         if !more {
278                 return false
279         }
280         shuffled := false
281         lastPending := 0
282         for i := 0; i < len(next.Requests); i++ {
283                 req := next.Requests[i]
284                 if p.cancelledRequests.Contains(req) {
285                         // Waiting for a reject or piece message, which will suitably trigger us to update our
286                         // requests, so we can skip this one with no additional consideration.
287                         continue
288                 }
289                 // The cardinality of our desired requests shouldn't exceed the max requests since it's used
290                 // in the calculation of the requests. However, if we cancelled requests and they haven't
291                 // been rejected or serviced yet with the fast extension enabled, we can end up with more
292                 // extra outstanding requests. We could subtract the number of outstanding cancels from the
293                 // next request cardinality, but peers might not like that.
294                 if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
295                         // log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
296                         //      next.Requests.GetCardinality(),
297                         //      p.cancelledRequests.GetCardinality(),
298                         //      current.Requests.GetCardinality(),
299                         //      p.nominalMaxRequests(),
300                         // )
301                         break
302                 }
303                 otherPending := p.t.pendingRequests.Get(next.Requests[0])
304                 if p.actualRequestState.Requests.Contains(next.Requests[0]) {
305                         otherPending--
306                 }
307                 if otherPending < lastPending {
308                         // Pending should only rise. It's supposed to be the strongest ordering criteria. If it
309                         // doesn't, our shuffling condition could be wrong.
310                         panic(lastPending)
311                 }
312                 // If the request has already been requested by another peer, shuffle this and the rest of
313                 // the requests (since according to the increasing condition, the rest of the indices
314                 // already have an outstanding request with another peer).
315                 if !shuffled && otherPending > 0 {
316                         shuffleReqs := next.Requests[i:]
317                         rand.Shuffle(len(shuffleReqs), func(i, j int) {
318                                 shuffleReqs[i], shuffleReqs[j] = shuffleReqs[j], shuffleReqs[i]
319                         })
320                         // log.Printf("shuffled reqs [%v:%v]", i, len(next.Requests))
321                         shuffled = true
322                         // Repeat this index
323                         i--
324                         continue
325                 }
326
327                 more = p.mustRequest(req)
328                 if !more {
329                         break
330                 }
331         }
332         p.updateRequestsTimer.Stop()
333         if more {
334                 p.needRequestUpdate = ""
335                 if !current.Requests.IsEmpty() {
336                         p.updateRequestsTimer.Reset(3 * time.Second)
337                 }
338         }
339         return more
340 }