12 "github.com/RoaringBitmap/roaring"
13 "github.com/anacrolix/log"
14 "github.com/anacrolix/multiless"
16 request_strategy "github.com/anacrolix/torrent/request-strategy"
19 func (cl *Client) tickleRequester() {
20 cl.updateRequests.Broadcast()
// getRequestStrategyInput snapshots the client's torrents into a
// request_strategy.Input.
//
// For the torrents of cl.torrents it records ChunksPerPiece and the storage
// Capacity, builds one request_strategy.Piece per entry of t.pieces, and one
// request_strategy.Peer per peer visited by t.iterPeers. The result also
// carries cl.config.MaxUnverifiedBytes.
//
// Side effect: while visiting peers it raises each peer's
// maxPiecesReceivedBetweenRequestUpdates to the current
// piecesReceivedSinceLastRequestUpdate when that is larger, and then resets
// the latter to zero. Calling this is therefore not a pure read.
23 func (cl *Client) getRequestStrategyInput() request_strategy.Input {
// Pre-size for one strategy torrent per client torrent.
24 ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
25 for _, t := range cl.torrents {
27 // This would be removed if metadata is handled here. We have to guard against not
28 // knowing the piece size. If we have no info, we have no pieces too, so the end result
32 rst := request_strategy.Torrent{
34 ChunksPerPiece: t.chunksPerRegularPiece(),
// Carry the torrent storage's Capacity over to the strategy input.
37 rst.Capacity = t.storage.Capacity
// Pre-size for one strategy piece per torrent piece.
39 rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
40 for i := range t.pieces {
42 rst.Pieces = append(rst.Pieces, request_strategy.Piece{
// Request is the negation of t.ignorePieceForRequests for this piece.
43 Request: !t.ignorePieceForRequests(i),
44 Priority: p.purePriority(),
45 Partial: t.piecePartiallyDownloaded(i),
46 Availability: p.availability,
47 Length: int64(p.length()),
48 NumPendingChunks: int(t.pieceNumPendingChunks(i)),
49 IterPendingChunks: p.undirtiedChunksIter(),
52 t.iterPeers(func(p *Peer) {
// Keep the high-water mark of pieces received between request updates, then
// restart the count for the next interval.
56 if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates {
57 p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate
59 p.piecesReceivedSinceLastRequestUpdate = 0
60 rst.Peers = append(rst.Peers, request_strategy.Peer{
61 Pieces: *p.newPeerPieces(),
62 MaxRequests: p.nominalMaxRequests(),
63 ExistingRequests: p.actualRequestState.Requests,
64 Choking: p.peerChoking,
65 PieceAllowedFast: p.peerAllowedFast,
66 DownloadRate: p.downloadRate(),
// Age is the time elapsed since the handshake with this peer completed.
67 Age: time.Since(p.completedHandshake),
// The peer's address as an integer; presumably the ptr field of a peerId — confirm.
70 ptr: uintptr(unsafe.Pointer(p)),
76 return request_strategy.Input{
78 MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
// Register peerId with encoding/gob so that peerId values can be encoded and
// decoded when they are carried inside interface-typed values.
83 gob.Register(peerId{})
// Uintptr returns the peer ID as a uintptr.
// NOTE(review): presumably this is the ptr field — confirm against the body.
91 func (p peerId) Uintptr() uintptr {
95 func (p peerId) GobEncode() (b []byte, _ error) {
96 *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
97 Data: uintptr(unsafe.Pointer(&p.ptr)),
98 Len: int(unsafe.Sizeof(p.ptr)),
99 Cap: int(unsafe.Sizeof(p.ptr)),
// GobDecode restores a peerId from the bytes produced by GobEncode.
//
// b is checked against the size of a uintptr. Its bytes are then read into
// p.ptr and also copied over the storage of the p.Peer field, so the Peer
// pointer is rebuilt from the raw address value.
// NOTE(review): presumably this is only meaningful when decoding in the same
// process that encoded the value — confirm.
104 func (p *peerId) GobDecode(b []byte) error {
// Guard: the payload must be exactly one uintptr wide.
105 if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
// Reinterpret the start of the payload as a uintptr.
108 ptr := unsafe.Pointer(&b[0])
109 p.ptr = *(*uintptr)(ptr)
// NOTE(review): this prints the address of the input buffer on every decode
// and looks like leftover debugging output.
110 log.Printf("%p", ptr)
// Build a byte-slice view over the p.Peer field so the payload can be copied
// into it.
// NOTE(review): the reflect documentation says SliceHeader cannot be used
// safely or portably; a direct write through unsafe.Pointer would avoid
// declaring one.
111 dst := reflect.SliceHeader{
112 Data: uintptr(unsafe.Pointer(&p.Peer)),
113 Len: int(unsafe.Sizeof(p.Peer)),
114 Cap: int(unsafe.Sizeof(p.Peer)),
116 copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
120 type RequestIndex = request_strategy.RequestIndex
121 type chunkIndexType = request_strategy.ChunkIndex
// peerRequests holds the candidate requests for one peer. Its methods (Len,
// Less, Swap, Push, Pop) let it be used with container/heap.
123 type peerRequests struct {
// Candidate request indexes, ordered through the heap methods.
124 requestIndexes []RequestIndex
// Strategy snapshot of a torrent; Less reads ChunksPerPiece and the per-piece
// Priority and Availability from it.
126 torrentStrategyInput request_strategy.Torrent
129 func (p *peerRequests) Len() int {
130 return len(p.requestIndexes)
// Less compares the requests held at positions i and j and reports whether
// the one at i orders before the one at j.
//
// The comparison is a multiless chain. Judging by the visible operands, the
// keys are, in order: whether the request's piece is outside the peer's
// allowed-fast set (only while the peer is choking us), the pending(...)
// value of each request, whether the request is not already outstanding on
// this peer, the negated piece priority, the piece availability, the piece
// index, and finally the request index.
133 func (p *peerRequests) Less(i, j int) bool {
134 leftRequest := p.requestIndexes[i]
135 rightRequest := p.requestIndexes[j]
// Dividing a request index by the chunks-per-piece count yields the index of
// the piece it belongs to.
137 leftPieceIndex := leftRequest / p.torrentStrategyInput.ChunksPerPiece
138 rightPieceIndex := rightRequest / p.torrentStrategyInput.ChunksPerPiece
// "Current" means this peer already has the request outstanding.
139 leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
140 rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
// pending starts from t.pendingRequests.Get(index).
// NOTE(review): how the current flag adjusts that value is not described here — confirm.
141 pending := func(index RequestIndex, current bool) int {
142 ret := t.pendingRequests.Get(index)
146 // I have a hunch that this could trigger for requests for chunks that are choked and not
147 // allowed fast, since the current conn shouldn't already be included. It's a very specific
148 // circumstance, and if it triggers I will fix it.
154 ml := multiless.New()
155 // Push requests that can't be served right now to the end. But we don't throw them away unless
156 // there's a better alternative. This is for when we're using the fast extension and get choked
157 // but our requests could still be good when we get unchoked.
158 if p.peer.peerChoking {
160 !p.peer.peerAllowedFast.Contains(leftPieceIndex),
161 !p.peer.peerAllowedFast.Contains(rightPieceIndex),
165 pending(leftRequest, leftCurrent),
166 pending(rightRequest, rightCurrent))
// Compares on whether each request is NOT already outstanding on this peer,
// presumably so that requests we already hold sort first — confirm.
167 ml = ml.Bool(!leftCurrent, !rightCurrent)
// Priorities are negated before comparison, presumably so that a higher
// priority sorts first — confirm.
169 -int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
170 -int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority),
173 int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
174 int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
// Final tie-breakers: piece index, then the request index itself.
175 ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
176 ml = ml.Uint32(leftRequest, rightRequest)
180 func (p *peerRequests) Swap(i, j int) {
181 p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
184 func (p *peerRequests) Push(x interface{}) {
185 p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
188 func (p *peerRequests) Pop() interface{} {
189 last := len(p.requestIndexes) - 1
190 x := p.requestIndexes[last]
191 p.requestIndexes = p.requestIndexes[:last]
195 func (p *Peer) getDesiredRequestState() (desired requestState) {
196 input := p.t.cl.getRequestStrategyInput()
197 requestHeap := peerRequests{
200 for _, t := range input.Torrents {
201 if t.InfoHash == p.t.infoHash {
202 requestHeap.torrentStrategyInput = t
206 request_strategy.GetRequestablePieces(
208 func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
209 if t.InfoHash != p.t.infoHash {
212 if !p.peerHasPiece(pieceIndex) {
215 allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
216 rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
218 // We must signal interest to request this..
219 desired.Interested = true
220 if p.peerChoking && !p.actualRequestState.Requests.Contains(ci) {
221 // We can't request this right now.
225 requestHeap.requestIndexes = append(
226 requestHeap.requestIndexes,
227 p.t.pieceRequestIndexOffset(pieceIndex)+ci)
231 heap.Init(&requestHeap)
232 for requestHeap.Len() != 0 && desired.Requests.GetCardinality() < uint64(p.nominalMaxRequests()) {
233 requestIndex := heap.Pop(&requestHeap).(RequestIndex)
234 desired.Requests.Add(requestIndex)
// applyNextRequestState computes the peer's desired request state with
// getDesiredRequestState and applies it with applyRequestState.
//
// The work runs in a callback labelled with
// pprof.Labels("update request", p.needRequestUpdate), presumably via
// pprof.Do, so profiles can be split by the value of needRequestUpdate.
239 func (p *Peer) applyNextRequestState() bool {
// An empty needRequestUpdate is handled first; presumably it means no update
// is pending — confirm.
240 if p.needRequestUpdate == "" {
245 context.Background(),
246 pprof.Labels("update request", p.needRequestUpdate),
247 func(_ context.Context) {
248 next := p.getDesiredRequestState()
249 more = p.applyRequestState(next)
// applyRequestState moves the peer from its current request state
// (p.actualRequestState) towards next.
//
// Visible steps: update the interested flag through p.setInterested; compute
// the requests that are outstanding but absent from next with roaring.AndNot
// and iterate over them; then iterate over next.Requests and issue p.request
// for each. Two conditions are logged instead of issuing a request: the
// request index is still in p.cancelledRequests, or the number of current
// requests has reached p.nominalMaxRequests(). p.needRequestUpdate is cleared
// near the end (the condition guarding that is not described here — confirm).
255 func (p *Peer) applyRequestState(next requestState) bool {
256 current := p.actualRequestState
257 if !p.setInterested(next.Interested) {
// NOTE(review): "¤t.Requests" below looks like a mis-encoded
// "&current.Requests" (the "&curren" prefix decoded as an HTML entity);
// restore the original spelling.
261 cancel := roaring.AndNot(¤t.Requests, &next.Requests)
262 cancel.Iterate(func(req uint32) bool {
269 next.Requests.Iterate(func(req uint32) bool {
// A request whose cancellation is still tracked is not re-issued yet.
270 if p.cancelledRequests.Contains(req) {
271 log.Printf("waiting for cancelled request %v", req)
274 if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
// NOTE(review): the value printed as "desired=" is
// current.Requests.GetCardinality(), i.e. the count of current requests, not
// of the desired ones in next — the label or the argument looks wrong.
275 log.Printf("not assigning all requests [desired=%v, cancelled=%v, max=%v]",
276 current.Requests.GetCardinality(),
277 p.cancelledRequests.GetCardinality(),
278 p.nominalMaxRequests(),
283 more, err = p.request(req)
290 p.needRequestUpdate = ""