Sergey Matveev's repositories - btrtrc.git/blob - requesting.go
Use relative availabilities to determine piece request order
[btrtrc.git] / requesting.go
1 package torrent
2
3 import (
4         "container/heap"
5         "context"
6         "encoding/gob"
7         "reflect"
8         "runtime/pprof"
9         "time"
10         "unsafe"
11
12         "github.com/anacrolix/log"
13         "github.com/anacrolix/multiless"
14
15         request_strategy "github.com/anacrolix/torrent/request-strategy"
16 )
17
18 func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRequestOrderState {
19         return request_strategy.PieceRequestOrderState{
20                 Priority:     t.piece(i).purePriority(),
21                 Partial:      t.piecePartiallyDownloaded(i),
22                 Availability: t.piece(i).availability(),
23         }
24 }
25
26 func init() {
27         gob.Register(peerId{})
28 }
29
30 type peerId struct {
31         *Peer
32         ptr uintptr
33 }
34
35 func (p peerId) Uintptr() uintptr {
36         return p.ptr
37 }
38
39 func (p peerId) GobEncode() (b []byte, _ error) {
40         *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
41                 Data: uintptr(unsafe.Pointer(&p.ptr)),
42                 Len:  int(unsafe.Sizeof(p.ptr)),
43                 Cap:  int(unsafe.Sizeof(p.ptr)),
44         }
45         return
46 }
47
48 func (p *peerId) GobDecode(b []byte) error {
49         if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
50                 panic(len(b))
51         }
52         ptr := unsafe.Pointer(&b[0])
53         p.ptr = *(*uintptr)(ptr)
54         log.Printf("%p", ptr)
55         dst := reflect.SliceHeader{
56                 Data: uintptr(unsafe.Pointer(&p.Peer)),
57                 Len:  int(unsafe.Sizeof(p.Peer)),
58                 Cap:  int(unsafe.Sizeof(p.Peer)),
59         }
60         copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
61         return nil
62 }
63
64 type (
65         RequestIndex   = request_strategy.RequestIndex
66         chunkIndexType = request_strategy.ChunkIndex
67 )
68
69 type peerRequests struct {
70         requestIndexes []RequestIndex
71         peer           *Peer
72 }
73
74 func (p *peerRequests) Len() int {
75         return len(p.requestIndexes)
76 }
77
78 func (p *peerRequests) Less(i, j int) bool {
79         leftRequest := p.requestIndexes[i]
80         rightRequest := p.requestIndexes[j]
81         t := p.peer.t
82         leftPieceIndex := leftRequest / t.chunksPerRegularPiece()
83         rightPieceIndex := rightRequest / t.chunksPerRegularPiece()
84         ml := multiless.New()
85         // Push requests that can't be served right now to the end. But we don't throw them away unless
86         // there's a better alternative. This is for when we're using the fast extension and get choked
87         // but our requests could still be good when we get unchoked.
88         if p.peer.peerChoking {
89                 ml = ml.Bool(
90                         !p.peer.peerAllowedFast.Contains(leftPieceIndex),
91                         !p.peer.peerAllowedFast.Contains(rightPieceIndex),
92                 )
93         }
94         leftPeer := t.pendingRequests[leftRequest]
95         rightPeer := t.pendingRequests[rightRequest]
96         ml = ml.Bool(rightPeer == p.peer, leftPeer == p.peer)
97         ml = ml.Bool(rightPeer == nil, leftPeer == nil)
98         if ml.Ok() {
99                 return ml.MustLess()
100         }
101         if leftPeer != nil {
102                 // The right peer should also be set, or we'd have resolved the computation by now.
103                 ml = ml.Uint64(
104                         rightPeer.requestState.Requests.GetCardinality(),
105                         leftPeer.requestState.Requests.GetCardinality(),
106                 )
107                 // Could either of the lastRequested be Zero? That's what checking an existing peer is for.
108                 leftLast := t.lastRequested[leftRequest]
109                 rightLast := t.lastRequested[rightRequest]
110                 if leftLast.IsZero() || rightLast.IsZero() {
111                         panic("expected non-zero last requested times")
112                 }
113                 // We want the most-recently requested on the left. Clients like Transmission serve requests
114                 // in received order, so the most recently-requested is the one that has the longest until
115                 // it will be served and therefore is the best candidate to cancel.
116                 ml = ml.CmpInt64(rightLast.Sub(leftLast).Nanoseconds())
117         }
118         leftPiece := t.piece(int(leftPieceIndex))
119         rightPiece := t.piece(int(rightPieceIndex))
120         ml = ml.Int(
121                 // Technically we would be happy with the cached priority here, except we don't actually
122                 // cache it anymore, and Torrent.piecePriority just does another lookup of *Piece to resolve
123                 // the priority through Piece.purePriority, which is probably slower.
124                 -int(leftPiece.purePriority()),
125                 -int(rightPiece.purePriority()),
126         )
127         ml = ml.Int(
128                 int(leftPiece.relativeAvailability),
129                 int(rightPiece.relativeAvailability))
130         return ml.Less()
131 }
132
133 func (p *peerRequests) Swap(i, j int) {
134         p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
135 }
136
137 func (p *peerRequests) Push(x interface{}) {
138         p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
139 }
140
141 func (p *peerRequests) Pop() interface{} {
142         last := len(p.requestIndexes) - 1
143         x := p.requestIndexes[last]
144         p.requestIndexes = p.requestIndexes[:last]
145         return x
146 }
147
148 type desiredRequestState struct {
149         Requests   peerRequests
150         Interested bool
151 }
152
153 func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
154         if !p.t.haveInfo() {
155                 return
156         }
157         input := p.t.getRequestStrategyInput()
158         requestHeap := peerRequests{
159                 peer: p,
160         }
161         request_strategy.GetRequestablePieces(
162                 input,
163                 p.t.getPieceRequestOrder(),
164                 func(ih InfoHash, pieceIndex int) {
165                         if ih != p.t.infoHash {
166                                 return
167                         }
168                         if !p.peerHasPiece(pieceIndex) {
169                                 return
170                         }
171                         allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
172                         p.t.piece(pieceIndex).undirtiedChunksIter.Iter(func(ci request_strategy.ChunkIndex) {
173                                 r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
174                                 if !allowedFast {
175                                         // We must signal interest to request this. TODO: We could set interested if the
176                                         // peers pieces (minus the allowed fast set) overlap with our missing pieces if
177                                         // there are any readers, or any pending pieces.
178                                         desired.Interested = true
179                                         // We can make or will allow sustaining a request here if we're not choked, or
180                                         // have made the request previously (presumably while unchoked), and haven't had
181                                         // the peer respond yet (and the request was retained because we are using the
182                                         // fast extension).
183                                         if p.peerChoking && !p.requestState.Requests.Contains(r) {
184                                                 // We can't request this right now.
185                                                 return
186                                         }
187                                 }
188                                 if p.requestState.Cancelled.Contains(r) {
189                                         // Can't re-request while awaiting acknowledgement.
190                                         return
191                                 }
192                                 requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
193                         })
194                 },
195         )
196         p.t.assertPendingRequests()
197         desired.Requests = requestHeap
198         return
199 }
200
201 func (p *Peer) maybeUpdateActualRequestState() bool {
202         if p.needRequestUpdate == "" {
203                 return true
204         }
205         var more bool
206         pprof.Do(
207                 context.Background(),
208                 pprof.Labels("update request", p.needRequestUpdate),
209                 func(_ context.Context) {
210                         next := p.getDesiredRequestState()
211                         more = p.applyRequestState(next)
212                 },
213         )
214         return more
215 }
216
217 // Transmit/action the request state to the peer.
218 func (p *Peer) applyRequestState(next desiredRequestState) bool {
219         current := &p.requestState
220         if !p.setInterested(next.Interested) {
221                 return false
222         }
223         more := true
224         requestHeap := &next.Requests
225         t := p.t
226         heap.Init(requestHeap)
227         for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()) < p.nominalMaxRequests() {
228                 req := heap.Pop(requestHeap).(RequestIndex)
229                 existing := t.requestingPeer(req)
230                 if existing != nil && existing != p {
231                         // Don't steal from the poor.
232                         diff := int64(current.Requests.GetCardinality()) + 1 - (int64(existing.uncancelledRequests()) - 1)
233                         // Steal a request that leaves us with one more request than the existing peer
234                         // connection if the stealer more recently received a chunk.
235                         if diff > 1 || (diff == 1 && p.lastUsefulChunkReceived.Before(existing.lastUsefulChunkReceived)) {
236                                 continue
237                         }
238                         t.cancelRequest(req)
239                 }
240                 more = p.mustRequest(req)
241                 if !more {
242                         break
243                 }
244         }
245         // TODO: This may need to change, we might want to update even if there were no requests due to
246         // filtering them for being recently requested already.
247         p.updateRequestsTimer.Stop()
248         if more {
249                 p.needRequestUpdate = ""
250                 if current.Interested {
251                         p.updateRequestsTimer.Reset(3 * time.Second)
252                 }
253         }
254         return more
255 }