12 "github.com/anacrolix/log"
13 "github.com/anacrolix/multiless"
15 request_strategy "github.com/anacrolix/torrent/request-strategy"
18 func (cl *Client) tickleRequester() {
19 cl.updateRequests.Broadcast()
22 func (cl *Client) getRequestStrategyInput() request_strategy.Input {
23 ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
24 for _, t := range cl.torrents {
26 // This would be removed if metadata is handled here. We have to guard against not
27 // knowing the piece size. If we have no info, we have no pieces too, so the end result
31 rst := request_strategy.Torrent{
33 ChunksPerPiece: t.chunksPerRegularPiece(),
36 rst.Capacity = t.storage.Capacity
38 rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
39 for i := range t.pieces {
41 rst.Pieces = append(rst.Pieces, request_strategy.Piece{
42 Request: !t.ignorePieceForRequests(i),
43 Priority: p.purePriority(),
44 Partial: t.piecePartiallyDownloaded(i),
45 Availability: p.availability,
46 Length: int64(p.length()),
47 NumPendingChunks: int(t.pieceNumPendingChunks(i)),
48 IterPendingChunks: &p.undirtiedChunksIter,
51 t.iterPeers(func(p *Peer) {
55 if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates {
56 p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate
58 p.piecesReceivedSinceLastRequestUpdate = 0
59 rst.Peers = append(rst.Peers, request_strategy.Peer{
60 Pieces: *p.newPeerPieces(),
61 MaxRequests: p.nominalMaxRequests(),
62 ExistingRequests: p.actualRequestState.Requests,
63 Choking: p.peerChoking,
64 PieceAllowedFast: p.peerAllowedFast,
65 DownloadRate: p.downloadRate(),
66 Age: time.Since(p.completedHandshake),
69 ptr: uintptr(unsafe.Pointer(p)),
75 return request_strategy.Input{
77 MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
82 gob.Register(peerId{})
90 func (p peerId) Uintptr() uintptr {
94 func (p peerId) GobEncode() (b []byte, _ error) {
95 *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
96 Data: uintptr(unsafe.Pointer(&p.ptr)),
97 Len: int(unsafe.Sizeof(p.ptr)),
98 Cap: int(unsafe.Sizeof(p.ptr)),
103 func (p *peerId) GobDecode(b []byte) error {
104 if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
107 ptr := unsafe.Pointer(&b[0])
108 p.ptr = *(*uintptr)(ptr)
109 log.Printf("%p", ptr)
110 dst := reflect.SliceHeader{
111 Data: uintptr(unsafe.Pointer(&p.Peer)),
112 Len: int(unsafe.Sizeof(p.Peer)),
113 Cap: int(unsafe.Sizeof(p.Peer)),
115 copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
119 type RequestIndex = request_strategy.RequestIndex
120 type chunkIndexType = request_strategy.ChunkIndex
122 type peerRequests struct {
123 requestIndexes []RequestIndex
125 torrentStrategyInput request_strategy.Torrent
128 func (p *peerRequests) Len() int {
129 return len(p.requestIndexes)
132 func (p *peerRequests) Less(i, j int) bool {
133 leftRequest := p.requestIndexes[i]
134 rightRequest := p.requestIndexes[j]
136 leftPieceIndex := leftRequest / p.torrentStrategyInput.ChunksPerPiece
137 rightPieceIndex := rightRequest / p.torrentStrategyInput.ChunksPerPiece
138 leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
139 rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
140 pending := func(index RequestIndex, current bool) int {
141 ret := t.pendingRequests.Get(index)
145 // See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be
152 ml := multiless.New()
153 // Push requests that can't be served right now to the end. But we don't throw them away unless
154 // there's a better alternative. This is for when we're using the fast extension and get choked
155 // but our requests could still be good when we get unchoked.
156 if p.peer.peerChoking {
158 !p.peer.peerAllowedFast.Contains(leftPieceIndex),
159 !p.peer.peerAllowedFast.Contains(rightPieceIndex),
163 pending(leftRequest, leftCurrent),
164 pending(rightRequest, rightCurrent))
165 ml = ml.Bool(!leftCurrent, !rightCurrent)
167 -int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
168 -int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority),
171 int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
172 int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
173 ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
174 ml = ml.Uint32(leftRequest, rightRequest)
178 func (p *peerRequests) Swap(i, j int) {
179 p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
182 func (p *peerRequests) Push(x interface{}) {
183 p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
186 func (p *peerRequests) Pop() interface{} {
187 last := len(p.requestIndexes) - 1
188 x := p.requestIndexes[last]
189 p.requestIndexes = p.requestIndexes[:last]
193 type desiredRequestState struct {
194 Requests []RequestIndex
198 func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
199 input := p.t.cl.getRequestStrategyInput()
200 requestHeap := peerRequests{
203 for _, t := range input.Torrents {
204 if t.InfoHash == p.t.infoHash {
205 requestHeap.torrentStrategyInput = t
209 request_strategy.GetRequestablePieces(
211 func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
212 if t.InfoHash != p.t.infoHash {
215 if !p.peerHasPiece(pieceIndex) {
218 allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
219 rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
220 r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
221 //if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
225 // We must signal interest to request this
226 desired.Interested = true
227 // We can make or will allow sustaining a request here if we're not choked, or
228 // have made the request previously (presumably while unchoked), and haven't had
229 // the peer respond yet (and the request was retained because we are using the
231 if p.peerChoking && !p.actualRequestState.Requests.Contains(r) {
232 // We can't request this right now.
236 requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
240 p.t.assertPendingRequests()
241 heap.Init(&requestHeap)
242 for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() {
243 requestIndex := heap.Pop(&requestHeap).(RequestIndex)
244 desired.Requests = append(desired.Requests, requestIndex)
249 func (p *Peer) maybeUpdateActualRequestState() bool {
250 if p.needRequestUpdate == "" {
255 context.Background(),
256 pprof.Labels("update request", p.needRequestUpdate),
257 func(_ context.Context) {
258 next := p.getDesiredRequestState()
259 more = p.applyRequestState(next)
265 // Transmit/action the request state to the peer.
266 func (p *Peer) applyRequestState(next desiredRequestState) bool {
267 current := &p.actualRequestState
268 if !p.setInterested(next.Interested) {
272 cancel := current.Requests.Clone()
273 for _, ri := range next.Requests {
276 cancel.Iterate(func(req uint32) bool {
283 for _, req := range next.Requests {
284 if p.cancelledRequests.Contains(req) {
285 // Waiting for a reject or piece message, which will suitably trigger us to update our
286 // requests, so we can skip this one with no additional consideration.
289 // The cardinality of our desired requests shouldn't exceed the max requests since it's used
290 // in the calculation of the requests. However, if we cancelled requests and they haven't
291 // been rejected or serviced yet with the fast extension enabled, we can end up with more
292 // extra outstanding requests. We could subtract the number of outstanding cancels from the
293 // next request cardinality, but peers might not like that.
294 if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
295 //log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
296 // next.Requests.GetCardinality(),
297 // p.cancelledRequests.GetCardinality(),
298 // current.Requests.GetCardinality(),
299 // p.nominalMaxRequests(),
303 more = p.mustRequest(req)
308 p.updateRequestsTimer.Stop()
310 p.needRequestUpdate = ""
311 if !current.Requests.IsEmpty() {
312 p.updateRequestsTimer.Reset(3 * time.Second)