7 "github.com/RoaringBitmap/roaring"
8 "github.com/anacrolix/missinggo/v2/bitmap"
10 "github.com/anacrolix/chansync"
11 request_strategy "github.com/anacrolix/torrent/request-strategy"
// peerRequesting is a compile-time switch; it is currently disabled (false).
14 // Calculate requests individually for each peer.
15 const peerRequesting = false
// requester is a Client method; only part of its body is visible in this view.
// The visible lines obtain a chansync.Signaled from cl.updateRequests, arm a 100ms and a
// 1000ms timer, and watch cl.closed.Done() in two separate places.
17 func (cl *Client) requester() {
// update is built from a func literal whose visible return value is the current
// cl.updateRequests signal.
19 update := func() chansync.Signaled {
23 return cl.updateRequests.Signaled()
// Two timers of 100ms and 1000ms.
// NOTE(review): presumably the lower and upper bound on the wait between passes — confirm
// against the full body.
25 minWait := time.After(100 * time.Millisecond)
26 maxWait := time.After(1000 * time.Millisecond)
// Client closure is checked at both wait points.
28 case <-cl.closed.Done():
34 case <-cl.closed.Done():
// tickleRequester broadcasts on cl.updateRequests.
// NOTE(review): requester takes its signal from cl.updateRequests.Signaled(), so this
// presumably wakes it — confirm.
42 func (cl *Client) tickleRequester() {
43 cl.updateRequests.Broadcast()
// getRequestStrategyInput builds a request_strategy.Input from the client's torrents.
// For each torrent it fills in a request_strategy.Torrent with one entry per piece and one
// entry per peer, and it passes cl.config.MaxUnverifiedBytes through to the result.
46 func (cl *Client) getRequestStrategyInput() request_strategy.Input {
// Pre-size for one entry per torrent.
47 ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
48 for _, t := range cl.torrents {
50 // This would be removed if metadata is handled here. We have to guard against not
51 // knowing the piece size. If we have no info, we have no pieces too, so the end result
55 rst := request_strategy.Torrent{
// Ceiling division: number of chunks needed to cover a usual-sized piece.
57 ChunksPerPiece: (t.usualPieceSize() + int(t.chunkSize) - 1) / int(t.chunkSize),
// Capacity is taken from the torrent's storage.
60 rst.Capacity = t.storage.Capacity
// Pre-size for one entry per piece.
62 rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
63 for i := range t.pieces {
65 rst.Pieces = append(rst.Pieces, request_strategy.Piece{
// Request is the negation of t.ignorePieceForRequests for this piece index.
66 Request: !t.ignorePieceForRequests(i),
67 Priority: p.purePriority(),
68 Partial: t.piecePartiallyDownloaded(i),
69 Availability: p.availability,
70 Length: int64(p.length()),
71 NumPendingChunks: int(t.pieceNumPendingChunks(i)),
// Passed as a method value, not invoked here.
72 IterPendingChunks: p.iterUndirtiedChunks,
75 t.iterPeers(func(p *Peer) {
// Track the largest number of pieces received between two request updates.
79 if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates {
80 p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate
// Reset the per-update counter.
82 p.piecesReceivedSinceLastRequestUpdate = 0
83 rst.Peers = append(rst.Peers, request_strategy.Peer{
84 HasPiece: p.peerHasPiece,
85 MaxRequests: p.nominalMaxRequests(),
// Reports whether r is in the peer's actual request set.
86 HasExistingRequest: func(r RequestIndex) bool {
87 return p.actualRequestState.Requests.Contains(r)
89 Choking: p.peerChoking,
// Reports whether piece i is in the peer's allowed-fast set.
90 PieceAllowedFast: func(i pieceIndex) bool {
91 return p.peerAllowedFast.Contains(bitmap.BitIndex(i))
93 DownloadRate: p.downloadRate(),
// Time elapsed since p.completedHandshake.
94 Age: time.Since(p.completedHandshake),
// The *Peer's address converted to a uintptr.
97 ptr: uintptr(unsafe.Pointer(p)),
103 return request_strategy.Input{
105 MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
// doRequests runs request_strategy.Run on the client's current strategy input and hands
// each resulting per-peer state to setPeerNextRequestState.
109 func (cl *Client) doRequests() {
110 nextPeerStates := request_strategy.Run(cl.getRequestStrategyInput())
111 for p, state := range nextPeerStates {
112 setPeerNextRequestState(p, state)
// Uintptr is a value-receiver method on peerId returning a uintptr.
// NOTE(review): the body is not visible in this view; presumably it returns the ptr value
// populated in getRequestStrategyInput — confirm.
121 func (p peerId) Uintptr() uintptr {
// setPeerNextRequestState stores rp as the next request state of the Peer behind _p and then
// calls that peer's onNextRequestStateChanged.
//
// _p must hold a peerId: the type assertion is the single-value form, so any other dynamic
// type panics.
125 func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
126 p := _p.(peerId).Peer
127 p.nextRequestState = rp
128 p.onNextRequestStateChanged()
// RequestIndex is an alias for request_strategy.RequestIndex.
131 type RequestIndex = request_strategy.RequestIndex
// chunkIndexType is an alias for request_strategy.ChunkIndex.
132 type chunkIndexType = request_strategy.ChunkIndex
// applyNextRequestState moves the peer's actual request state toward a desired one and
// returns a bool.
//
// Two sections are visible in this view. The first orders pieces via
// request_strategy.GetRequestablePieces and requests their undirtied chunks directly. The
// second applies p.nextRequestState: it sets interest, computes the requests to cancel
// (those in the current state but not in the next one), and issues the requests in next.
// NOTE(review): the meaning of the bool result and the condition selecting between the two
// sections are not visible here — confirm against the full body.
134 func (p *Peer) applyNextRequestState() bool {
// Compare the number of outstanding requests against half the nominal maximum.
136 if p.actualRequestState.Requests.GetCardinality() > uint64(p.nominalMaxRequests()/2) {
143 var pieceOrder []piece
144 request_strategy.GetRequestablePieces(
145 p.t.cl.getRequestStrategyInput(),
146 func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
// The input spans all of the client's torrents; this checks for pieces of other torrents.
147 if t.InfoHash != p.t.infoHash {
// Checks for pieces this peer does not have.
150 if !p.peerHasPiece(pieceIndex) {
153 pieceOrder = append(pieceOrder, piece{
// Pieces at PiecePriorityNow are flagged endGame.
155 endGame: rsp.Priority == PiecePriorityNow,
// Two passes over the ordered pieces: endGameIter is false on the first, true on the second.
161 for _, endGameIter := range []bool{false, true} {
162 for _, piece := range pieceOrder {
163 tp := p.t.piece(piece.index)
164 tp.iterUndirtiedChunks(func(cs chunkIndexType) {
// Convert the chunk index within the piece into a torrent-wide request index.
165 req := cs + tp.requestIndexOffset()
// On the first pass, for non-end-game pieces, detect chunks that already have pending requests.
166 if !piece.endGame && !endGameIter && p.t.pendingRequests[req] > 0 {
170 more = p.setInterested(true)
// Check whether the request budget is exhausted.
174 if maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
// While choked, only allowed-fast pieces pass this check.
177 if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(piece.index)) {
181 more, err = p.request(req)
186 if interested && maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
201 p.setInterested(false)
206 next := p.nextRequestState
207 current := p.actualRequestState
208 if !p.setInterested(next.Interested) {
// Requests present in the current state but absent from the next one are to be cancelled.
// (Fixed: the first operand was the mis-encoded token "¤t"; it is the address of the
// local `current` declared above.)
212 cancel := roaring.AndNot(&current.Requests, &next.Requests)
213 cancel.Iterate(func(req uint32) bool {
220 next.Requests.Iterate(func(req uint32) bool {
221 // This could happen if the peer chokes us between the next state being generated, and us
222 // trying to transmit the state.
// req / chunksPerRegularPiece maps the request index to its piece index for the allowed-fast lookup.
223 if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(req/p.t.chunksPerRegularPiece())) {
227 more, err = p.request(req)