]> Sergey Matveev's repositories - btrtrc.git/blob - requesting.go
Use interfaces to lazily expose the bare minimum inputs to GetRequestablePieces
[btrtrc.git] / requesting.go
1 package torrent
2
3 import (
4         "container/heap"
5         "context"
6         "encoding/gob"
7         "math/rand"
8         "reflect"
9         "runtime/pprof"
10         "time"
11         "unsafe"
12
13         "github.com/anacrolix/log"
14         "github.com/anacrolix/multiless"
15
16         request_strategy "github.com/anacrolix/torrent/request-strategy"
17 )
18
19 func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRequestOrderState {
20         return request_strategy.PieceRequestOrderState{
21                 Priority:     t.piece(i).purePriority(),
22                 Partial:      t.piecePartiallyDownloaded(i),
23                 Availability: t.piece(i).availability,
24         }
25 }
26
27 func init() {
28         gob.Register(peerId{})
29 }
30
31 type peerId struct {
32         *Peer
33         ptr uintptr
34 }
35
36 func (p peerId) Uintptr() uintptr {
37         return p.ptr
38 }
39
40 func (p peerId) GobEncode() (b []byte, _ error) {
41         *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
42                 Data: uintptr(unsafe.Pointer(&p.ptr)),
43                 Len:  int(unsafe.Sizeof(p.ptr)),
44                 Cap:  int(unsafe.Sizeof(p.ptr)),
45         }
46         return
47 }
48
49 func (p *peerId) GobDecode(b []byte) error {
50         if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
51                 panic(len(b))
52         }
53         ptr := unsafe.Pointer(&b[0])
54         p.ptr = *(*uintptr)(ptr)
55         log.Printf("%p", ptr)
56         dst := reflect.SliceHeader{
57                 Data: uintptr(unsafe.Pointer(&p.Peer)),
58                 Len:  int(unsafe.Sizeof(p.Peer)),
59                 Cap:  int(unsafe.Sizeof(p.Peer)),
60         }
61         copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
62         return nil
63 }
64
65 type (
66         RequestIndex   = request_strategy.RequestIndex
67         chunkIndexType = request_strategy.ChunkIndex
68 )
69
70 type peerRequests struct {
71         requestIndexes []RequestIndex
72         peer           *Peer
73 }
74
75 func (p *peerRequests) Len() int {
76         return len(p.requestIndexes)
77 }
78
79 func (p *peerRequests) Less(i, j int) bool {
80         leftRequest := p.requestIndexes[i]
81         rightRequest := p.requestIndexes[j]
82         t := p.peer.t
83         leftPieceIndex := leftRequest / t.chunksPerRegularPiece()
84         rightPieceIndex := rightRequest / t.chunksPerRegularPiece()
85         leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
86         rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
87         pending := func(index RequestIndex, current bool) int {
88                 ret := t.pendingRequests.Get(index)
89                 if current {
90                         ret--
91                 }
92                 // See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be
93                 // resolved.
94                 if ret < 0 {
95                         panic(ret)
96                 }
97                 return ret
98         }
99         ml := multiless.New()
100         // Push requests that can't be served right now to the end. But we don't throw them away unless
101         // there's a better alternative. This is for when we're using the fast extension and get choked
102         // but our requests could still be good when we get unchoked.
103         if p.peer.peerChoking {
104                 ml = ml.Bool(
105                         !p.peer.peerAllowedFast.Contains(leftPieceIndex),
106                         !p.peer.peerAllowedFast.Contains(rightPieceIndex),
107                 )
108         }
109         ml = ml.Int(
110                 pending(leftRequest, leftCurrent),
111                 pending(rightRequest, rightCurrent))
112         ml = ml.Bool(!leftCurrent, !rightCurrent)
113         leftPiece := t.piece(int(leftPieceIndex))
114         rightPiece := t.piece(int(rightPieceIndex))
115         ml = ml.Int(
116                 -int(leftPiece.purePriority()),
117                 -int(rightPiece.purePriority()),
118         )
119         ml = ml.Int(
120                 int(leftPiece.availability),
121                 int(rightPiece.availability))
122         ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
123         ml = ml.Uint32(leftRequest, rightRequest)
124         return ml.MustLess()
125 }
126
127 func (p *peerRequests) Swap(i, j int) {
128         p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
129 }
130
131 func (p *peerRequests) Push(x interface{}) {
132         p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
133 }
134
135 func (p *peerRequests) Pop() interface{} {
136         last := len(p.requestIndexes) - 1
137         x := p.requestIndexes[last]
138         p.requestIndexes = p.requestIndexes[:last]
139         return x
140 }
141
142 type desiredRequestState struct {
143         Requests   []RequestIndex
144         Interested bool
145 }
146
147 func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
148         input := p.t.getRequestStrategyInput()
149         requestHeap := peerRequests{
150                 peer: p,
151         }
152         request_strategy.GetRequestablePieces(
153                 input,
154                 p.t.cl.pieceRequestOrder[p.t.storage.Capacity],
155                 func(ih InfoHash, pieceIndex int) {
156                         if ih != p.t.infoHash {
157                                 return
158                         }
159                         if !p.peerHasPiece(pieceIndex) {
160                                 return
161                         }
162                         allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
163                         p.t.piece(pieceIndex).undirtiedChunksIter.Iter(func(ci request_strategy.ChunkIndex) {
164                                 r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
165                                 // if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
166                                 //      return
167                                 // }
168                                 if !allowedFast {
169                                         // We must signal interest to request this
170                                         desired.Interested = true
171                                         // We can make or will allow sustaining a request here if we're not choked, or
172                                         // have made the request previously (presumably while unchoked), and haven't had
173                                         // the peer respond yet (and the request was retained because we are using the
174                                         // fast extension).
175                                         if p.peerChoking && !p.actualRequestState.Requests.Contains(r) {
176                                                 // We can't request this right now.
177                                                 return
178                                         }
179                                 }
180                                 requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
181                         })
182                 },
183         )
184         p.t.assertPendingRequests()
185         heap.Init(&requestHeap)
186         for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() {
187                 requestIndex := heap.Pop(&requestHeap).(RequestIndex)
188                 desired.Requests = append(desired.Requests, requestIndex)
189         }
190         return
191 }
192
193 func (p *Peer) maybeUpdateActualRequestState() bool {
194         if p.needRequestUpdate == "" {
195                 return true
196         }
197         var more bool
198         pprof.Do(
199                 context.Background(),
200                 pprof.Labels("update request", p.needRequestUpdate),
201                 func(_ context.Context) {
202                         next := p.getDesiredRequestState()
203                         more = p.applyRequestState(next)
204                 },
205         )
206         return more
207 }
208
209 // Transmit/action the request state to the peer.
210 func (p *Peer) applyRequestState(next desiredRequestState) bool {
211         current := &p.actualRequestState
212         if !p.setInterested(next.Interested) {
213                 return false
214         }
215         more := true
216         cancel := current.Requests.Clone()
217         for _, ri := range next.Requests {
218                 cancel.Remove(ri)
219         }
220         cancel.Iterate(func(req uint32) bool {
221                 more = p.cancel(req)
222                 return more
223         })
224         if !more {
225                 return false
226         }
227         shuffled := false
228         lastPending := 0
229         for i := 0; i < len(next.Requests); i++ {
230                 req := next.Requests[i]
231                 if p.cancelledRequests.Contains(req) {
232                         // Waiting for a reject or piece message, which will suitably trigger us to update our
233                         // requests, so we can skip this one with no additional consideration.
234                         continue
235                 }
236                 // The cardinality of our desired requests shouldn't exceed the max requests since it's used
237                 // in the calculation of the requests. However, if we cancelled requests and they haven't
238                 // been rejected or serviced yet with the fast extension enabled, we can end up with more
239                 // extra outstanding requests. We could subtract the number of outstanding cancels from the
240                 // next request cardinality, but peers might not like that.
241                 if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
242                         // log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
243                         //      next.Requests.GetCardinality(),
244                         //      p.cancelledRequests.GetCardinality(),
245                         //      current.Requests.GetCardinality(),
246                         //      p.nominalMaxRequests(),
247                         // )
248                         break
249                 }
250                 otherPending := p.t.pendingRequests.Get(next.Requests[0])
251                 if p.actualRequestState.Requests.Contains(next.Requests[0]) {
252                         otherPending--
253                 }
254                 if otherPending < lastPending {
255                         // Pending should only rise. It's supposed to be the strongest ordering criteria. If it
256                         // doesn't, our shuffling condition could be wrong.
257                         panic(lastPending)
258                 }
259                 // If the request has already been requested by another peer, shuffle this and the rest of
260                 // the requests (since according to the increasing condition, the rest of the indices
261                 // already have an outstanding request with another peer).
262                 if !shuffled && otherPending > 0 {
263                         shuffleReqs := next.Requests[i:]
264                         rand.Shuffle(len(shuffleReqs), func(i, j int) {
265                                 shuffleReqs[i], shuffleReqs[j] = shuffleReqs[j], shuffleReqs[i]
266                         })
267                         // log.Printf("shuffled reqs [%v:%v]", i, len(next.Requests))
268                         shuffled = true
269                         // Repeat this index
270                         i--
271                         continue
272                 }
273
274                 more = p.mustRequest(req)
275                 if !more {
276                         break
277                 }
278         }
279         p.updateRequestsTimer.Stop()
280         if more {
281                 p.needRequestUpdate = ""
282                 if !current.Requests.IsEmpty() {
283                         p.updateRequestsTimer.Reset(3 * time.Second)
284                 }
285         }
286         return more
287 }