Sergey Matveev's repositories - btrtrc.git/blob - requesting.go
Retain the desired request ordering
[btrtrc.git] / requesting.go
1 package torrent
2
3 import (
4         "container/heap"
5         "context"
6         "encoding/gob"
7         "reflect"
8         "runtime/pprof"
9         "time"
10         "unsafe"
11
12         "github.com/anacrolix/log"
13         "github.com/anacrolix/multiless"
14
15         request_strategy "github.com/anacrolix/torrent/request-strategy"
16 )
17
18 func (cl *Client) tickleRequester() {
19         cl.updateRequests.Broadcast()
20 }
21
22 func (cl *Client) getRequestStrategyInput() request_strategy.Input {
23         ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
24         for _, t := range cl.torrents {
25                 if !t.haveInfo() {
26                         // This would be removed if metadata is handled here. We have to guard against not
27                         // knowing the piece size. If we have no info, we have no pieces too, so the end result
28                         // is the same.
29                         continue
30                 }
31                 rst := request_strategy.Torrent{
32                         InfoHash:       t.infoHash,
33                         ChunksPerPiece: t.chunksPerRegularPiece(),
34                 }
35                 if t.storage != nil {
36                         rst.Capacity = t.storage.Capacity
37                 }
38                 rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
39                 for i := range t.pieces {
40                         p := &t.pieces[i]
41                         rst.Pieces = append(rst.Pieces, request_strategy.Piece{
42                                 Request:           !t.ignorePieceForRequests(i),
43                                 Priority:          p.purePriority(),
44                                 Partial:           t.piecePartiallyDownloaded(i),
45                                 Availability:      p.availability,
46                                 Length:            int64(p.length()),
47                                 NumPendingChunks:  int(t.pieceNumPendingChunks(i)),
48                                 IterPendingChunks: &p.undirtiedChunksIter,
49                         })
50                 }
51                 t.iterPeers(func(p *Peer) {
52                         if p.closed.IsSet() {
53                                 return
54                         }
55                         if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates {
56                                 p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate
57                         }
58                         p.piecesReceivedSinceLastRequestUpdate = 0
59                         rst.Peers = append(rst.Peers, request_strategy.Peer{
60                                 Pieces:           *p.newPeerPieces(),
61                                 MaxRequests:      p.nominalMaxRequests(),
62                                 ExistingRequests: p.actualRequestState.Requests,
63                                 Choking:          p.peerChoking,
64                                 PieceAllowedFast: p.peerAllowedFast,
65                                 DownloadRate:     p.downloadRate(),
66                                 Age:              time.Since(p.completedHandshake),
67                                 Id: peerId{
68                                         Peer: p,
69                                         ptr:  uintptr(unsafe.Pointer(p)),
70                                 },
71                         })
72                 })
73                 ts = append(ts, rst)
74         }
75         return request_strategy.Input{
76                 Torrents:           ts,
77                 MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
78         }
79 }
80
81 func init() {
82         gob.Register(peerId{})
83 }
84
85 type peerId struct {
86         *Peer
87         ptr uintptr
88 }
89
90 func (p peerId) Uintptr() uintptr {
91         return p.ptr
92 }
93
94 func (p peerId) GobEncode() (b []byte, _ error) {
95         *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
96                 Data: uintptr(unsafe.Pointer(&p.ptr)),
97                 Len:  int(unsafe.Sizeof(p.ptr)),
98                 Cap:  int(unsafe.Sizeof(p.ptr)),
99         }
100         return
101 }
102
103 func (p *peerId) GobDecode(b []byte) error {
104         if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
105                 panic(len(b))
106         }
107         ptr := unsafe.Pointer(&b[0])
108         p.ptr = *(*uintptr)(ptr)
109         log.Printf("%p", ptr)
110         dst := reflect.SliceHeader{
111                 Data: uintptr(unsafe.Pointer(&p.Peer)),
112                 Len:  int(unsafe.Sizeof(p.Peer)),
113                 Cap:  int(unsafe.Sizeof(p.Peer)),
114         }
115         copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
116         return nil
117 }
118
119 type RequestIndex = request_strategy.RequestIndex
120 type chunkIndexType = request_strategy.ChunkIndex
121
122 type peerRequests struct {
123         requestIndexes       []RequestIndex
124         peer                 *Peer
125         torrentStrategyInput request_strategy.Torrent
126 }
127
128 func (p *peerRequests) Len() int {
129         return len(p.requestIndexes)
130 }
131
132 func (p *peerRequests) Less(i, j int) bool {
133         leftRequest := p.requestIndexes[i]
134         rightRequest := p.requestIndexes[j]
135         t := p.peer.t
136         leftPieceIndex := leftRequest / p.torrentStrategyInput.ChunksPerPiece
137         rightPieceIndex := rightRequest / p.torrentStrategyInput.ChunksPerPiece
138         leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
139         rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
140         pending := func(index RequestIndex, current bool) int {
141                 ret := t.pendingRequests.Get(index)
142                 if current {
143                         ret--
144                 }
145                 // See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be
146                 // resolved.
147                 if ret < 0 {
148                         panic(ret)
149                 }
150                 return ret
151         }
152         ml := multiless.New()
153         // Push requests that can't be served right now to the end. But we don't throw them away unless
154         // there's a better alternative. This is for when we're using the fast extension and get choked
155         // but our requests could still be good when we get unchoked.
156         if p.peer.peerChoking {
157                 ml = ml.Bool(
158                         !p.peer.peerAllowedFast.Contains(leftPieceIndex),
159                         !p.peer.peerAllowedFast.Contains(rightPieceIndex),
160                 )
161         }
162         ml = ml.Int(
163                 pending(leftRequest, leftCurrent),
164                 pending(rightRequest, rightCurrent))
165         ml = ml.Bool(!leftCurrent, !rightCurrent)
166         ml = ml.Int(
167                 -int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
168                 -int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority),
169         )
170         ml = ml.Int(
171                 int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
172                 int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
173         ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
174         ml = ml.Uint32(leftRequest, rightRequest)
175         return ml.MustLess()
176 }
177
178 func (p *peerRequests) Swap(i, j int) {
179         p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
180 }
181
182 func (p *peerRequests) Push(x interface{}) {
183         p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
184 }
185
186 func (p *peerRequests) Pop() interface{} {
187         last := len(p.requestIndexes) - 1
188         x := p.requestIndexes[last]
189         p.requestIndexes = p.requestIndexes[:last]
190         return x
191 }
192
193 type desiredRequestState struct {
194         Requests   []RequestIndex
195         Interested bool
196 }
197
198 func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
199         input := p.t.cl.getRequestStrategyInput()
200         requestHeap := peerRequests{
201                 peer: p,
202         }
203         for _, t := range input.Torrents {
204                 if t.InfoHash == p.t.infoHash {
205                         requestHeap.torrentStrategyInput = t
206                         break
207                 }
208         }
209         request_strategy.GetRequestablePieces(
210                 input,
211                 func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
212                         if t.InfoHash != p.t.infoHash {
213                                 return
214                         }
215                         if !p.peerHasPiece(pieceIndex) {
216                                 return
217                         }
218                         allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
219                         rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
220                                 r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
221                                 //if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
222                                 //      return
223                                 //}
224                                 if !allowedFast {
225                                         // We must signal interest to request this
226                                         desired.Interested = true
227                                         // We can make or will allow sustaining a request here if we're not choked, or
228                                         // have made the request previously (presumably while unchoked), and haven't had
229                                         // the peer respond yet (and the request was retained because we are using the
230                                         // fast extension).
231                                         if p.peerChoking && !p.actualRequestState.Requests.Contains(r) {
232                                                 // We can't request this right now.
233                                                 return
234                                         }
235                                 }
236                                 requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
237                         })
238                 },
239         )
240         p.t.assertPendingRequests()
241         heap.Init(&requestHeap)
242         for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() {
243                 requestIndex := heap.Pop(&requestHeap).(RequestIndex)
244                 desired.Requests = append(desired.Requests, requestIndex)
245         }
246         return
247 }
248
249 func (p *Peer) maybeUpdateActualRequestState() bool {
250         if p.needRequestUpdate == "" {
251                 return true
252         }
253         var more bool
254         pprof.Do(
255                 context.Background(),
256                 pprof.Labels("update request", p.needRequestUpdate),
257                 func(_ context.Context) {
258                         next := p.getDesiredRequestState()
259                         more = p.applyRequestState(next)
260                 },
261         )
262         return more
263 }
264
265 // Transmit/action the request state to the peer.
266 func (p *Peer) applyRequestState(next desiredRequestState) bool {
267         current := &p.actualRequestState
268         if !p.setInterested(next.Interested) {
269                 return false
270         }
271         more := true
272         cancel := current.Requests.Clone()
273         for _, ri := range next.Requests {
274                 cancel.Remove(ri)
275         }
276         cancel.Iterate(func(req uint32) bool {
277                 more = p.cancel(req)
278                 return more
279         })
280         if !more {
281                 return false
282         }
283         for _, req := range next.Requests {
284                 if p.cancelledRequests.Contains(req) {
285                         // Waiting for a reject or piece message, which will suitably trigger us to update our
286                         // requests, so we can skip this one with no additional consideration.
287                         continue
288                 }
289                 // The cardinality of our desired requests shouldn't exceed the max requests since it's used
290                 // in the calculation of the requests. However, if we cancelled requests and they haven't
291                 // been rejected or serviced yet with the fast extension enabled, we can end up with more
292                 // extra outstanding requests. We could subtract the number of outstanding cancels from the
293                 // next request cardinality, but peers might not like that.
294                 if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
295                         //log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
296                         //      next.Requests.GetCardinality(),
297                         //      p.cancelledRequests.GetCardinality(),
298                         //      current.Requests.GetCardinality(),
299                         //      p.nominalMaxRequests(),
300                         //)
301                         break
302                 }
303                 more = p.mustRequest(req)
304                 if !more {
305                         break
306                 }
307         }
308         p.updateRequestsTimer.Stop()
309         if more {
310                 p.needRequestUpdate = ""
311                 if !current.Requests.IsEmpty() {
312                         p.updateRequestsTimer.Reset(3 * time.Second)
313                 }
314         }
315         return more
316 }