13 "github.com/anacrolix/log"
14 "github.com/anacrolix/multiless"
16 request_strategy "github.com/anacrolix/torrent/request-strategy"
// requestStrategyPieceOrderState builds the request_strategy.PieceRequestOrderState for piece i
// from three values read off the torrent: the piece's pure priority, whether the piece is
// partially downloaded, and the piece's availability.
19 func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRequestOrderState {
20 return request_strategy.PieceRequestOrderState{
21 Priority: t.piece(i).purePriority(),
22 Partial: t.piecePartiallyDownloaded(i),
23 Availability: t.piece(i).availability,
28 gob.Register(peerId{})
// Uintptr exposes the peer identifier as a uintptr.
// NOTE(review): the method body is not visible in this excerpt — confirm which value is returned.
36 func (p peerId) Uintptr() uintptr {
// GobEncode produces the gob representation of a peerId. In the lines visible here it does not
// copy any bytes: it overwrites the slice header of the named result b so that b points at
// &p.ptr, with both length and capacity set to unsafe.Sizeof(p.ptr).
//
// NOTE(review): p is a value receiver, so b refers to this call's copy of p rather than the
// caller's value. Hand-assembled reflect.SliceHeader values are documented by the reflect
// package as not safe or portable — confirm this is acceptable for the intended use.
40 func (p peerId) GobEncode() (b []byte, _ error) {
41 *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
42 Data: uintptr(unsafe.Pointer(&p.ptr)),
43 Len: int(unsafe.Sizeof(p.ptr)),
44 Cap: int(unsafe.Sizeof(p.ptr)),
// GobDecode restores a peerId from b. The length of b is first compared against
// unsafe.Sizeof(p.ptr); the leading bytes of b are then reinterpreted as a uintptr and stored in
// p.ptr. Afterwards a byte-slice view over p.Peer is assembled and b is copied into it.
49 func (p *peerId) GobDecode(b []byte) error {
// NOTE(review): the body of this length check is not visible in this excerpt — confirm how a
// mismatched length is handled.
50 if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
// Reinterpret the first bytes of b as a uintptr value.
53 ptr := unsafe.Pointer(&b[0])
54 p.ptr = *(*uintptr)(ptr)
// Describe the memory of p.Peer as a byte slice so that copy can write into it.
56 dst := reflect.SliceHeader{
57 Data: uintptr(unsafe.Pointer(&p.Peer)),
58 Len: int(unsafe.Sizeof(p.Peer)),
59 Cap: int(unsafe.Sizeof(p.Peer)),
// copy transfers min(len(dst), len(b)) bytes.
// NOTE(review): b is sized by Sizeof(p.ptr) while the destination is sized by Sizeof(p.Peer) —
// confirm the two are meant to match.
61 copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
66 RequestIndex = request_strategy.RequestIndex
67 chunkIndexType = request_strategy.ChunkIndex
// peerRequests is a collection of candidate request indexes. Its Len, Less, Swap, Push and Pop
// methods let it be driven by container/heap (see heap.Init / heap.Pop in
// getDesiredRequestState).
70 type peerRequests struct {
// requestIndexes is the backing slice that the heap methods reorder, append to and truncate.
71 requestIndexes []RequestIndex
// Len returns the number of request indexes currently held.
75 func (p *peerRequests) Len() int {
76 return len(p.requestIndexes)
// Less orders the requests at positions i and j. The comparison is a chain of criteria
// accumulated in ml; in the order they appear in the visible lines:
//
//  1. only while the peer is choking us: whether the request's piece is absent from the peer's
//     allowed-fast set;
//  2. the value of pending(...) for each request;
//  3. whether the request is not currently in the peer's actual request state;
//  4. the negated pure priority of the request's piece;
//  5. the availability of the request's piece;
//  6. the piece index;
//  7. the request index itself.
//
// NOTE(review): the declarations of t and ml, several call heads, and the final return are not
// visible in this excerpt — confirm how each criterion is compared and which side sorts first.
79 func (p *peerRequests) Less(i, j int) bool {
80 leftRequest := p.requestIndexes[i]
81 rightRequest := p.requestIndexes[j]
// A request index divided by the number of chunks in a regular piece gives its piece index.
83 leftPieceIndex := leftRequest / t.chunksPerRegularPiece()
84 rightPieceIndex := rightRequest / t.chunksPerRegularPiece()
// "Current" means the request is already part of this peer's actual request state.
85 leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
86 rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
// pending starts from the torrent-wide pending count for a request index.
// NOTE(review): the rest of this closure (how current adjusts the count) is not visible here.
87 pending := func(index RequestIndex, current bool) int {
88 ret := t.pendingRequests.Get(index)
92 // See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be
100 // Push requests that can't be served right now to the end. But we don't throw them away unless
101 // there's a better alternative. This is for when we're using the fast extension and get choked
102 // but our requests could still be good when we get unchoked.
103 if p.peer.peerChoking {
105 !p.peer.peerAllowedFast.Contains(leftPieceIndex),
106 !p.peer.peerAllowedFast.Contains(rightPieceIndex),
110 pending(leftRequest, leftCurrent),
111 pending(rightRequest, rightCurrent))
112 ml = ml.Bool(!leftCurrent, !rightCurrent)
113 leftPiece := t.piece(int(leftPieceIndex))
114 rightPiece := t.piece(int(rightPieceIndex))
// Priorities are negated before being compared.
116 -int(leftPiece.purePriority()),
117 -int(rightPiece.purePriority()),
120 int(leftPiece.availability),
121 int(rightPiece.availability))
// Piece index, then request index, are the last criteria in the chain.
122 ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
123 ml = ml.Uint32(leftRequest, rightRequest)
// Swap exchanges the request indexes at positions i and j.
127 func (p *peerRequests) Swap(i, j int) {
128 p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
// Push appends x to the end of requestIndexes. x must hold a RequestIndex; any other dynamic
// type makes the type assertion panic.
131 func (p *peerRequests) Push(x interface{}) {
132 p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
// Pop takes the last element off requestIndexes, shrinking the slice by one.
// NOTE(review): the return statement is not visible in this excerpt — presumably x is returned.
135 func (p *peerRequests) Pop() interface{} {
136 last := len(p.requestIndexes) - 1
137 x := p.requestIndexes[last]
// Reslice rather than reallocate; the backing array is retained.
138 p.requestIndexes = p.requestIndexes[:last]
// desiredRequestState is the request state computed by getDesiredRequestState and handed to
// applyRequestState.
142 type desiredRequestState struct {
// Requests lists the request indexes we want outstanding, in the order they were popped from
// the request heap.
143 Requests []RequestIndex
// getDesiredRequestState computes which requests this peer should have outstanding. It walks the
// requestable pieces reported by request_strategy.GetRequestablePieces, collects candidate chunk
// requests into a heap ordered by peerRequests.Less, and then pops requests off the heap until
// the heap is empty or p.nominalMaxRequests() requests have been collected. desired.Interested is
// set as soon as a candidate chunk is seen.
//
// NOTE(review): several lines of this function (early returns inside the callbacks, the heap's
// remaining fields, the final return) are not visible in this excerpt.
147 func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
148 input := p.t.getRequestStrategyInput()
149 requestHeap := peerRequests{
152 request_strategy.GetRequestablePieces(
// The piece request order is looked up by the torrent's storage capacity key.
154 p.t.cl.pieceRequestOrder[p.t.storage.Capacity],
155 func(ih InfoHash, pieceIndex int) {
// The callback also receives pieces of other torrents; compare against our own info hash.
156 if ih != p.t.infoHash {
// Check whether the peer has this piece.
159 if !p.peerHasPiece(pieceIndex) {
162 allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
// Visit every chunk of the piece yielded by undirtiedChunksIter.
163 p.t.piece(pieceIndex).undirtiedChunksIter.Iter(func(ci request_strategy.ChunkIndex) {
// Convert the piece-relative chunk index to a torrent-wide request index.
164 r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
165 // if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
169 // We must signal interest to request this
170 desired.Interested = true
171 // We can make or will allow sustaining a request here if we're not choked, or
172 // have made the request previously (presumably while unchoked), and haven't had
173 // the peer respond yet (and the request was retained because we are using the
175 if p.peerChoking && !p.actualRequestState.Requests.Contains(r) {
176 // We can't request this right now.
180 requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
184 p.t.assertPendingRequests()
// Heapify the candidates, then take the best ones up to the nominal request limit.
185 heap.Init(&requestHeap)
186 for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() {
187 requestIndex := heap.Pop(&requestHeap).(RequestIndex)
188 desired.Requests = append(desired.Requests, requestIndex)
// maybeUpdateActualRequestState recomputes and applies the peer's request state when an update
// has been flagged through p.needRequestUpdate. The work runs under a pprof label whose key is
// "update request" and whose value is the pending update reason, so it can be attributed in
// profiles.
//
// NOTE(review): the handling of the empty-reason case and the final return are not visible in
// this excerpt; the result presumably reflects the value produced by applyRequestState.
193 func (p *Peer) maybeUpdateActualRequestState() bool {
// needRequestUpdate holds the reason string; it is compared against "" here.
194 if p.needRequestUpdate == "" {
199 context.Background(),
200 pprof.Labels("update request", p.needRequestUpdate),
201 func(_ context.Context) {
// Compute the desired state, then apply it to the peer.
202 next := p.getDesiredRequestState()
203 more = p.applyRequestState(next)
209 // Transmit/action the request state to the peer.
//
// The visible steps are: update our interest flag from next.Interested; work out which current
// requests are to be cancelled, starting from a clone of the current request set; walk
// next.Requests issuing each one via mustRequest while skipping requests that are in
// cancelledRequests and stopping at the nominal request limit; finally stop the update-requests
// timer, clear needRequestUpdate, and re-arm the timer for 3 seconds if any requests remain
// outstanding.
//
// NOTE(review): a number of lines (early returns, loop exits, the declarations of more, shuffled
// and lastPending, and the final return) are not visible in this excerpt.
210 func (p *Peer) applyRequestState(next desiredRequestState) bool {
211 current := &p.actualRequestState
212 if !p.setInterested(next.Interested) {
// Start from everything currently requested; the loops below work on this clone.
216 cancel := current.Requests.Clone()
217 for _, ri := range next.Requests {
220 cancel.Iterate(func(req uint32) bool {
// An index loop is used, not range, because the tail of next.Requests may be shuffled in place
// further down.
229 for i := 0; i < len(next.Requests); i++ {
230 req := next.Requests[i]
231 if p.cancelledRequests.Contains(req) {
232 // Waiting for a reject or piece message, which will suitably trigger us to update our
233 // requests, so we can skip this one with no additional consideration.
236 // The cardinality of our desired requests shouldn't exceed the max requests since it's used
237 // in the calculation of the requests. However, if we cancelled requests and they haven't
238 // been rejected or serviced yet with the fast extension enabled, we can end up with more
239 // extra outstanding requests. We could subtract the number of outstanding cancels from the
240 // next request cardinality, but peers might not like that.
241 if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
242 // log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
243 // next.Requests.GetCardinality(),
244 // p.cancelledRequests.GetCardinality(),
245 // current.Requests.GetCardinality(),
246 // p.nominalMaxRequests(),
// NOTE(review): the two lines below read next.Requests[0], not req (next.Requests[i]), so the
// same element is inspected on every iteration — confirm whether req was intended.
250 otherPending := p.t.pendingRequests.Get(next.Requests[0])
251 if p.actualRequestState.Requests.Contains(next.Requests[0]) {
254 if otherPending < lastPending {
255 // Pending should only rise. It's supposed to be the strongest ordering criteria. If it
256 // doesn't, our shuffling condition could be wrong.
259 // If the request has already been requested by another peer, shuffle this and the rest of
260 // the requests (since according to the increasing condition, the rest of the indices
261 // already have an outstanding request with another peer).
262 if !shuffled && otherPending > 0 {
// shuffleReqs aliases the tail of next.Requests, so the shuffle reorders next.Requests in place.
263 shuffleReqs := next.Requests[i:]
264 rand.Shuffle(len(shuffleReqs), func(i, j int) {
265 shuffleReqs[i], shuffleReqs[j] = shuffleReqs[j], shuffleReqs[i]
267 // log.Printf("shuffled reqs [%v:%v]", i, len(next.Requests))
274 more = p.mustRequest(req)
// Stop the timer and clear the pending update reason; the timer is re-armed below only if
// requests are still outstanding.
279 p.updateRequestsTimer.Stop()
281 p.needRequestUpdate = ""
282 if !current.Requests.IsEmpty() {
283 p.updateRequestsTimer.Reset(3 * time.Second)