package torrent

import (
        "container/heap"
        "context"
        "encoding/gob"
        "math/rand"
        "reflect"
        "runtime/pprof"
        "time"
        "unsafe"

        "github.com/anacrolix/log"
        "github.com/anacrolix/multiless"
        "github.com/anacrolix/torrent/metainfo"

        request_strategy "github.com/anacrolix/torrent/request-strategy"
)

// Returns what is necessary to run request_strategy.GetRequestablePieces for primaryTorrent.
func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input request_strategy.Input) {
        input.MaxUnverifiedBytes = cl.config.MaxUnverifiedBytes
        if !primaryTorrent.haveInfo() {
                return
        }
        if capFunc := primaryTorrent.storage.Capacity; capFunc != nil {
                if cap, ok := (*capFunc)(); ok {
                        input.Capacity = &cap
                }
        }
        input.Torrents = make(map[metainfo.Hash]request_strategy.Torrent, len(cl.torrents))
        for _, t := range cl.torrents {
                if !t.haveInfo() {
                        // This would be removed if metadata is handled here. Determining chunks per piece
                        // requires the info. If we have no info, we have no pieces either, so the end result
                        // is the same.
                        continue
                }
                if t.storage.Capacity != primaryTorrent.storage.Capacity {
                        continue
                }
                input.Torrents[t.infoHash] = t.requestStrategyTorrentInput()
        }
        return
}

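// getRequestStrategyInput builds the request strategy input with this torrent as the primary
// torrent.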
func (t *Torrent) getRequestStrategyInput() request_strategy.Input {
        return t.cl.getRequestStrategyInput(t)
}

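// requestStrategyTorrentInput converts this torrent's piece state into the form consumed by the
// request-strategy package.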
func (t *Torrent) requestStrategyTorrentInput() request_strategy.Torrent {
        rst := request_strategy.Torrent{
                InfoHash:       t.infoHash,
                ChunksPerPiece: t.chunksPerRegularPiece(),
        }
        rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
        for i := range t.pieces {
                rst.Pieces = append(rst.Pieces, t.makeRequestStrategyPiece(i))
        }
        return rst
}

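// requestStrategyPieceOrderState snapshots the piece state that determines its position in the
// piece request order: priority, partial progress, and availability.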
func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRequestOrderState {
        return request_strategy.PieceRequestOrderState{
                Priority:     t.piece(i).purePriority(),
                Partial:      t.piecePartiallyDownloaded(i),
                Availability: t.piece(i).availability,
        }
}

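// makeRequestStrategyPiece builds the per-piece view handed to the request strategy, including
// whether the piece is requestable at all and an iterator over its undirtied chunks.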
func (t *Torrent) makeRequestStrategyPiece(i int) request_strategy.Piece {
        p := &t.pieces[i]
        return request_strategy.Piece{
                Request:           !t.ignorePieceForRequests(i),
                Priority:          p.purePriority(),
                Partial:           t.piecePartiallyDownloaded(i),
                Availability:      p.availability,
                Length:            int64(p.length()),
                NumPendingChunks:  int(t.pieceNumPendingChunks(i)),
                IterPendingChunks: &p.undirtiedChunksIter,
        }
}

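// Register peerId so it can be gob-encoded when stored behind an interface value.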
func init() {
        gob.Register(peerId{})
}

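// peerId identifies a Peer by the numeric value of its pointer, keeping the *Peer itself
// alongside it.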
type peerId struct {
        *Peer
        ptr uintptr
}

func (p peerId) Uintptr() uintptr {
        return p.ptr
}

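// GobEncode emits the raw bytes of the ptr field, aliasing it as a byte slice via unsafe rather
// than copying.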
func (p peerId) GobEncode() (b []byte, _ error) {
        *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
                Data: uintptr(unsafe.Pointer(&p.ptr)),
                Len:  int(unsafe.Sizeof(p.ptr)),
                Cap:  int(unsafe.Sizeof(p.ptr)),
        }
        return
}

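// GobDecode reverses GobEncode: the encoded bytes are copied back over the embedded *Peer and
// also stored in ptr. The length check guards against input that isn't a pointer-sized value.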
func (p *peerId) GobDecode(b []byte) error {
        if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
                panic(len(b))
        }
        ptr := unsafe.Pointer(&b[0])
        p.ptr = *(*uintptr)(ptr)
        log.Printf("%p", ptr)
        dst := reflect.SliceHeader{
                Data: uintptr(unsafe.Pointer(&p.Peer)),
                Len:  int(unsafe.Sizeof(p.Peer)),
                Cap:  int(unsafe.Sizeof(p.Peer)),
        }
        copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
        return nil
}

type (
        RequestIndex   = request_strategy.RequestIndex
        chunkIndexType = request_strategy.ChunkIndex
)

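// peerRequests implements heap.Interface over the candidate request indexes for a single peer,
// ordered by the comparisons in Less.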
type peerRequests struct {
        requestIndexes       []RequestIndex
        peer                 *Peer
        torrentStrategyInput request_strategy.Torrent
}

func (p *peerRequests) Len() int {
        return len(p.requestIndexes)
}

func (p *peerRequests) Less(i, j int) bool {
        leftRequest := p.requestIndexes[i]
        rightRequest := p.requestIndexes[j]
        t := p.peer.t
        leftPieceIndex := leftRequest / p.torrentStrategyInput.ChunksPerPiece
        rightPieceIndex := rightRequest / p.torrentStrategyInput.ChunksPerPiece
        leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
        rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
        pending := func(index RequestIndex, current bool) int {
                ret := t.pendingRequests.Get(index)
                if current {
                        ret--
                }
                // See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be
                // resolved.
                if ret < 0 {
                        panic(ret)
                }
                return ret
        }
        ml := multiless.New()
        // Push requests that can't be served right now to the end. But we don't throw them away unless
        // there's a better alternative. This is for when we're using the fast extension and get choked
        // but our requests could still be good when we get unchoked.
        if p.peer.peerChoking {
                ml = ml.Bool(
                        !p.peer.peerAllowedFast.Contains(leftPieceIndex),
                        !p.peer.peerAllowedFast.Contains(rightPieceIndex),
                )
        }
        ml = ml.Int(
                pending(leftRequest, leftCurrent),
                pending(rightRequest, rightCurrent))
        ml = ml.Bool(!leftCurrent, !rightCurrent)
        ml = ml.Int(
                -int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
                -int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority),
        )
        ml = ml.Int(
                int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
                int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
        ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
        ml = ml.Uint32(leftRequest, rightRequest)
        return ml.MustLess()
}

func (p *peerRequests) Swap(i, j int) {
        p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
}

func (p *peerRequests) Push(x interface{}) {
        p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
}

func (p *peerRequests) Pop() interface{} {
        last := len(p.requestIndexes) - 1
        x := p.requestIndexes[last]
        p.requestIndexes = p.requestIndexes[:last]
        return x
}

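// desiredRequestState is the set of requests we would like to have outstanding with a peer, and
// whether we want to be interested in it.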
type desiredRequestState struct {
        Requests   []RequestIndex
        Interested bool
}

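// getDesiredRequestState walks the requestable pieces for this peer's torrent, collects the
// pending chunk requests the peer could serve, orders them with a heap, and keeps up to the
// peer's nominal request limit.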
func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
        input := p.t.getRequestStrategyInput()
        requestHeap := peerRequests{
                peer: p,
        }
        requestHeap.torrentStrategyInput = input.Torrents[p.t.infoHash]
        request_strategy.GetRequestablePieces(
                input,
                p.t.cl.pieceRequestOrder[p.t.storage.Capacity],
                func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
                        if t.InfoHash != p.t.infoHash {
                                return
                        }
                        if !p.peerHasPiece(pieceIndex) {
                                return
                        }
                        allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
                        rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
                                r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
                                // if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
                                //      return
                                // }
                                if !allowedFast {
                                        // We must signal interest to request this.
                                        desired.Interested = true
                                        // We can make a request here, or let an existing one stand, if we're not
                                        // choked, or if we already made the request (presumably while unchoked) and
                                        // the peer hasn't responded yet (the request was retained because we're
                                        // using the fast extension).
                                        if p.peerChoking && !p.actualRequestState.Requests.Contains(r) {
                                                // We can't request this right now.
                                                return
                                        }
                                }
                                requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
                        })
                },
        )
        p.t.assertPendingRequests()
        heap.Init(&requestHeap)
        for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() {
                requestIndex := heap.Pop(&requestHeap).(RequestIndex)
                desired.Requests = append(desired.Requests, requestIndex)
        }
        return
}

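// maybeUpdateActualRequestState recomputes and applies the desired request state if an update
// has been flagged via needRequestUpdate, labelling the work for pprof with the update reason.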
func (p *Peer) maybeUpdateActualRequestState() bool {
        if p.needRequestUpdate == "" {
                return true
        }
        var more bool
        pprof.Do(
                context.Background(),
                pprof.Labels("update request", p.needRequestUpdate),
                func(_ context.Context) {
                        next := p.getDesiredRequestState()
                        more = p.applyRequestState(next)
                },
        )
        return more
}

// Transmit/action the request state to the peer.
func (p *Peer) applyRequestState(next desiredRequestState) bool {
        current := &p.actualRequestState
        if !p.setInterested(next.Interested) {
                return false
        }
        more := true
        cancel := current.Requests.Clone()
        for _, ri := range next.Requests {
                cancel.Remove(ri)
        }
        cancel.Iterate(func(req uint32) bool {
                more = p.cancel(req)
                return more
        })
        if !more {
                return false
        }
        shuffled := false
        lastPending := 0
        for i := 0; i < len(next.Requests); i++ {
                req := next.Requests[i]
                if p.cancelledRequests.Contains(req) {
                        // Waiting for a reject or piece message, which will suitably trigger us to update our
                        // requests, so we can skip this one with no additional consideration.
                        continue
                }
                // The cardinality of our desired requests shouldn't exceed the max requests, since it's
                // used in calculating them. However, if we cancelled requests and they haven't been
                // rejected or serviced yet, with the fast extension enabled, we can end up with extra
                // outstanding requests. We could subtract the number of outstanding cancels from the
                // next request cardinality, but peers might not like that.
                if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
                        // log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
                        //      next.Requests.GetCardinality(),
                        //      p.cancelledRequests.GetCardinality(),
                        //      current.Requests.GetCardinality(),
                        //      p.nominalMaxRequests(),
                        // )
                        break
                }
                otherPending := p.t.pendingRequests.Get(next.Requests[0])
                if p.actualRequestState.Requests.Contains(next.Requests[0]) {
                        otherPending--
                }
                if otherPending < lastPending {
                        // Pending should only rise; it's supposed to be the strongest ordering criterion. If
                        // it doesn't, our shuffling condition could be wrong.
                        panic(lastPending)
                }
                // If the request is already outstanding with another peer, shuffle this and the rest of
                // the requests (since, pending counts being non-decreasing from here on, the rest of the
                // indices also have an outstanding request with another peer).
                if !shuffled && otherPending > 0 {
                        shuffleReqs := next.Requests[i:]
                        rand.Shuffle(len(shuffleReqs), func(i, j int) {
                                shuffleReqs[i], shuffleReqs[j] = shuffleReqs[j], shuffleReqs[i]
                        })
                        // log.Printf("shuffled reqs [%v:%v]", i, len(next.Requests))
                        shuffled = true
                        // Repeat this index
                        i--
                        continue
                }

                more = p.mustRequest(req)
                if !more {
                        break
                }
        }
        p.updateRequestsTimer.Stop()
        if more {
                p.needRequestUpdate = ""
                if !current.Requests.IsEmpty() {
                        p.updateRequestsTimer.Reset(3 * time.Second)
                }
        }
        return more
}