13 "github.com/anacrolix/log"
14 "github.com/anacrolix/multiless"
15 "github.com/anacrolix/torrent/metainfo"
17 request_strategy "github.com/anacrolix/torrent/request-strategy"
20 // Returns what is necessary to run request_strategy.GetRequestablePieces for primaryTorrent.
21 func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input request_strategy.Input) {
22 input.MaxUnverifiedBytes = cl.config.MaxUnverifiedBytes
23 if !primaryTorrent.haveInfo() {
26 if capFunc := primaryTorrent.storage.Capacity; capFunc != nil {
27 if cap, ok := (*capFunc)(); ok {
31 input.Torrents = make(map[metainfo.Hash]request_strategy.Torrent, len(cl.torrents))
32 for _, t := range cl.torrents {
34 // This would be removed if metadata is handled here. Determining chunks per piece
35 // requires the info. If we have no info, we have no pieces too, so the end result is
39 if t.storage.Capacity != primaryTorrent.storage.Capacity {
42 input.Torrents[t.infoHash] = t.requestStrategyTorrentInput()
47 func (t *Torrent) getRequestStrategyInput() request_strategy.Input {
48 return t.cl.getRequestStrategyInput(t)
51 func (t *Torrent) requestStrategyTorrentInput() request_strategy.Torrent {
52 rst := request_strategy.Torrent{
54 ChunksPerPiece: t.chunksPerRegularPiece(),
56 rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
57 for i := range t.pieces {
58 rst.Pieces = append(rst.Pieces, t.makeRequestStrategyPiece(i))
63 func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRequestOrderState {
64 return request_strategy.PieceRequestOrderState{
65 Priority: t.piece(i).purePriority(),
66 Partial: t.piecePartiallyDownloaded(i),
67 Availability: t.piece(i).availability,
71 func (t *Torrent) makeRequestStrategyPiece(i int) request_strategy.Piece {
73 return request_strategy.Piece{
74 Request: !t.ignorePieceForRequests(i),
75 Priority: p.purePriority(),
76 Partial: t.piecePartiallyDownloaded(i),
77 Availability: p.availability,
78 Length: int64(p.length()),
79 NumPendingChunks: int(t.pieceNumPendingChunks(i)),
80 IterPendingChunks: &p.undirtiedChunksIter,
85 gob.Register(peerId{})
93 func (p peerId) Uintptr() uintptr {
97 func (p peerId) GobEncode() (b []byte, _ error) {
98 *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
99 Data: uintptr(unsafe.Pointer(&p.ptr)),
100 Len: int(unsafe.Sizeof(p.ptr)),
101 Cap: int(unsafe.Sizeof(p.ptr)),
106 func (p *peerId) GobDecode(b []byte) error {
107 if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
110 ptr := unsafe.Pointer(&b[0])
111 p.ptr = *(*uintptr)(ptr)
112 log.Printf("%p", ptr)
113 dst := reflect.SliceHeader{
114 Data: uintptr(unsafe.Pointer(&p.Peer)),
115 Len: int(unsafe.Sizeof(p.Peer)),
116 Cap: int(unsafe.Sizeof(p.Peer)),
118 copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
123 RequestIndex = request_strategy.RequestIndex
124 chunkIndexType = request_strategy.ChunkIndex
127 type peerRequests struct {
128 requestIndexes []RequestIndex
130 torrentStrategyInput request_strategy.Torrent
133 func (p *peerRequests) Len() int {
134 return len(p.requestIndexes)
137 func (p *peerRequests) Less(i, j int) bool {
138 leftRequest := p.requestIndexes[i]
139 rightRequest := p.requestIndexes[j]
141 leftPieceIndex := leftRequest / p.torrentStrategyInput.ChunksPerPiece
142 rightPieceIndex := rightRequest / p.torrentStrategyInput.ChunksPerPiece
143 leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
144 rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
145 pending := func(index RequestIndex, current bool) int {
146 ret := t.pendingRequests.Get(index)
150 // See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be
157 ml := multiless.New()
158 // Push requests that can't be served right now to the end. But we don't throw them away unless
159 // there's a better alternative. This is for when we're using the fast extension and get choked
160 // but our requests could still be good when we get unchoked.
161 if p.peer.peerChoking {
163 !p.peer.peerAllowedFast.Contains(leftPieceIndex),
164 !p.peer.peerAllowedFast.Contains(rightPieceIndex),
168 pending(leftRequest, leftCurrent),
169 pending(rightRequest, rightCurrent))
170 ml = ml.Bool(!leftCurrent, !rightCurrent)
172 -int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
173 -int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority),
176 int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
177 int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
178 ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
179 ml = ml.Uint32(leftRequest, rightRequest)
183 func (p *peerRequests) Swap(i, j int) {
184 p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
187 func (p *peerRequests) Push(x interface{}) {
188 p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
191 func (p *peerRequests) Pop() interface{} {
192 last := len(p.requestIndexes) - 1
193 x := p.requestIndexes[last]
194 p.requestIndexes = p.requestIndexes[:last]
198 type desiredRequestState struct {
199 Requests []RequestIndex
203 func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
204 input := p.t.getRequestStrategyInput()
205 requestHeap := peerRequests{
208 requestHeap.torrentStrategyInput = input.Torrents[p.t.infoHash]
209 request_strategy.GetRequestablePieces(
211 p.t.cl.pieceRequestOrder[p.t.storage.Capacity],
212 func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
213 if t.InfoHash != p.t.infoHash {
216 if !p.peerHasPiece(pieceIndex) {
219 allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
220 rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
221 r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
222 // if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
226 // We must signal interest to request this
227 desired.Interested = true
228 // We can make or will allow sustaining a request here if we're not choked, or
229 // have made the request previously (presumably while unchoked), and haven't had
230 // the peer respond yet (and the request was retained because we are using the
232 if p.peerChoking && !p.actualRequestState.Requests.Contains(r) {
233 // We can't request this right now.
237 requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
241 p.t.assertPendingRequests()
242 heap.Init(&requestHeap)
243 for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() {
244 requestIndex := heap.Pop(&requestHeap).(RequestIndex)
245 desired.Requests = append(desired.Requests, requestIndex)
250 func (p *Peer) maybeUpdateActualRequestState() bool {
251 if p.needRequestUpdate == "" {
256 context.Background(),
257 pprof.Labels("update request", p.needRequestUpdate),
258 func(_ context.Context) {
259 next := p.getDesiredRequestState()
260 more = p.applyRequestState(next)
266 // Transmit/action the request state to the peer.
267 func (p *Peer) applyRequestState(next desiredRequestState) bool {
268 current := &p.actualRequestState
269 if !p.setInterested(next.Interested) {
273 cancel := current.Requests.Clone()
274 for _, ri := range next.Requests {
277 cancel.Iterate(func(req uint32) bool {
286 for i := 0; i < len(next.Requests); i++ {
287 req := next.Requests[i]
288 if p.cancelledRequests.Contains(req) {
289 // Waiting for a reject or piece message, which will suitably trigger us to update our
290 // requests, so we can skip this one with no additional consideration.
293 // The cardinality of our desired requests shouldn't exceed the max requests since it's used
294 // in the calculation of the requests. However, if we cancelled requests and they haven't
295 // been rejected or serviced yet with the fast extension enabled, we can end up with more
296 // extra outstanding requests. We could subtract the number of outstanding cancels from the
297 // next request cardinality, but peers might not like that.
298 if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
299 // log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
300 // next.Requests.GetCardinality(),
301 // p.cancelledRequests.GetCardinality(),
302 // current.Requests.GetCardinality(),
303 // p.nominalMaxRequests(),
307 otherPending := p.t.pendingRequests.Get(next.Requests[0])
308 if p.actualRequestState.Requests.Contains(next.Requests[0]) {
311 if otherPending < lastPending {
312 // Pending should only rise. It's supposed to be the strongest ordering criteria. If it
313 // doesn't, our shuffling condition could be wrong.
316 // If the request has already been requested by another peer, shuffle this and the rest of
317 // the requests (since according to the increasing condition, the rest of the indices
318 // already have an outstanding request with another peer).
319 if !shuffled && otherPending > 0 {
320 shuffleReqs := next.Requests[i:]
321 rand.Shuffle(len(shuffleReqs), func(i, j int) {
322 shuffleReqs[i], shuffleReqs[j] = shuffleReqs[j], shuffleReqs[i]
324 // log.Printf("shuffled reqs [%v:%v]", i, len(next.Requests))
331 more = p.mustRequest(req)
336 p.updateRequestsTimer.Stop()
338 p.needRequestUpdate = ""
339 if !current.Requests.IsEmpty() {
340 p.updateRequestsTimer.Reset(3 * time.Second)