12 "github.com/RoaringBitmap/roaring"
13 "github.com/anacrolix/log"
14 "github.com/anacrolix/missinggo/v2/bitmap"
15 "github.com/anacrolix/multiless"
17 request_strategy "github.com/anacrolix/torrent/request-strategy"
20 func (cl *Client) tickleRequester() {
21 cl.updateRequests.Broadcast()
24 func (cl *Client) getRequestStrategyInput() request_strategy.Input {
25 ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
26 for _, t := range cl.torrents {
28 // This would be removed if metadata is handled here. We have to guard against not
29 // knowing the piece size. If we have no info, we have no pieces too, so the end result
33 rst := request_strategy.Torrent{
35 ChunksPerPiece: t.chunksPerRegularPiece(),
38 rst.Capacity = t.storage.Capacity
40 rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
41 for i := range t.pieces {
43 rst.Pieces = append(rst.Pieces, request_strategy.Piece{
44 Request: !t.ignorePieceForRequests(i),
45 Priority: p.purePriority(),
46 Partial: t.piecePartiallyDownloaded(i),
47 Availability: p.availability,
48 Length: int64(p.length()),
49 NumPendingChunks: int(t.pieceNumPendingChunks(i)),
50 IterPendingChunks: p.undirtiedChunksIter(),
53 t.iterPeers(func(p *Peer) {
57 if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates {
58 p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate
60 p.piecesReceivedSinceLastRequestUpdate = 0
61 rst.Peers = append(rst.Peers, request_strategy.Peer{
62 Pieces: *p.newPeerPieces(),
63 MaxRequests: p.nominalMaxRequests(),
64 ExistingRequests: p.actualRequestState.Requests,
65 Choking: p.peerChoking,
66 PieceAllowedFast: p.peerAllowedFast,
67 DownloadRate: p.downloadRate(),
68 Age: time.Since(p.completedHandshake),
71 ptr: uintptr(unsafe.Pointer(p)),
77 return request_strategy.Input{
79 MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
84 gob.Register(peerId{})
92 func (p peerId) Uintptr() uintptr {
96 func (p peerId) GobEncode() (b []byte, _ error) {
97 *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
98 Data: uintptr(unsafe.Pointer(&p.ptr)),
99 Len: int(unsafe.Sizeof(p.ptr)),
100 Cap: int(unsafe.Sizeof(p.ptr)),
105 func (p *peerId) GobDecode(b []byte) error {
106 if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
109 ptr := unsafe.Pointer(&b[0])
110 p.ptr = *(*uintptr)(ptr)
111 log.Printf("%p", ptr)
112 dst := reflect.SliceHeader{
113 Data: uintptr(unsafe.Pointer(&p.Peer)),
114 Len: int(unsafe.Sizeof(p.Peer)),
115 Cap: int(unsafe.Sizeof(p.Peer)),
117 copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
121 type RequestIndex = request_strategy.RequestIndex
122 type chunkIndexType = request_strategy.ChunkIndex
124 type peerRequests struct {
125 requestIndexes []RequestIndex
127 torrentStrategyInput request_strategy.Torrent
130 func (p peerRequests) Len() int {
131 return len(p.requestIndexes)
134 func (p peerRequests) Less(i, j int) bool {
135 leftRequest := p.requestIndexes[i]
136 rightRequest := p.requestIndexes[j]
138 leftPieceIndex := leftRequest / p.torrentStrategyInput.ChunksPerPiece
139 rightPieceIndex := rightRequest / p.torrentStrategyInput.ChunksPerPiece
140 leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
141 rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
142 pending := func(index RequestIndex, current bool) int {
143 ret := t.pendingRequests.Get(index)
149 ml := multiless.New()
150 // Push requests that can't be served right now to the end. But we don't throw them away unless
151 // there's a better alternative. This is for when we're using the fast extension and get choked
152 // but our requests could still be good when we get unchoked.
153 if p.peer.peerChoking {
155 !p.peer.peerAllowedFast.Contains(leftPieceIndex),
156 !p.peer.peerAllowedFast.Contains(rightPieceIndex),
160 pending(leftRequest, leftCurrent),
161 pending(rightRequest, rightCurrent))
162 ml = ml.Bool(!leftCurrent, !rightCurrent)
164 -int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
165 -int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority),
168 int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
169 int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
170 ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
171 ml = ml.Uint32(leftRequest, rightRequest)
175 func (p *peerRequests) Swap(i, j int) {
176 p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
179 func (p *peerRequests) Push(x interface{}) {
180 p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
183 func (p *peerRequests) Pop() interface{} {
184 last := len(p.requestIndexes) - 1
185 x := p.requestIndexes[last]
186 p.requestIndexes = p.requestIndexes[:last]
190 func (p *Peer) getDesiredRequestState() (desired requestState) {
191 input := p.t.cl.getRequestStrategyInput()
192 requestHeap := peerRequests{
195 for _, t := range input.Torrents {
196 if t.InfoHash == p.t.infoHash {
197 requestHeap.torrentStrategyInput = t
201 request_strategy.GetRequestablePieces(
203 func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
204 if t.InfoHash != p.t.infoHash {
207 if !p.peerHasPiece(pieceIndex) {
210 allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
211 rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
213 // We must signal interest to request this..
214 desired.Interested = true
215 if p.peerChoking && !p.actualRequestState.Requests.Contains(ci) {
216 // We can't request this right now.
220 requestHeap.requestIndexes = append(
221 requestHeap.requestIndexes,
222 p.t.pieceRequestIndexOffset(pieceIndex)+ci)
226 heap.Init(&requestHeap)
227 for requestHeap.Len() != 0 && desired.Requests.GetCardinality() < uint64(p.nominalMaxRequests()) {
228 requestIndex := heap.Pop(&requestHeap).(RequestIndex)
229 desired.Requests.Add(requestIndex)
234 func (p *Peer) applyNextRequestState() bool {
235 if p.needRequestUpdate == "" {
240 context.Background(),
241 pprof.Labels("update request", p.needRequestUpdate),
242 func(_ context.Context) {
243 next := p.getDesiredRequestState()
244 more = p.applyRequestState(next)
250 func (p *Peer) applyRequestState(next requestState) bool {
251 current := p.actualRequestState
252 if !p.setInterested(next.Interested) {
256 cancel := roaring.AndNot(¤t.Requests, &next.Requests)
257 cancel.Iterate(func(req uint32) bool {
264 next.Requests.Iterate(func(req uint32) bool {
265 // This could happen if the peer chokes us between the next state being generated, and us
266 // trying to transmit the state.
267 if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(req/p.t.chunksPerRegularPiece())) {
271 more, err = p.request(req)
280 p.needRequestUpdate = ""