package torrent

import (
	"container/heap"
	"context"
	"encoding/gob"
	"math/rand"
	"reflect"
	"runtime/pprof"
	"time"
	"unsafe"

	"github.com/anacrolix/log"
	"github.com/anacrolix/multiless"

	request_strategy "github.com/anacrolix/torrent/request-strategy"
)

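// getRequestStrategyInput snapshots the state the request strategy needs from every torrent the
// Client knows about: per-piece priority, availability, length and pending chunks, plus storage
// capacity and the Client-wide cap on unverified bytes.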
func (cl *Client) getRequestStrategyInput() request_strategy.Input {
	ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
	for _, t := range cl.torrents {
		if !t.haveInfo() {
			// This would be removed if metadata were handled here. We have to guard against not
			// knowing the piece size. If we have no info, we have no pieces either, so the end
			// result is the same.
			continue
		}
		rst := request_strategy.Torrent{
			InfoHash:       t.infoHash,
			ChunksPerPiece: t.chunksPerRegularPiece(),
		}
		if t.storage != nil {
			rst.Capacity = t.storage.Capacity
		}
		rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
		for i := range t.pieces {
			p := &t.pieces[i]
			rst.Pieces = append(rst.Pieces, request_strategy.Piece{
				Request:           !t.ignorePieceForRequests(i),
				Priority:          p.purePriority(),
				Partial:           t.piecePartiallyDownloaded(i),
				Availability:      p.availability,
				Length:            int64(p.length()),
				NumPendingChunks:  int(t.pieceNumPendingChunks(i)),
				IterPendingChunks: &p.undirtiedChunksIter,
			})
		}
		ts = append(ts, rst)
	}
	return request_strategy.Input{
		Torrents:           ts,
		MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
	}
}

func init() {
	gob.Register(peerId{})
}

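// peerId identifies a Peer by the integer value of its pointer. The gob support below (registered
// in init above) round-trips that raw pointer value via unsafe slice headers, so the encoding is
// only meaningful within a single process; it is presumably there so request-strategy state that
// references peers can be serialized for debugging, not for persistence.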
type peerId struct {
	*Peer
	ptr uintptr
}

func (p peerId) Uintptr() uintptr {
	return p.ptr
}

func (p peerId) GobEncode() (b []byte, _ error) {
	*(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
		Data: uintptr(unsafe.Pointer(&p.ptr)),
		Len:  int(unsafe.Sizeof(p.ptr)),
		Cap:  int(unsafe.Sizeof(p.ptr)),
	}
	return
}

func (p *peerId) GobDecode(b []byte) error {
	if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
		panic(len(b))
	}
	ptr := unsafe.Pointer(&b[0])
	p.ptr = *(*uintptr)(ptr)
	log.Printf("%p", ptr)
	dst := reflect.SliceHeader{
		Data: uintptr(unsafe.Pointer(&p.Peer)),
		Len:  int(unsafe.Sizeof(p.Peer)),
		Cap:  int(unsafe.Sizeof(p.Peer)),
	}
	copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
	return nil
}

type (
	RequestIndex   = request_strategy.RequestIndex
	chunkIndexType = request_strategy.ChunkIndex
)

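// peerRequests implements heap.Interface over the candidate request indexes for a single peer,
// ordered by the Less method below. getDesiredRequestState fills requestIndexes and then drains
// the heap in priority order, roughly:
//
//	heap.Init(&requestHeap)
//	next := heap.Pop(&requestHeap).(RequestIndex)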
type peerRequests struct {
	requestIndexes       []RequestIndex
	peer                 *Peer
	torrentStrategyInput *request_strategy.Torrent
}

func (p *peerRequests) Len() int {
	return len(p.requestIndexes)
}

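// Less orders candidate requests for the peer: when the peer is choking us, requests in
// allowed-fast pieces come first; then requests with fewer copies pending at other peers, requests
// we already have in flight, higher piece priority, lower piece availability, and finally piece
// and request index as tie-breakers.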
func (p *peerRequests) Less(i, j int) bool {
	leftRequest := p.requestIndexes[i]
	rightRequest := p.requestIndexes[j]
	t := p.peer.t
	leftPieceIndex := leftRequest / p.torrentStrategyInput.ChunksPerPiece
	rightPieceIndex := rightRequest / p.torrentStrategyInput.ChunksPerPiece
	leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
	rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
	pending := func(index RequestIndex, current bool) int {
		ret := t.pendingRequests.Get(index)
		if current {
			ret--
		}
		// See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be
		// resolved.
		if ret < 0 {
			panic(ret)
		}
		return ret
	}
	ml := multiless.New()
	// Push requests that can't be served right now to the end. But we don't throw them away unless
	// there's a better alternative. This is for when we're using the fast extension and get choked
	// but our requests could still be good when we get unchoked.
	if p.peer.peerChoking {
		ml = ml.Bool(
			!p.peer.peerAllowedFast.Contains(leftPieceIndex),
			!p.peer.peerAllowedFast.Contains(rightPieceIndex),
		)
	}
	ml = ml.Int(
		pending(leftRequest, leftCurrent),
		pending(rightRequest, rightCurrent))
	ml = ml.Bool(!leftCurrent, !rightCurrent)
	ml = ml.Int(
		-int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
		-int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority),
	)
	ml = ml.Int(
		int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
		int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
	ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
	ml = ml.Uint32(leftRequest, rightRequest)
	return ml.MustLess()
}

func (p *peerRequests) Swap(i, j int) {
	p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
}

func (p *peerRequests) Push(x interface{}) {
	p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
}

func (p *peerRequests) Pop() interface{} {
	last := len(p.requestIndexes) - 1
	x := p.requestIndexes[last]
	p.requestIndexes = p.requestIndexes[:last]
	return x
}

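// desiredRequestState is the request state we would like to move a peer to: the requests to have
// outstanding, in priority order, and whether we need to be interested in the peer.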
type desiredRequestState struct {
	Requests   []RequestIndex
	Interested bool
}

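// getDesiredRequestState computes the requests we would ideally have outstanding with this peer,
// by running the request strategy over the Client's torrents, collecting the requestable chunks
// this peer can serve, and taking up to nominalMaxRequests of them in heap order.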
func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
	input := p.t.cl.getRequestStrategyInput()
	requestHeap := peerRequests{
		peer: p,
	}
	for i := range input.Torrents {
		t := &input.Torrents[i]
		if t.InfoHash == p.t.infoHash {
			requestHeap.torrentStrategyInput = t
			break
		}
	}
	request_strategy.GetRequestablePieces(
		input,
		func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
			if t.InfoHash != p.t.infoHash {
				return
			}
			if !p.peerHasPiece(pieceIndex) {
				return
			}
			allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
			rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
				r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
				// if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
				//	return
				// }
				if !allowedFast {
					// We must signal interest to request this.
					desired.Interested = true
					// We can make, or sustain, a request here if we're not choked, or if we made
					// the request previously (presumably while unchoked) and the peer hasn't
					// responded yet (the request was retained because we're using the fast
					// extension).
					if p.peerChoking && !p.actualRequestState.Requests.Contains(r) {
						// We can't request this right now.
						return
					}
				}
				requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
			})
		},
	)
	p.t.assertPendingRequests()
	heap.Init(&requestHeap)
	for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() {
		requestIndex := heap.Pop(&requestHeap).(RequestIndex)
		desired.Requests = append(desired.Requests, requestIndex)
	}
	return
}

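// maybeUpdateActualRequestState applies the desired request state if an update has been flagged
// via needRequestUpdate. The work runs under a pprof label so profiles show what triggered the
// update. It returns true when there was nothing to do, otherwise whatever applyRequestState
// reports about being able to write more to the peer.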
func (p *Peer) maybeUpdateActualRequestState() bool {
	if p.needRequestUpdate == "" {
		return true
	}
	var more bool
	pprof.Do(
		context.Background(),
		pprof.Labels("update request", p.needRequestUpdate),
		func(_ context.Context) {
			next := p.getDesiredRequestState()
			more = p.applyRequestState(next)
		},
	)
	return more
}

// Transmit/apply the request state to the peer.
func (p *Peer) applyRequestState(next desiredRequestState) bool {
	current := &p.actualRequestState
	if !p.setInterested(next.Interested) {
		return false
	}
	more := true
	cancel := current.Requests.Clone()
	for _, ri := range next.Requests {
		cancel.Remove(ri)
	}
	cancel.Iterate(func(req uint32) bool {
		more = p.cancel(req)
		return more
	})
	if !more {
		return false
	}
	shuffled := false
	lastPending := 0
	for i := 0; i < len(next.Requests); i++ {
		req := next.Requests[i]
		if p.cancelledRequests.Contains(req) {
			// Waiting for a reject or piece message, which will suitably trigger us to update our
			// requests, so we can skip this one with no additional consideration.
			continue
		}
		// The cardinality of our desired requests shouldn't exceed the max requests, since it's
		// used in calculating them. However, if we cancelled requests that haven't been rejected
		// or serviced yet (with the fast extension enabled), we can end up with extra outstanding
		// requests. We could subtract the number of outstanding cancels from the next request
		// cardinality, but peers might not like that.
		if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
			// log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
			//	next.Requests.GetCardinality(),
			//	p.cancelledRequests.GetCardinality(),
			//	current.Requests.GetCardinality(),
			//	p.nominalMaxRequests(),
			// )
			break
		}
		otherPending := p.t.pendingRequests.Get(req)
		if p.actualRequestState.Requests.Contains(req) {
			otherPending--
		}
		if otherPending < lastPending {
			// Pending should only rise. It's supposed to be the strongest ordering criterion. If
			// it doesn't, our shuffling condition could be wrong.
			panic(lastPending)
		}
		// If the request has already been requested by another peer, shuffle this and the rest of
		// the requests (since according to the increasing condition, the rest of the indices
		// already have an outstanding request with another peer).
		if !shuffled && otherPending > 0 {
			shuffleReqs := next.Requests[i:]
			rand.Shuffle(len(shuffleReqs), func(i, j int) {
				shuffleReqs[i], shuffleReqs[j] = shuffleReqs[j], shuffleReqs[i]
			})
			// log.Printf("shuffled reqs [%v:%v]", i, len(next.Requests))
			shuffled = true
			// Repeat this index
			i--
			continue
		}

		more = p.mustRequest(req)
		if !more {
			break
		}
	}
	p.updateRequestsTimer.Stop()
	if more {
		p.needRequestUpdate = ""
		if !current.Requests.IsEmpty() {
			p.updateRequestsTimer.Reset(3 * time.Second)
		}
	}
	return more
}