Sergey Matveev's repositories - btrtrc.git/blob - requesting.go
Don't try to request anything without the torrent info
[btrtrc.git] / requesting.go
1 package torrent
2
3 import (
4         "container/heap"
5         "context"
6         "encoding/gob"
7         "math/rand"
8         "reflect"
9         "runtime/pprof"
10         "time"
11         "unsafe"
12
13         "github.com/anacrolix/log"
14         "github.com/anacrolix/multiless"
15
16         request_strategy "github.com/anacrolix/torrent/request-strategy"
17 )
18
19 func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRequestOrderState {
20         return request_strategy.PieceRequestOrderState{
21                 Priority:     t.piece(i).purePriority(),
22                 Partial:      t.piecePartiallyDownloaded(i),
23                 Availability: t.piece(i).availability,
24         }
25 }
26
27 func init() {
28         gob.Register(peerId{})
29 }
30
31 type peerId struct {
32         *Peer
33         ptr uintptr
34 }
35
36 func (p peerId) Uintptr() uintptr {
37         return p.ptr
38 }
39
40 func (p peerId) GobEncode() (b []byte, _ error) {
41         *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
42                 Data: uintptr(unsafe.Pointer(&p.ptr)),
43                 Len:  int(unsafe.Sizeof(p.ptr)),
44                 Cap:  int(unsafe.Sizeof(p.ptr)),
45         }
46         return
47 }
48
49 func (p *peerId) GobDecode(b []byte) error {
50         if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
51                 panic(len(b))
52         }
53         ptr := unsafe.Pointer(&b[0])
54         p.ptr = *(*uintptr)(ptr)
55         log.Printf("%p", ptr)
56         dst := reflect.SliceHeader{
57                 Data: uintptr(unsafe.Pointer(&p.Peer)),
58                 Len:  int(unsafe.Sizeof(p.Peer)),
59                 Cap:  int(unsafe.Sizeof(p.Peer)),
60         }
61         copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
62         return nil
63 }
64
65 type (
66         RequestIndex   = request_strategy.RequestIndex
67         chunkIndexType = request_strategy.ChunkIndex
68 )
69
70 type peerRequests struct {
71         requestIndexes []RequestIndex
72         peer           *Peer
73 }
74
75 func (p *peerRequests) Len() int {
76         return len(p.requestIndexes)
77 }
78
79 func (p *peerRequests) Less(i, j int) bool {
80         leftRequest := p.requestIndexes[i]
81         rightRequest := p.requestIndexes[j]
82         t := p.peer.t
83         leftPieceIndex := leftRequest / t.chunksPerRegularPiece()
84         rightPieceIndex := rightRequest / t.chunksPerRegularPiece()
85         leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
86         rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
87         pending := func(index RequestIndex, current bool) int {
88                 ret := t.pendingRequests.Get(index)
89                 if current {
90                         ret--
91                 }
92                 // See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be
93                 // resolved.
94                 if ret < 0 {
95                         panic(ret)
96                 }
97                 return ret
98         }
99         ml := multiless.New()
100         // Push requests that can't be served right now to the end. But we don't throw them away unless
101         // there's a better alternative. This is for when we're using the fast extension and get choked
102         // but our requests could still be good when we get unchoked.
103         if p.peer.peerChoking {
104                 ml = ml.Bool(
105                         !p.peer.peerAllowedFast.Contains(leftPieceIndex),
106                         !p.peer.peerAllowedFast.Contains(rightPieceIndex),
107                 )
108         }
109         ml = ml.Int(
110                 pending(leftRequest, leftCurrent),
111                 pending(rightRequest, rightCurrent))
112         ml = ml.Bool(!leftCurrent, !rightCurrent)
113         leftPiece := t.piece(int(leftPieceIndex))
114         rightPiece := t.piece(int(rightPieceIndex))
115         ml = ml.Int(
116                 -int(leftPiece.purePriority()),
117                 -int(rightPiece.purePriority()),
118         )
119         ml = ml.Int(
120                 int(leftPiece.availability),
121                 int(rightPiece.availability))
122         leftLastRequested := p.peer.t.lastRequested[leftRequest]
123         rightLastRequested := p.peer.t.lastRequested[rightRequest]
124         ml = ml.EagerSameLess(
125                 leftLastRequested.Equal(rightLastRequested),
126                 leftLastRequested.Before(rightLastRequested),
127         )
128         ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
129         ml = ml.Uint32(leftRequest, rightRequest)
130         return ml.MustLess()
131 }
132
133 func (p *peerRequests) Swap(i, j int) {
134         p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
135 }
136
137 func (p *peerRequests) Push(x interface{}) {
138         p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
139 }
140
141 func (p *peerRequests) Pop() interface{} {
142         last := len(p.requestIndexes) - 1
143         x := p.requestIndexes[last]
144         p.requestIndexes = p.requestIndexes[:last]
145         return x
146 }
147
148 type desiredRequestState struct {
149         Requests   []RequestIndex
150         Interested bool
151 }
152
153 func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
154         if !p.t.haveInfo() {
155                 return
156         }
157         input := p.t.getRequestStrategyInput()
158         requestHeap := peerRequests{
159                 peer: p,
160         }
161         request_strategy.GetRequestablePieces(
162                 input,
163                 p.t.cl.pieceRequestOrder[p.t.storage.Capacity],
164                 func(ih InfoHash, pieceIndex int) {
165                         if ih != p.t.infoHash {
166                                 return
167                         }
168                         if !p.peerHasPiece(pieceIndex) {
169                                 return
170                         }
171                         allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
172                         p.t.piece(pieceIndex).undirtiedChunksIter.Iter(func(ci request_strategy.ChunkIndex) {
173                                 r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
174                                 // if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
175                                 //      return
176                                 // }
177                                 if !allowedFast {
178                                         // We must signal interest to request this
179                                         desired.Interested = true
180                                         // We can make or will allow sustaining a request here if we're not choked, or
181                                         // have made the request previously (presumably while unchoked), and haven't had
182                                         // the peer respond yet (and the request was retained because we are using the
183                                         // fast extension).
184                                         if p.peerChoking && !p.actualRequestState.Requests.Contains(r) {
185                                                 // We can't request this right now.
186                                                 return
187                                         }
188                                 }
189                                 // Note that we can still be interested if we filter all requests due to being
190                                 // recently requested from another peer.
191                                 if !p.actualRequestState.Requests.Contains(r) {
192                                         if time.Since(p.t.lastRequested[r]) < time.Second {
193                                                 return
194                                         }
195                                 }
196                                 requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
197                         })
198                 },
199         )
200         p.t.assertPendingRequests()
201         heap.Init(&requestHeap)
202         for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() {
203                 requestIndex := heap.Pop(&requestHeap).(RequestIndex)
204                 desired.Requests = append(desired.Requests, requestIndex)
205         }
206         return
207 }
208
209 func (p *Peer) maybeUpdateActualRequestState() bool {
210         if p.needRequestUpdate == "" {
211                 return true
212         }
213         var more bool
214         pprof.Do(
215                 context.Background(),
216                 pprof.Labels("update request", p.needRequestUpdate),
217                 func(_ context.Context) {
218                         next := p.getDesiredRequestState()
219                         more = p.applyRequestState(next)
220                 },
221         )
222         return more
223 }
224
225 // Transmit/action the request state to the peer.
226 func (p *Peer) applyRequestState(next desiredRequestState) bool {
227         current := &p.actualRequestState
228         if !p.setInterested(next.Interested) {
229                 return false
230         }
231         more := true
232         cancel := current.Requests.Clone()
233         for _, ri := range next.Requests {
234                 cancel.Remove(ri)
235         }
236         cancel.Iterate(func(req uint32) bool {
237                 more = p.cancel(req)
238                 return more
239         })
240         if !more {
241                 return false
242         }
243         shuffled := false
244         lastPending := 0
245         for i := 0; i < len(next.Requests); i++ {
246                 req := next.Requests[i]
247                 if p.cancelledRequests.Contains(req) {
248                         // Waiting for a reject or piece message, which will suitably trigger us to update our
249                         // requests, so we can skip this one with no additional consideration.
250                         continue
251                 }
252                 // The cardinality of our desired requests shouldn't exceed the max requests since it's used
253                 // in the calculation of the requests. However, if we cancelled requests and they haven't
254                 // been rejected or serviced yet with the fast extension enabled, we can end up with more
255                 // extra outstanding requests. We could subtract the number of outstanding cancels from the
256                 // next request cardinality, but peers might not like that.
257                 if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
258                         // log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
259                         //      next.Requests.GetCardinality(),
260                         //      p.cancelledRequests.GetCardinality(),
261                         //      current.Requests.GetCardinality(),
262                         //      p.nominalMaxRequests(),
263                         // )
264                         break
265                 }
266                 otherPending := p.t.pendingRequests.Get(next.Requests[0])
267                 if p.actualRequestState.Requests.Contains(next.Requests[0]) {
268                         otherPending--
269                 }
270                 if otherPending < lastPending {
271                         // Pending should only rise. It's supposed to be the strongest ordering criteria. If it
272                         // doesn't, our shuffling condition could be wrong.
273                         panic(lastPending)
274                 }
275                 // If the request has already been requested by another peer, shuffle this and the rest of
276                 // the requests (since according to the increasing condition, the rest of the indices
277                 // already have an outstanding request with another peer).
278                 if !shuffled && otherPending > 0 {
279                         shuffleReqs := next.Requests[i:]
280                         rand.Shuffle(len(shuffleReqs), func(i, j int) {
281                                 shuffleReqs[i], shuffleReqs[j] = shuffleReqs[j], shuffleReqs[i]
282                         })
283                         // log.Printf("shuffled reqs [%v:%v]", i, len(next.Requests))
284                         shuffled = true
285                         // Repeat this index
286                         i--
287                         continue
288                 }
289
290                 more = p.mustRequest(req)
291                 if !more {
292                         break
293                 }
294         }
295         // TODO: This may need to change, we might want to update even if there were no requests due to
296         // filtering them for being recently requested already.
297         p.updateRequestsTimer.Stop()
298         if more {
299                 p.needRequestUpdate = ""
300                 if current.Interested {
301                         p.updateRequestsTimer.Reset(3 * time.Second)
302                 }
303         }
304         return more
305 }