10 "github.com/RoaringBitmap/roaring"
11 "github.com/anacrolix/chansync/events"
12 "github.com/anacrolix/log"
13 "github.com/anacrolix/missinggo/v2/bitmap"
14 "github.com/anacrolix/multiless"
16 request_strategy "github.com/anacrolix/torrent/request-strategy"
19 // Calculate requests individually for each peer.
// NOTE(review): no visible line of this excerpt reads peerRequesting; confirm where the
// flag is consumed before changing it.
20 const peerRequesting = true
// requester is a Client method. In the visible lines it obtains a Signaled event from
// cl.updateRequests, creates a 100ms and a 1000ms timer channel, and has two select cases
// that watch cl.closed.
// NOTE(review): the surrounding loop/select scaffolding and the remaining cases sit on
// lines not visible in this excerpt; confirm the exact wait sequence against the full file.
22 func (cl *Client) requester() {
24 update := func() events.Signaled {
28 return cl.updateRequests.Signaled()
// Timer channels: minWait fires after 100ms, maxWait after 1s.
30 minWait := time.After(100 * time.Millisecond)
31 maxWait := time.After(1000 * time.Millisecond)
33 case <-cl.closed.Done():
39 case <-cl.closed.Done():
// tickleRequester broadcasts on cl.updateRequests, the same object whose Signaled() event
// is obtained in requester.
47 func (cl *Client) tickleRequester() {
48 cl.updateRequests.Broadcast()
// getRequestStrategyInput builds a request_strategy.Input from the client's torrents. For
// each torrent it fills a request_strategy.Torrent with one Piece entry per element of
// t.pieces and one Peer entry per peer visited by t.iterPeers.
//
// Side effect: while visiting peers it updates each peer's
// maxPiecesReceivedBetweenRequestUpdates and resets piecesReceivedSinceLastRequestUpdate,
// so calling this method is not a pure read.
// NOTE(review): several lines of this function are not visible in this excerpt.
51 func (cl *Client) getRequestStrategyInput() request_strategy.Input {
52 ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
53 for _, t := range cl.torrents {
55 // This would be removed if metadata is handled here. We have to guard against not
56 // knowing the piece size. If we have no info, we have no pieces too, so the end result
// NOTE(review): the sentence above continues on a line not visible in this excerpt.
60 rst := request_strategy.Torrent{
// Ceiling division of the usual piece size by the chunk size.
62 ChunksPerPiece: (t.usualPieceSize() + int(t.chunkSize) - 1) / int(t.chunkSize),
65 rst.Capacity = t.storage.Capacity
67 rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
68 for i := range t.pieces {
70 rst.Pieces = append(rst.Pieces, request_strategy.Piece{
71 Request: !t.ignorePieceForRequests(i),
72 Priority: p.purePriority(),
73 Partial: t.piecePartiallyDownloaded(i),
74 Availability: p.availability,
75 Length: int64(p.length()),
76 NumPendingChunks: int(t.pieceNumPendingChunks(i)),
77 IterPendingChunks: p.undirtiedChunksIter(),
80 t.iterPeers(func(p *Peer) {
// Record the high-water mark of pieces received between request updates, then reset the
// running counter for the next interval.
84 if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates {
85 p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate
87 p.piecesReceivedSinceLastRequestUpdate = 0
88 rst.Peers = append(rst.Peers, request_strategy.Peer{
89 Pieces: *p.newPeerPieces(),
90 MaxRequests: p.nominalMaxRequests(),
91 ExistingRequests: p.actualRequestState.Requests,
92 Choking: p.peerChoking,
93 PieceAllowedFast: p.peerAllowedFast,
94 DownloadRate: p.downloadRate(),
95 Age: time.Since(p.completedHandshake),
// The peer's address is stored as an integer. NOTE(review): a uintptr is not a reference
// and does not by itself keep the Peer reachable; confirm a real pointer is held alongside.
98 ptr: uintptr(unsafe.Pointer(p)),
104 return request_strategy.Input{
106 MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
// doRequests builds the strategy input, passes it to request_strategy.Run, and applies each
// returned per-peer state through setPeerNextRequestState.
110 func (cl *Client) doRequests() {
111 input := cl.getRequestStrategyInput()
112 nextPeerStates := request_strategy.Run(input)
113 for p, state := range nextPeerStates {
114 setPeerNextRequestState(p, state)
// Registers the peerId type with gob (presumably encoding/gob; the import is not visible).
// NOTE(review): the enclosing function is not visible in this excerpt.
119 gob.Register(peerId{})
// Uintptr returns a uintptr for this peerId.
// NOTE(review): the body is not visible in this excerpt; presumably it returns p.ptr, confirm.
127 func (p peerId) Uintptr() uintptr {
// GobEncode encodes the peerId as the raw bytes of p.ptr. The visible lines overwrite the
// slice header of the result b so that it points directly at p.ptr's storage, with length
// and capacity equal to unsafe.Sizeof(p.ptr).
// NOTE(review): p is a value receiver, so b aliases a field of the receiver copy rather
// than the caller's value. reflect.SliceHeader is deprecated in newer Go releases in favour
// of unsafe.Slice; check the module's Go version before modernising.
131 func (p peerId) GobEncode() (b []byte, _ error) {
132 *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
133 Data: uintptr(unsafe.Pointer(&p.ptr)),
134 Len: int(unsafe.Sizeof(p.ptr)),
135 Cap: int(unsafe.Sizeof(p.ptr)),
// GobDecode restores a peerId from the bytes produced by GobEncode. It checks that len(b)
// equals the size of p.ptr, reads p.ptr from the start of b, and then copies b over the
// storage of the p.Peer field through a hand-built slice header.
// NOTE(review): the body of the length-mismatch branch is not visible in this excerpt.
140 func (p *peerId) GobDecode(b []byte) error {
141 if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
144 ptr := unsafe.Pointer(&b[0])
145 p.ptr = *(*uintptr)(ptr)
// NOTE(review): this logs the address of the input buffer on every decode; it looks like
// leftover debugging output. Confirm whether it is still wanted.
146 log.Printf("%p", ptr)
// View the p.Peer field as a byte slice so the encoded address can be copied into it.
// NOTE(review): this assumes p.Peer and p.ptr have the same size; the visible code does not
// check that.
147 dst := reflect.SliceHeader{
148 Data: uintptr(unsafe.Pointer(&p.Peer)),
149 Len: int(unsafe.Sizeof(p.Peer)),
150 Cap: int(unsafe.Sizeof(p.Peer)),
152 copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
// setPeerNextRequestState stores rp as the next request state of the peer identified by _p
// and then calls the peer's onNextRequestStateChanged hook.
// _p must hold a peerId: the single-value type assertion panics for any other dynamic type.
156 func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
157 p := _p.(peerId).Peer
158 p.nextRequestState = rp
159 p.onNextRequestStateChanged()
// RequestIndex is an alias for request_strategy.RequestIndex.
162 type RequestIndex = request_strategy.RequestIndex
// chunkIndexType is an alias for request_strategy.ChunkIndex.
163 type chunkIndexType = request_strategy.ChunkIndex
// peerRequests holds a list of candidate request indexes together with the strategy input
// of one torrent. Its Len/Less/Swap/Push/Pop methods let it be used with heap.Init and
// heap.Pop, as getDesiredRequestState does.
// NOTE(review): Less also reads p.peer; that field's declaration is on a line not visible
// in this excerpt.
165 type peerRequests struct {
166 requestIndexes []RequestIndex
168 torrentStrategyInput request_strategy.Torrent
// Len reports the number of request indexes currently held.
171 func (p peerRequests) Len() int {
172 return len(p.requestIndexes)
// Less orders the request indexes at positions i and j using a multiless comparison chain.
// The comparison inputs visible here are, in order: the value returned by the pending
// closure for each request, whether each request is already in the peer's actual request
// state, piece priority, piece availability, piece index, and finally the request index.
// NOTE(review): the declaration of t, the rest of the pending closure, and the multiless
// calls that wrap the pending/priority/availability argument pairs are on lines not visible
// in this excerpt, so the direction of those keys must be confirmed against the full file.
175 func (p peerRequests) Less(i, j int) bool {
176 leftRequest := p.requestIndexes[i]
177 rightRequest := p.requestIndexes[j]
// Integer division maps a request index to the index of the piece that contains it.
179 leftPieceIndex := leftRequest / t.chunksPerRegularPiece()
180 rightPieceIndex := rightRequest / t.chunksPerRegularPiece()
181 leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
182 rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
183 pending := func(index RequestIndex, current bool) int {
184 ret := t.pendingRequests[index]
190 ml := multiless.New()
192 pending(leftRequest, leftCurrent),
193 pending(rightRequest, rightCurrent))
// Note the swapped argument order (right, left) compared with the other keys.
194 ml = ml.Bool(rightCurrent, leftCurrent)
196 int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
197 int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority))
199 int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
200 int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
201 ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
202 ml = ml.Uint32(leftRequest, rightRequest)
// Swap exchanges the request indexes at positions i and j.
206 func (p peerRequests) Swap(i, j int) {
207 p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
// Push appends x to the held request indexes. x must be a RequestIndex; the single-value
// type assertion panics for any other dynamic type.
210 func (p *peerRequests) Push(x interface{}) {
211 p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
// Pop removes the last request index from the slice.
// NOTE(review): the return statement is not visible in this excerpt; presumably it returns
// the removed element x, confirm. Indexing panics if the slice is empty.
214 func (p *peerRequests) Pop() interface{} {
215 last := len(p.requestIndexes) - 1
216 x := p.requestIndexes[last]
217 p.requestIndexes = p.requestIndexes[:last]
// getDesiredRequestState computes the request state this peer should move towards. It
// collects request indexes for pending chunks of pieces in the peer's own torrent that the
// peer has, heapifies them as a peerRequests, and pops them until the heap is empty or the
// desired set holds p.nominalMaxRequests() entries.
// NOTE(review): each call rebuilds the strategy input for every torrent of the client via
// getRequestStrategyInput, even though only this peer's torrent is used here. Several lines
// of this function are not visible in this excerpt.
221 func (p *Peer) getDesiredRequestState() (desired requestState) {
222 input := p.t.cl.getRequestStrategyInput()
223 requestHeap := peerRequests{
// Find the strategy input that belongs to this peer's torrent.
227 for _, t := range input.Torrents {
228 if t.InfoHash == p.t.infoHash {
229 requestHeap.torrentStrategyInput = t
233 request_strategy.GetRequestablePieces(
235 func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
// The two checks below filter on torrent identity and on whether the peer has the piece.
236 if t.InfoHash != p.t.infoHash {
239 if !p.peerHasPiece(pieceIndex) {
// Each pending chunk becomes a request index: the piece's request-index offset plus the
// chunk index.
242 rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
243 requestHeap.requestIndexes = append(
244 requestHeap.requestIndexes,
245 p.t.pieceRequestIndexOffset(pieceIndex)+ci)
249 heap.Init(&requestHeap)
250 for requestHeap.Len() != 0 && desired.Requests.GetCardinality() < uint64(p.nominalMaxRequests()) {
251 requestIndex := heap.Pop(&requestHeap).(RequestIndex)
252 pieceIndex := requestIndex / p.t.chunksPerRegularPiece()
253 allowedFast := p.peerAllowedFast.Contains(pieceIndex)
255 desired.Interested = true
// A request is only added when the piece is allowed-fast or the peer is not choking us.
257 if allowedFast || !p.peerChoking {
258 desired.Requests.Add(requestIndex)
264 func (p *Peer) applyNextRequestState() bool {
265 next := p.getDesiredRequestState()
266 current := p.actualRequestState
267 if !p.setInterested(next.Interested) {
271 cancel := roaring.AndNot(&current.Requests, &next.Requests)
272 cancel.Iterate(func(req uint32) bool {
279 next.Requests.Iterate(func(req uint32) bool {
280 // This could happen if the peer chokes us between the next state being generated, and us
281 // trying to transmit the state.
282 if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(req/p.t.chunksPerRegularPiece())) {
286 more, err = p.request(req)