13 "github.com/anacrolix/log"
14 "github.com/anacrolix/multiless"
16 request_strategy "github.com/anacrolix/torrent/request-strategy"
19 func (cl *Client) getRequestStrategyInput() request_strategy.Input {
20 ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
21 for _, t := range cl.torrents {
23 // This would be removed if metadata is handled here. We have to guard against not
24 // knowing the piece size. If we have no info, we have no pieces too, so the end result
28 rst := request_strategy.Torrent{
30 ChunksPerPiece: t.chunksPerRegularPiece(),
33 rst.Capacity = t.storage.Capacity
35 rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
36 for i := range t.pieces {
38 rst.Pieces = append(rst.Pieces, request_strategy.Piece{
39 Request: !t.ignorePieceForRequests(i),
40 Priority: p.purePriority(),
41 Partial: t.piecePartiallyDownloaded(i),
42 Availability: p.availability,
43 Length: int64(p.length()),
44 NumPendingChunks: int(t.pieceNumPendingChunks(i)),
45 IterPendingChunks: &p.undirtiedChunksIter,
50 return request_strategy.Input{
52 MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
57 gob.Register(peerId{})
65 func (p peerId) Uintptr() uintptr {
69 func (p peerId) GobEncode() (b []byte, _ error) {
70 *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
71 Data: uintptr(unsafe.Pointer(&p.ptr)),
72 Len: int(unsafe.Sizeof(p.ptr)),
73 Cap: int(unsafe.Sizeof(p.ptr)),
78 func (p *peerId) GobDecode(b []byte) error {
79 if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
82 ptr := unsafe.Pointer(&b[0])
83 p.ptr = *(*uintptr)(ptr)
85 dst := reflect.SliceHeader{
86 Data: uintptr(unsafe.Pointer(&p.Peer)),
87 Len: int(unsafe.Sizeof(p.Peer)),
88 Cap: int(unsafe.Sizeof(p.Peer)),
90 copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
95 RequestIndex = request_strategy.RequestIndex
96 chunkIndexType = request_strategy.ChunkIndex
99 type peerRequests struct {
100 requestIndexes []RequestIndex
102 torrentStrategyInput *request_strategy.Torrent
105 func (p *peerRequests) Len() int {
106 return len(p.requestIndexes)
109 func (p *peerRequests) Less(i, j int) bool {
110 leftRequest := p.requestIndexes[i]
111 rightRequest := p.requestIndexes[j]
113 leftPieceIndex := leftRequest / p.torrentStrategyInput.ChunksPerPiece
114 rightPieceIndex := rightRequest / p.torrentStrategyInput.ChunksPerPiece
115 leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
116 rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
117 pending := func(index RequestIndex, current bool) int {
118 ret := t.pendingRequests.Get(index)
122 // See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be
129 ml := multiless.New()
130 // Push requests that can't be served right now to the end. But we don't throw them away unless
131 // there's a better alternative. This is for when we're using the fast extension and get choked
132 // but our requests could still be good when we get unchoked.
133 if p.peer.peerChoking {
135 !p.peer.peerAllowedFast.Contains(leftPieceIndex),
136 !p.peer.peerAllowedFast.Contains(rightPieceIndex),
140 pending(leftRequest, leftCurrent),
141 pending(rightRequest, rightCurrent))
142 ml = ml.Bool(!leftCurrent, !rightCurrent)
144 -int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
145 -int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority),
148 int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
149 int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
150 ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
151 ml = ml.Uint32(leftRequest, rightRequest)
155 func (p *peerRequests) Swap(i, j int) {
156 p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
159 func (p *peerRequests) Push(x interface{}) {
160 p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
163 func (p *peerRequests) Pop() interface{} {
164 last := len(p.requestIndexes) - 1
165 x := p.requestIndexes[last]
166 p.requestIndexes = p.requestIndexes[:last]
170 type desiredRequestState struct {
171 Requests []RequestIndex
175 func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
176 input := p.t.cl.getRequestStrategyInput()
177 requestHeap := peerRequests{
180 for i := range input.Torrents {
181 t := &input.Torrents[i]
182 if t.InfoHash == p.t.infoHash {
183 requestHeap.torrentStrategyInput = t
187 request_strategy.GetRequestablePieces(
189 func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
190 if t.InfoHash != p.t.infoHash {
193 if !p.peerHasPiece(pieceIndex) {
196 allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
197 rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
198 r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
199 // if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
203 // We must signal interest to request this
204 desired.Interested = true
205 // We can make or will allow sustaining a request here if we're not choked, or
206 // have made the request previously (presumably while unchoked), and haven't had
207 // the peer respond yet (and the request was retained because we are using the
209 if p.peerChoking && !p.actualRequestState.Requests.Contains(r) {
210 // We can't request this right now.
214 requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
218 p.t.assertPendingRequests()
219 heap.Init(&requestHeap)
220 for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() {
221 requestIndex := heap.Pop(&requestHeap).(RequestIndex)
222 desired.Requests = append(desired.Requests, requestIndex)
227 func (p *Peer) maybeUpdateActualRequestState() bool {
228 if p.needRequestUpdate == "" {
233 context.Background(),
234 pprof.Labels("update request", p.needRequestUpdate),
235 func(_ context.Context) {
236 next := p.getDesiredRequestState()
237 more = p.applyRequestState(next)
243 // Transmit/action the request state to the peer.
244 func (p *Peer) applyRequestState(next desiredRequestState) bool {
245 current := &p.actualRequestState
246 if !p.setInterested(next.Interested) {
250 cancel := current.Requests.Clone()
251 for _, ri := range next.Requests {
254 cancel.Iterate(func(req uint32) bool {
263 for i := 0; i < len(next.Requests); i++ {
264 req := next.Requests[i]
265 if p.cancelledRequests.Contains(req) {
266 // Waiting for a reject or piece message, which will suitably trigger us to update our
267 // requests, so we can skip this one with no additional consideration.
270 // The cardinality of our desired requests shouldn't exceed the max requests since it's used
271 // in the calculation of the requests. However, if we cancelled requests and they haven't
272 // been rejected or serviced yet with the fast extension enabled, we can end up with more
273 // extra outstanding requests. We could subtract the number of outstanding cancels from the
274 // next request cardinality, but peers might not like that.
275 if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
276 // log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
277 // next.Requests.GetCardinality(),
278 // p.cancelledRequests.GetCardinality(),
279 // current.Requests.GetCardinality(),
280 // p.nominalMaxRequests(),
284 otherPending := p.t.pendingRequests.Get(next.Requests[0])
285 if p.actualRequestState.Requests.Contains(next.Requests[0]) {
288 if otherPending < lastPending {
289 // Pending should only rise. It's supposed to be the strongest ordering criteria. If it
290 // doesn't, our shuffling condition could be wrong.
293 // If the request has already been requested by another peer, shuffle this and the rest of
294 // the requests (since according to the increasing condition, the rest of the indices
295 // already have an outstanding request with another peer).
296 if !shuffled && otherPending > 0 {
297 shuffleReqs := next.Requests[i:]
298 rand.Shuffle(len(shuffleReqs), func(i, j int) {
299 shuffleReqs[i], shuffleReqs[j] = shuffleReqs[j], shuffleReqs[i]
301 // log.Printf("shuffled reqs [%v:%v]", i, len(next.Requests))
308 more = p.mustRequest(req)
313 p.updateRequestsTimer.Stop()
315 p.needRequestUpdate = ""
316 if !current.Requests.IsEmpty() {
317 p.updateRequestsTimer.Reset(3 * time.Second)