13 "github.com/anacrolix/log"
14 "github.com/anacrolix/multiless"
16 request_strategy "github.com/anacrolix/torrent/request-strategy"
// requestStrategyPieceOrderState builds the request_strategy.PieceRequestOrderState
// for piece i: Priority from the piece's purePriority(), Partial from
// piecePartiallyDownloaded(i), and Availability from the piece's availability
// field.
19 func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRequestOrderState {
20 return request_strategy.PieceRequestOrderState{
21 Priority: t.piece(i).purePriority(),
22 Partial: t.piecePartiallyDownloaded(i),
23 Availability: t.piece(i).availability,
// Register peerId with encoding/gob so that peerId values can be encoded and
// decoded when they travel inside interface values.
28 gob.Register(peerId{})
// Uintptr returns a uintptr value for this peerId.
// NOTE(review): presumably this is p.ptr, the field GobEncode/GobDecode
// serialise — confirm against the method body.
36 func (p peerId) Uintptr() uintptr {
// GobEncode has the encoding/gob GobEncoder signature. Rather than copying, it
// overwrites the slice header of the named result b so that b aliases the
// bytes of p.ptr, with length and capacity both unsafe.Sizeof(p.ptr).
//
// NOTE(review): p is a value receiver, so b aliases this call's copy of p.
// The header is built by hand with the address held as a uintptr in Data;
// reflect.SliceHeader is deprecated in newer Go releases in favour of
// unsafe.Slice — confirm this remains acceptable for the targeted toolchain.
40 func (p peerId) GobEncode() (b []byte, _ error) {
41 *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
42 Data: uintptr(unsafe.Pointer(&p.ptr)),
43 Len: int(unsafe.Sizeof(p.ptr)),
44 Cap: int(unsafe.Sizeof(p.ptr)),
// GobDecode has the encoding/gob GobDecoder signature. It checks that b is
// exactly unsafe.Sizeof(p.ptr) bytes long, reinterprets the start of b as a
// uintptr and stores it in p.ptr, then copies b over the storage of the
// p.Peer field.
//
// NOTE(review): how a length mismatch is handled should be confirmed in the
// branch body. Overwriting p.Peer with decoded bytes only yields a usable value
// if those bytes are a valid pointer in the decoding process — presumably
// encode and decode happen in the same process; confirm.
49 func (p *peerId) GobDecode(b []byte) error {
50 if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
// Read the first bytes of b as a uintptr value.
53 ptr := unsafe.Pointer(&b[0])
54 p.ptr = *(*uintptr)(ptr)
// dst is a hand-built slice header covering the bytes of the p.Peer field
// itself, so the copy below writes b directly into that field.
56 dst := reflect.SliceHeader{
57 Data: uintptr(unsafe.Pointer(&p.Peer)),
58 Len: int(unsafe.Sizeof(p.Peer)),
59 Cap: int(unsafe.Sizeof(p.Peer)),
61 copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
// Local names for the request-strategy package's index types.
// RequestIndex identifies a single request; the code in this file derives a
// piece index from it by dividing by the chunks-per-regular-piece count.
66 RequestIndex = request_strategy.RequestIndex
// chunkIndexType is the local name for request_strategy.ChunkIndex.
67 chunkIndexType = request_strategy.ChunkIndex
// peerRequests holds candidate request indexes for a peer. Its Len, Less,
// Swap, Push and Pop methods let it be passed to container/heap.
// NOTE(review): the methods also read a peer field — confirm its declaration
// alongside requestIndexes.
70 type peerRequests struct {
// requestIndexes is the slice the heap methods operate on.
71 requestIndexes []RequestIndex
// Len returns the number of request indexes currently held.
75 func (p *peerRequests) Len() int {
76 return len(p.requestIndexes)
// Less compares the requests stored at positions i and j. The comparison is a
// chain of criteria whose operands appear in this order: whether the piece is
// outside the peer's allowed-fast set (only when the peer is choking us), the
// pending count of each request, whether each request is not currently
// outstanding with this peer, negated piece priority, piece availability,
// last-requested time, piece index, and finally the request index.
//
// NOTE(review): the definition of t, the multiless calls that consume several
// of the operand pairs below, and the final return should be confirmed before
// relying on the direction of any individual criterion.
79 func (p *peerRequests) Less(i, j int) bool {
80 leftRequest := p.requestIndexes[i]
81 rightRequest := p.requestIndexes[j]
// Dividing a request index by the chunks-per-regular-piece count gives the
// index of the piece the request belongs to.
83 leftPieceIndex := leftRequest / t.chunksPerRegularPiece()
84 rightPieceIndex := rightRequest / t.chunksPerRegularPiece()
// Whether each request is already in this peer's actual request set.
85 leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
86 rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
// pending starts from t.pendingRequests.Get(index).
// NOTE(review): how the current flag adjusts that count should be confirmed
// in the closure body.
87 pending := func(index RequestIndex, current bool) int {
88 ret := t.pendingRequests.Get(index)
92 // See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be
100 // Push requests that can't be served right now to the end. But we don't throw them away unless
101 // there's a better alternative. This is for when we're using the fast extension and get choked
102 // but our requests could still be good when we get unchoked.
103 if p.peer.peerChoking {
105 !p.peer.peerAllowedFast.Contains(leftPieceIndex),
106 !p.peer.peerAllowedFast.Contains(rightPieceIndex),
110 pending(leftRequest, leftCurrent),
111 pending(rightRequest, rightCurrent))
112 ml = ml.Bool(!leftCurrent, !rightCurrent)
113 leftPiece := t.piece(int(leftPieceIndex))
114 rightPiece := t.piece(int(rightPieceIndex))
// Priorities are negated before being compared.
116 -int(leftPiece.purePriority()),
117 -int(rightPiece.purePriority()),
120 int(leftPiece.availability),
121 int(rightPiece.availability))
122 leftLastRequested := p.peer.t.lastRequested[leftRequest]
123 rightLastRequested := p.peer.t.lastRequested[rightRequest]
// Equal timestamps count as "same"; otherwise the earlier timestamp is "less".
124 ml = ml.EagerSameLess(
125 leftLastRequested.Equal(rightLastRequested),
126 leftLastRequested.Before(rightLastRequested),
// Final tie-breakers: piece index, then the request index itself.
128 ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
129 ml = ml.Uint32(leftRequest, rightRequest)
// Swap exchanges the request indexes stored at positions i and j.
133 func (p *peerRequests) Swap(i, j int) {
134 p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
// Push appends x to the end of requestIndexes. x must hold a RequestIndex; the
// unchecked type assertion panics for any other dynamic type.
137 func (p *peerRequests) Push(x interface{}) {
138 p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
// Pop removes the last element of requestIndexes by reslicing. Calling it on
// an empty slice panics, because last is then -1 and is used as an index.
// NOTE(review): presumably the removed element x is what gets returned —
// confirm against the return statement.
141 func (p *peerRequests) Pop() interface{} {
142 last := len(p.requestIndexes) - 1
143 x := p.requestIndexes[last]
144 p.requestIndexes = p.requestIndexes[:last]
// desiredRequestState is the request state we would like to have with a peer,
// as computed by getDesiredRequestState and consumed by applyRequestState.
// NOTE(review): code in this file also sets and reads an Interested field —
// confirm its declaration alongside Requests.
148 type desiredRequestState struct {
// Requests lists the wanted request indexes in the order they were selected.
149 Requests []RequestIndex
// getDesiredRequestState computes the request state we would like to have with
// this peer. It visits pieces through request_strategy.GetRequestablePieces,
// gathers candidate chunk requests into a peerRequests heap, and then pops
// candidates into desired.Requests until the heap is empty or the number
// selected reaches p.nominalMaxRequests().
//
// NOTE(review): the bodies of the early-exit branches in the callback
// (different info hash, peer lacks the piece, choked without an existing
// request, requested less than a second ago) presumably skip the piece or
// chunk — confirm.
153 func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
154 input := p.t.getRequestStrategyInput()
155 requestHeap := peerRequests{
158 request_strategy.GetRequestablePieces(
// The piece request order is looked up by the torrent's storage capacity key.
160 p.t.cl.pieceRequestOrder[p.t.storage.Capacity],
161 func(ih InfoHash, pieceIndex int) {
// The callback is handed an info hash; pieces of other torrents are detected here.
162 if ih != p.t.infoHash {
165 if !p.peerHasPiece(pieceIndex) {
168 allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
// Walk the piece's undirtied chunks; each chunk index is offset by the
// piece's request index offset to form the request index r.
169 p.t.piece(pieceIndex).undirtiedChunksIter.Iter(func(ci request_strategy.ChunkIndex) {
170 r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
171 // if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
175 // We must signal interest to request this
176 desired.Interested = true
177 // We can make or will allow sustaining a request here if we're not choked, or
178 // have made the request previously (presumably while unchoked), and haven't had
179 // the peer respond yet (and the request was retained because we are using the
181 if p.peerChoking && !p.actualRequestState.Requests.Contains(r) {
182 // We can't request this right now.
186 // Note that we can still be interested if we filter all requests due to being
187 // recently requested from another peer.
188 if !p.actualRequestState.Requests.Contains(r) {
// Requests not already held by this peer are checked against the torrent's
// last-requested time with a one second window.
189 if time.Since(p.t.lastRequested[r]) < time.Second {
193 requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
197 p.t.assertPendingRequests()
// Candidates were appended unordered; establish heap order before popping.
198 heap.Init(&requestHeap)
199 for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() {
200 requestIndex := heap.Pop(&requestHeap).(RequestIndex)
201 desired.Requests = append(desired.Requests, requestIndex)
// maybeUpdateActualRequestState checks p.needRequestUpdate and, inside a
// callback given pprof labels ("update request" -> the needRequestUpdate
// value), computes the desired request state and applies it, storing the
// result of applyRequestState in more.
//
// NOTE(review): presumably the labelled call is pprof.Do and an empty
// needRequestUpdate returns early — confirm, along with the returned values.
206 func (p *Peer) maybeUpdateActualRequestState() bool {
207 if p.needRequestUpdate == "" {
212 context.Background(),
213 pprof.Labels("update request", p.needRequestUpdate),
214 func(_ context.Context) {
215 next := p.getDesiredRequestState()
216 more = p.applyRequestState(next)
222 // Transmit/action the request state to the peer.
// It first applies next.Interested through setInterested, derives a cancel set
// from the currently outstanding requests, then walks next.Requests issuing
// each one with mustRequest. Afterwards the update-requests timer is stopped,
// needRequestUpdate is cleared, and the timer is re-armed for three seconds
// when we are interested.
//
// NOTE(review): the declarations of more, shuffled and lastPending, the bodies
// of several branches, and the return value should be confirmed against the
// full function.
223 func (p *Peer) applyRequestState(next desiredRequestState) bool {
224 current := &p.actualRequestState
225 if !p.setInterested(next.Interested) {
// Start from a copy of everything currently requested.
// NOTE(review): presumably the loop below removes each still-wanted request
// so that only unwanted ones remain to be cancelled — confirm.
229 cancel := current.Requests.Clone()
230 for _, ri := range next.Requests {
233 cancel.Iterate(func(req uint32) bool {
// An index loop is used (rather than range) so the slice tail can be
// reshuffled in place while iterating.
242 for i := 0; i < len(next.Requests); i++ {
243 req := next.Requests[i]
244 if p.cancelledRequests.Contains(req) {
245 // Waiting for a reject or piece message, which will suitably trigger us to update our
246 // requests, so we can skip this one with no additional consideration.
249 // The cardinality of our desired requests shouldn't exceed the max requests since it's used
250 // in the calculation of the requests. However, if we cancelled requests and they haven't
251 // been rejected or serviced yet with the fast extension enabled, we can end up with more
252 // extra outstanding requests. We could subtract the number of outstanding cancels from the
253 // next request cardinality, but peers might not like that.
254 if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
255 // log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
256 // next.Requests.GetCardinality(),
257 // p.cancelledRequests.GetCardinality(),
258 // current.Requests.GetCardinality(),
259 // p.nominalMaxRequests(),
// NOTE(review): the next two statements read next.Requests[0] on every
// iteration instead of req (next.Requests[i]). That looks unintended given the
// per-request shuffling logic below — confirm and fix if so.
263 otherPending := p.t.pendingRequests.Get(next.Requests[0])
264 if p.actualRequestState.Requests.Contains(next.Requests[0]) {
267 if otherPending < lastPending {
268 // Pending should only rise. It's supposed to be the strongest ordering criteria. If it
269 // doesn't, our shuffling condition could be wrong.
272 // If the request has already been requested by another peer, shuffle this and the rest of
273 // the requests (since according to the increasing condition, the rest of the indices
274 // already have an outstanding request with another peer).
275 if !shuffled && otherPending > 0 {
276 shuffleReqs := next.Requests[i:]
// The closure's i and j shadow the loop index; they index shuffleReqs only.
277 rand.Shuffle(len(shuffleReqs), func(i, j int) {
278 shuffleReqs[i], shuffleReqs[j] = shuffleReqs[j], shuffleReqs[i]
280 // log.Printf("shuffled reqs [%v:%v]", i, len(next.Requests))
// more takes the result of issuing this request.
287 more = p.mustRequest(req)
292 // TODO: This may need to change, we might want to update even if there were no requests due to
293 // filtering them for being recently requested already.
294 p.updateRequestsTimer.Stop()
296 p.needRequestUpdate = ""
// While interested, schedule the timer to fire again in three seconds.
297 if current.Interested {
298 p.updateRequestsTimer.Reset(3 * time.Second)