12 "github.com/anacrolix/log"
13 "github.com/anacrolix/multiless"
15 request_strategy "github.com/anacrolix/torrent/request-strategy"
// requestStrategyPieceOrderState snapshots the request-strategy view of piece i:
// its (uncached) priority, whether it is partially downloaded, and its current
// availability among peers. NOTE(review): lines are elided in this excerpt; the
// struct literal's closing braces are not visible here.
18 func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRequestOrderState {
19 return request_strategy.PieceRequestOrderState{
20 Priority: t.piece(i).purePriority(),
21 Partial: t.piecePartiallyDownloaded(i),
22 Availability: t.piece(i).availability,
// Register peerId with gob so it can round-trip through the custom
// GobEncode/GobDecode below. Presumably this sits inside an init function —
// the enclosing declaration is elided from this excerpt.
27 gob.Register(peerId{})
// Uintptr exposes the peer's identity as a raw pointer value. The body is
// elided here; presumably it returns p.ptr — confirm against the full file.
35 func (p peerId) Uintptr() uintptr {
// GobEncode serializes the peerId as the raw bytes of its ptr field. It does
// so by aliasing the returned slice b directly onto p.ptr's storage via a
// reflect.SliceHeader write — no copy is made, so b is only valid while p is
// live. NOTE(review): reflect.SliceHeader is deprecated in modern Go;
// unsafe.Slice would be the current idiom. The return statement is elided
// from this excerpt.
39 func (p peerId) GobEncode() (b []byte, _ error) {
40 *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
41 Data: uintptr(unsafe.Pointer(&p.ptr)),
42 Len: int(unsafe.Sizeof(p.ptr)),
43 Cap: int(unsafe.Sizeof(p.ptr)),
// GobDecode is the inverse of GobEncode: it expects exactly
// unsafe.Sizeof(p.ptr) bytes, reinterprets them as the uintptr identity, and
// additionally copies the raw bytes over p.Peer's storage via a synthesized
// SliceHeader. The error return for a length mismatch is elided from this
// excerpt. NOTE(review): this only works when the encoder and decoder share
// an address space and pointer width — verify that assumption holds for how
// these gob payloads are used.
48 func (p *peerId) GobDecode(b []byte) error {
// Reject payloads that aren't exactly one pointer wide.
49 if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
// Reinterpret the payload bytes as the stored pointer value.
52 ptr := unsafe.Pointer(&b[0])
53 p.ptr = *(*uintptr)(ptr)
// Build a byte-slice view over p.Peer and copy the payload into it.
55 dst := reflect.SliceHeader{
56 Data: uintptr(unsafe.Pointer(&p.Peer)),
57 Len: int(unsafe.Sizeof(p.Peer)),
58 Cap: int(unsafe.Sizeof(p.Peer)),
60 copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
// Local aliases for the request-strategy package's index types, so the rest
// of this file doesn't have to qualify them. The enclosing type (…) block is
// elided from this excerpt.
65 RequestIndex = request_strategy.RequestIndex
66 chunkIndexType = request_strategy.ChunkIndex
// peerRequests is a heap (see the Len/Less/Swap/Push/Pop methods below) of
// candidate request indexes for a single peer, ordered so the most desirable
// request to issue next pops first. The methods also reference t and p.peer
// fields, which are elided from this excerpt.
69 type peerRequests struct {
70 requestIndexes []RequestIndex
// Len reports the number of candidate request indexes (heap.Interface).
74 func (p *peerRequests) Len() int {
75 return len(p.requestIndexes)
// Less orders candidate requests for the heap (heap.Interface), best-first,
// by a cascade of criteria combined with a multiless chain (the ml
// initialization line is elided from this excerpt): choked-and-not-allowed-
// fast last; then requests we already own, then unclaimed requests; then by
// the competing peer's outstanding-request count; then most-recently-
// requested first; then piece priority; then piece availability.
78 func (p *peerRequests) Less(i, j int) bool {
79 leftRequest := p.requestIndexes[i]
80 rightRequest := p.requestIndexes[j]
// Map each request index back to its piece. Integer division by
// chunks-per-piece recovers the piece index.
82 leftPieceIndex := leftRequest / t.chunksPerRegularPiece()
83 rightPieceIndex := rightRequest / t.chunksPerRegularPiece()
85 // Push requests that can't be served right now to the end. But we don't throw them away unless
86 // there's a better alternative. This is for when we're using the fast extension and get choked
87 // but our requests could still be good when we get unchoked.
88 if p.peer.peerChoking {
90 !p.peer.peerAllowedFast.Contains(leftPieceIndex),
91 !p.peer.peerAllowedFast.Contains(rightPieceIndex),
// Prefer requests already assigned to us, then requests assigned to nobody,
// over requests we'd have to steal from another peer.
94 leftPeer := t.pendingRequests[leftRequest]
95 rightPeer := t.pendingRequests[rightRequest]
96 ml = ml.Bool(rightPeer == p.peer, leftPeer == p.peer)
97 ml = ml.Bool(rightPeer == nil, leftPeer == nil)
// If we get here both requests belong to other peers; prefer stealing from
// the peer with more outstanding requests. (Guard lines are elided here.)
102 // The right peer should also be set, or we'd have resolved the computation by now.
104 rightPeer.actualRequestState.Requests.GetCardinality(),
105 leftPeer.actualRequestState.Requests.GetCardinality(),
107 // Could either of the lastRequested be Zero? That's what checking an existing peer is for.
108 leftLast := t.lastRequested[leftRequest]
109 rightLast := t.lastRequested[rightRequest]
// Invariant: anything in pendingRequests has a recorded request time.
110 if leftLast.IsZero() || rightLast.IsZero() {
111 panic("expected non-zero last requested times")
113 // We want the most-recently requested on the left. Clients like Transmission serve requests
114 // in received order, so the most recently-requested is the one that has the longest until
115 // it will be served and therefore is the best candidate to cancel.
116 ml = ml.CmpInt64(rightLast.Sub(leftLast).Nanoseconds())
118 leftPiece := t.piece(int(leftPieceIndex))
119 rightPiece := t.piece(int(rightPieceIndex))
121 // Technically we would be happy with the cached priority here, except we don't actually
122 // cache it anymore, and Torrent.piecePriority just does another lookup of *Piece to resolve
123 // the priority through Piece.purePriority, which is probably slower.
// Negated so that higher priority sorts earlier under an ascending compare.
124 -int(leftPiece.purePriority()),
125 -int(rightPiece.purePriority()),
// Finally, prefer rarer (lower-availability) pieces. The enclosing compare
// call and the final Less/MustLess resolution are elided from this excerpt.
128 int(leftPiece.availability),
129 int(rightPiece.availability))
// Swap exchanges two candidate request indexes (heap.Interface).
133 func (p *peerRequests) Swap(i, j int) {
134 p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
// Push appends a candidate request index (heap.Interface). Panics if x is not
// a RequestIndex, per the type assertion.
137 func (p *peerRequests) Push(x interface{}) {
138 p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
// Pop removes and returns the last candidate request index (heap.Interface);
// container/heap has already swapped the best element into that position.
// The return statement is elided from this excerpt.
141 func (p *peerRequests) Pop() interface{} {
142 last := len(p.requestIndexes) - 1
143 x := p.requestIndexes[last]
144 p.requestIndexes = p.requestIndexes[:last]
// desiredRequestState is the target request set computed for a peer by
// getDesiredRequestState, later actioned by applyRequestState. It also has an
// Interested field (set in getDesiredRequestState) that is elided from this
// excerpt.
148 type desiredRequestState struct {
149 Requests peerRequests
// getDesiredRequestState computes the set of chunk requests we would ideally
// have outstanding with this peer right now: it walks the torrent's piece
// request order, filters to pieces this peer has, and collects the undirtied
// chunks we could request, setting desired.Interested along the way. Several
// early-return/guard lines are elided from this excerpt.
153 func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
157 input := p.t.getRequestStrategyInput()
158 requestHeap := peerRequests{
// Visit requestable pieces in strategy order. The input argument line is
// elided here.
161 request_strategy.GetRequestablePieces(
163 p.t.getPieceRequestOrder(),
164 func(ih InfoHash, pieceIndex int) {
// The request order may span multiple torrents; skip pieces that aren't ours.
// (The early return after this guard is elided.)
165 if ih != p.t.infoHash {
// Skip pieces this peer doesn't have. (Early return elided.)
168 if !p.peerHasPiece(pieceIndex) {
171 allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
// For each chunk of the piece not yet written/dirtied, decide whether it's a
// viable request against this peer.
172 p.t.piece(pieceIndex).undirtiedChunksIter.Iter(func(ci request_strategy.ChunkIndex) {
173 r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
174 // if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
178 // We must signal interest to request this. TODO: We could set interested if the
179 // peers pieces (minus the allowed fast set) overlap with our missing pieces if
180 // there are any readers, or any pending pieces.
181 desired.Interested = true
182 // We can make or will allow sustaining a request here if we're not choked, or
183 // have made the request previously (presumably while unchoked), and haven't had
184 // the peer respond yet (and the request was retained because we are using the
186 if p.peerChoking && !p.actualRequestState.Requests.Contains(r) {
187 // We can't request this right now.
// Surviving candidates go into the heap; ordering happens later via
// heap.Init in applyRequestState.
191 requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
// Debug-style invariant check on torrent-wide pending-request accounting.
195 p.t.assertPendingRequests()
196 desired.Requests = requestHeap
// maybeUpdateActualRequestState recomputes and applies the desired request
// state, but only if something flagged an update via needRequestUpdate. The
// work runs under a pprof label carrying the update reason, so request-update
// hotspots are attributable in profiles. The early return and the final
// return of `more` are elided from this excerpt.
200 func (p *Peer) maybeUpdateActualRequestState() bool {
// Nothing requested an update; the enclosed early return is elided.
201 if p.needRequestUpdate == "" {
// Presumably pprof.Do wraps this; the call line is elided — confirm against
// the full file.
206 context.Background(),
207 pprof.Labels("update request", p.needRequestUpdate),
208 func(_ context.Context) {
209 next := p.getDesiredRequestState()
210 more = p.applyRequestState(next)
216 // Transmit/action the request state to the peer.
// applyRequestState reconciles the peer's actual outstanding requests with
// the freshly computed desired state: it sets interest, then pops the best
// candidates off the heap and issues them until the per-peer request budget
// (nominalMaxRequests) is reached, with a "don't steal from the poor" rule
// when a request is already claimed by another peer. Several lines (early
// returns, loop continues, the final return) are elided from this excerpt.
217 func (p *Peer) applyRequestState(next desiredRequestState) bool {
218 current := &p.actualRequestState
// Interest must be signalled before requests can be made; bail if the
// interested message couldn't be sent. (Early return elided.)
219 if !p.setInterested(next.Interested) {
223 requestHeap := &next.Requests
// Candidates were appended unordered in getDesiredRequestState; establish
// heap order now, once, rather than maintaining it during collection.
225 heap.Init(requestHeap)
226 for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()) < p.nominalMaxRequests() {
227 req := heap.Pop(requestHeap).(RequestIndex)
228 if p.cancelledRequests.Contains(req) {
229 // Waiting for a reject or piece message, which will suitably trigger us to update our
230 // requests, so we can skip this one with no additional consideration.
// If another peer already owns this request, only take it over when doing so
// is clearly fair (see diff computation below).
233 existing := t.requestingPeer(req)
234 if existing != nil && existing != p {
235 // Don't steal from the poor.
236 diff := int64(current.Requests.GetCardinality()) + 1 - (int64(existing.uncancelledRequests()) - 1)
237 // Steal a request that leaves us with one more request than the existing peer
238 // connection if the stealer more recently received a chunk.
239 if diff > 1 || (diff == 1 && p.lastUsefulChunkReceived.Before(existing.lastUsefulChunkReceived)) {
// Actually issue the request; mustRequest reports whether the peer's write
// buffer can take more.
244 more = p.mustRequest(req)
249 // TODO: This may need to change, we might want to update even if there were no requests due to
250 // filtering them for being recently requested already.
251 p.updateRequestsTimer.Stop()
// Clear the update flag; while interested, re-arm the periodic request
// refresh timer.
253 p.needRequestUpdate = ""
254 if current.Interested {
255 p.updateRequestsTimer.Reset(3 * time.Second)