btrtrc.git / requesting.go
Add more comprehensive pending requests assertions
package torrent

import (
	"container/heap"
	"context"
	"encoding/gob"
	"math/rand"
	"reflect"
	"runtime/pprof"
	"time"
	"unsafe"

	"github.com/RoaringBitmap/roaring"
	"github.com/anacrolix/log"
	"github.com/anacrolix/multiless"

	request_strategy "github.com/anacrolix/torrent/request-strategy"
)

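// tickleRequester wakes anything waiting on the client's updateRequests
// condition so that peer request states get recomputed.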
func (cl *Client) tickleRequester() {
	cl.updateRequests.Broadcast()
}

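// getRequestStrategyInput snapshots every torrent and peer known to the Client
// into the plain-data form consumed by the request-strategy package. Torrents
// whose info isn't available yet are skipped, since their piece sizes aren't
// known.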
func (cl *Client) getRequestStrategyInput() request_strategy.Input {
	ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
	for _, t := range cl.torrents {
		if !t.haveInfo() {
			// This would be removed if metadata were handled here. We have to guard against not
			// knowing the piece size. If we have no info, we have no pieces either, so the end
			// result is the same.
			continue
		}
		rst := request_strategy.Torrent{
			InfoHash:       t.infoHash,
			ChunksPerPiece: t.chunksPerRegularPiece(),
		}
		if t.storage != nil {
			rst.Capacity = t.storage.Capacity
		}
		rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
		for i := range t.pieces {
			p := &t.pieces[i]
			rst.Pieces = append(rst.Pieces, request_strategy.Piece{
				Request:           !t.ignorePieceForRequests(i),
				Priority:          p.purePriority(),
				Partial:           t.piecePartiallyDownloaded(i),
				Availability:      p.availability,
				Length:            int64(p.length()),
				NumPendingChunks:  int(t.pieceNumPendingChunks(i)),
				IterPendingChunks: &p.undirtiedChunksIter,
			})
		}
		t.iterPeers(func(p *Peer) {
			if p.closed.IsSet() {
				return
			}
			if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates {
				p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate
			}
			p.piecesReceivedSinceLastRequestUpdate = 0
			rst.Peers = append(rst.Peers, request_strategy.Peer{
				Pieces:           *p.newPeerPieces(),
				MaxRequests:      p.nominalMaxRequests(),
				ExistingRequests: p.actualRequestState.Requests,
				Choking:          p.peerChoking,
				PieceAllowedFast: p.peerAllowedFast,
				DownloadRate:     p.downloadRate(),
				Age:              time.Since(p.completedHandshake),
				Id: peerId{
					Peer: p,
					ptr:  uintptr(unsafe.Pointer(p)),
				},
			})
		})
		ts = append(ts, rst)
	}
	return request_strategy.Input{
		Torrents:           ts,
		MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
	}
}

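// peerId is registered so gob can encode it when it is reached through an
// interface value (it is assigned to the Id field above); gob only encodes
// concrete types behind interfaces once they have been registered.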
func init() {
	gob.Register(peerId{})
}

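// peerId identifies a Peer by address. The ptr field mirrors the embedded
// pointer's value, and Uintptr exposes it, presumably so callers (such as the
// request strategy) can treat peers as comparable, gob-encodable identities.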
type peerId struct {
	*Peer
	ptr uintptr
}

func (p peerId) Uintptr() uintptr {
	return p.ptr
}

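// GobEncode encodes only the raw bytes of ptr, by aliasing a byte slice over
// the field with an unsafe slice header rather than copying it.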
func (p peerId) GobEncode() (b []byte, _ error) {
	*(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
		Data: uintptr(unsafe.Pointer(&p.ptr)),
		Len:  int(unsafe.Sizeof(p.ptr)),
		Cap:  int(unsafe.Sizeof(p.ptr)),
	}
	return
}

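// GobDecode expects exactly the bytes of a uintptr and writes them back into
// both ptr and the embedded *Peer, reversing GobEncode.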
func (p *peerId) GobDecode(b []byte) error {
	if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
		panic(len(b))
	}
	ptr := unsafe.Pointer(&b[0])
	p.ptr = *(*uintptr)(ptr)
	log.Printf("%p", ptr)
	dst := reflect.SliceHeader{
		Data: uintptr(unsafe.Pointer(&p.Peer)),
		Len:  int(unsafe.Sizeof(p.Peer)),
		Cap:  int(unsafe.Sizeof(p.Peer)),
	}
	copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
	return nil
}

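// A RequestIndex addresses a single chunk within the whole torrent. Going by
// how it is used below (division by ChunksPerPiece, and the
// pieceRequestIndexOffset addition in getDesiredRequestState), the layout is
// roughly:
//
//	requestIndex = pieceIndex*chunksPerPiece + chunkIndexWithinPiece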
type RequestIndex = request_strategy.RequestIndex
type chunkIndexType = request_strategy.ChunkIndex

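// peerRequests is a heap.Interface over candidate request indexes for a single
// peer. The ordering defined by Less decides which requests get issued first.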
type peerRequests struct {
	requestIndexes       []RequestIndex
	peer                 *Peer
	torrentStrategyInput request_strategy.Torrent
}

func (p *peerRequests) Len() int {
	return len(p.requestIndexes)
}

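// Less orders candidate requests for the heap. When the peer is choking us,
// chunks in allowed-fast pieces sort first. After that: fewer outstanding
// requests on other connections, then requests we already have in flight,
// then higher piece priority, then lower piece availability, with piece and
// request index as final tie-breakers.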
func (p *peerRequests) Less(i, j int) bool {
	leftRequest := p.requestIndexes[i]
	rightRequest := p.requestIndexes[j]
	t := p.peer.t
	leftPieceIndex := leftRequest / p.torrentStrategyInput.ChunksPerPiece
	rightPieceIndex := rightRequest / p.torrentStrategyInput.ChunksPerPiece
	leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
	rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
	pending := func(index RequestIndex, current bool) int {
		ret := t.pendingRequests.Get(index)
		if current {
			ret--
		}
		// I have a hunch that this could trigger for requests for chunks that are choked and not
		// allowed fast, since the current conn shouldn't already be included. It's a very specific
		// circumstance, and if it triggers I will fix it.
		if ret < 0 {
			panic(ret)
		}
		return ret
	}
	ml := multiless.New()
	// Push requests that can't be served right now to the end. But we don't throw them away unless
	// there's a better alternative. This is for when we're using the fast extension and get choked
	// but our requests could still be good when we get unchoked.
	if p.peer.peerChoking {
		ml = ml.Bool(
			!p.peer.peerAllowedFast.Contains(leftPieceIndex),
			!p.peer.peerAllowedFast.Contains(rightPieceIndex),
		)
	}
	ml = ml.Int(
		pending(leftRequest, leftCurrent),
		pending(rightRequest, rightCurrent))
	ml = ml.Bool(!leftCurrent, !rightCurrent)
	ml = ml.Int(
		-int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
		-int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority),
	)
	ml = ml.Int(
		int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
		int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
	ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
	ml = ml.Uint32(leftRequest, rightRequest)
	return ml.MustLess()
}

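// Swap, Push and Pop complete heap.Interface; Push and Pop operate on the end
// of the slice as container/heap requires.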
func (p *peerRequests) Swap(i, j int) {
	p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
}

func (p *peerRequests) Push(x interface{}) {
	p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
}

func (p *peerRequests) Pop() interface{} {
	last := len(p.requestIndexes) - 1
	x := p.requestIndexes[last]
	p.requestIndexes = p.requestIndexes[:last]
	return x
}

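// getDesiredRequestState computes the set of requests this peer should have
// outstanding: it gathers the requestable chunks the peer can serve, orders
// them with the peerRequests heap, and takes up to nominalMaxRequests of them,
// recording whether interest must be shown to obtain any of them.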
func (p *Peer) getDesiredRequestState() (desired requestState) {
	input := p.t.cl.getRequestStrategyInput()
	requestHeap := peerRequests{
		peer: p,
	}
	for _, t := range input.Torrents {
		if t.InfoHash == p.t.infoHash {
			requestHeap.torrentStrategyInput = t
			break
		}
	}
	request_strategy.GetRequestablePieces(
		input,
		func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
			if t.InfoHash != p.t.infoHash {
				return
			}
			if !p.peerHasPiece(pieceIndex) {
				return
			}
			allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
			rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
				if !allowedFast {
					// We must signal interest to request this.
					desired.Interested = true
					// We can make, or keep sustaining, a request here if we're not choked, or if we
					// made the request previously (presumably while unchoked) and the peer hasn't
					// responded yet (the request having been retained because we're using the fast
					// extension).
					if p.peerChoking && !p.actualRequestState.Requests.Contains(p.t.pieceRequestIndexOffset(pieceIndex)+ci) {
						// We can't request this right now.
						return
					}
				}
				requestHeap.requestIndexes = append(
					requestHeap.requestIndexes,
					p.t.pieceRequestIndexOffset(pieceIndex)+ci)
			})
		},
	)
	p.t.assertPendingRequests()
	heap.Init(&requestHeap)
	for requestHeap.Len() != 0 && desired.Requests.GetCardinality() < uint64(p.nominalMaxRequests()) {
		requestIndex := heap.Pop(&requestHeap).(RequestIndex)
		desired.Requests.Add(requestIndex)
	}
	return
}

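// maybeUpdateActualRequestState recomputes and applies this peer's request
// state when an update has been flagged via needRequestUpdate. The work runs
// under a pprof label carrying the update reason, so profiles can attribute
// the cost to whatever triggered it. It returns whether more messages can
// still be written to the peer.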
func (p *Peer) maybeUpdateActualRequestState() bool {
	if p.needRequestUpdate == "" {
		return true
	}
	var more bool
	pprof.Do(
		context.Background(),
		pprof.Labels("update request", p.needRequestUpdate),
		func(_ context.Context) {
			next := p.getDesiredRequestState()
			more = p.applyRequestState(next)
		},
	)
	return more
}

// applyRequestState transmits the difference between the current and the
// desired request state to the peer: it cancels requests that are no longer
// wanted, then issues the new ones in randomized order, stopping early once a
// cancel or request reports that no more messages should be written.
func (p *Peer) applyRequestState(next requestState) bool {
	current := &p.actualRequestState
	if !p.setInterested(next.Interested) {
		return false
	}
	more := true
	cancel := roaring.AndNot(&current.Requests, &next.Requests)
	cancel.Iterate(func(req uint32) bool {
		more = p.cancel(req)
		return more
	})
	if !more {
		return false
	}
	// We randomize the order in which requests are issued to reduce overlap with requests made to
	// other peers. Although the effect depends on the order in which the peer services requests,
	// if we can only issue some of them before buffering, or the peer starts handling them before
	// they've all arrived, randomization should reduce overlap. Note, however, that if the desired
	// requests arrived in priority order, randomizing throws that benefit away.
	for _, x := range rand.Perm(int(next.Requests.GetCardinality())) {
		req, err := next.Requests.Select(uint32(x))
		if err != nil {
			panic(err)
		}
		if p.cancelledRequests.Contains(req) {
			// Waiting for a reject or piece message, which will suitably trigger us to update our
			// requests, so we can skip this one with no additional consideration.
			continue
		}
		// The cardinality of our desired requests shouldn't exceed the max, since the max is used
		// when calculating them. However, if we cancelled requests that haven't been rejected or
		// serviced yet (with the fast extension enabled), we can end up with extra outstanding
		// requests. We could subtract the number of outstanding cancels from the desired
		// cardinality, but peers might not like that.
		if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
			//log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
			//	next.Requests.GetCardinality(),
			//	p.cancelledRequests.GetCardinality(),
			//	current.Requests.GetCardinality(),
			//	p.nominalMaxRequests(),
			//)
			break
		}
		more, err = p.request(req)
		if err != nil {
			panic(err)
		}
		if !more {
			break
		}
	}
	p.updateRequestsTimer.Stop()
	if more {
		p.needRequestUpdate = ""
		if !current.Requests.IsEmpty() {
			p.updateRequestsTimer.Reset(3 * time.Second)
		}
	}
	return more
}