7 "github.com/anacrolix/missinggo/v2/bitmap"
9 "github.com/anacrolix/chansync"
10 request_strategy "github.com/anacrolix/torrent/request-strategy"
13 // Calculate requests individually for each peer.
14 const peerRequesting = true
16 func (cl *Client) requester() {
18 update := func() chansync.Signaled {
22 return cl.updateRequests.Signaled()
24 minWait := time.After(100 * time.Millisecond)
25 maxWait := time.After(1000 * time.Millisecond)
27 case <-cl.closed.Done():
33 case <-cl.closed.Done():
41 func (cl *Client) tickleRequester() {
42 cl.updateRequests.Broadcast()
45 func (cl *Client) getRequestStrategyInput() request_strategy.Input {
46 ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
47 for _, t := range cl.torrents {
49 // This would be removed if metadata is handled here. We have to guard against not
50 // knowing the piece size. If we have no info, we have no pieces too, so the end result
54 rst := request_strategy.Torrent{
56 ChunksPerPiece: (t.usualPieceSize() + int(t.chunkSize) - 1) / int(t.chunkSize),
59 rst.Capacity = t.storage.Capacity
61 rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
62 for i := range t.pieces {
64 rst.Pieces = append(rst.Pieces, request_strategy.Piece{
65 Request: !t.ignorePieceForRequests(i),
66 Priority: p.purePriority(),
67 Partial: t.piecePartiallyDownloaded(i),
68 Availability: p.availability,
69 Length: int64(p.length()),
70 NumPendingChunks: int(t.pieceNumPendingChunks(i)),
71 IterPendingChunks: p.iterUndirtiedChunks,
74 t.iterPeers(func(p *Peer) {
78 if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates {
79 p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate
81 p.piecesReceivedSinceLastRequestUpdate = 0
82 rst.Peers = append(rst.Peers, request_strategy.Peer{
83 HasPiece: p.peerHasPiece,
84 MaxRequests: p.nominalMaxRequests(),
85 HasExistingRequest: func(r RequestIndex) bool {
86 return p.actualRequestState.Requests.Contains(r)
88 Choking: p.peerChoking,
89 PieceAllowedFast: func(i pieceIndex) bool {
90 return p.peerAllowedFast.Contains(bitmap.BitIndex(i))
92 DownloadRate: p.downloadRate(),
93 Age: time.Since(p.completedHandshake),
96 ptr: uintptr(unsafe.Pointer(p)),
102 return request_strategy.Input{
104 MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
108 func (cl *Client) doRequests() {
109 nextPeerStates := request_strategy.Run(cl.getRequestStrategyInput())
110 for p, state := range nextPeerStates {
111 setPeerNextRequestState(p, state)
120 func (p peerId) Uintptr() uintptr {
124 func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
125 p := _p.(peerId).Peer
126 p.nextRequestState = rp
127 p.onNextRequestStateChanged()
130 type RequestIndex = request_strategy.RequestIndex
131 type chunkIndexType = request_strategy.ChunkIndex
133 func (p *Peer) applyNextRequestState() bool {
135 if p.actualRequestState.Requests.GetCardinality() > uint64(p.nominalMaxRequests()/2) {
142 var pieceOrder []piece
143 request_strategy.GetRequestablePieces(
144 p.t.cl.getRequestStrategyInput(),
145 func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
146 if t.InfoHash != p.t.infoHash {
149 if !p.peerHasPiece(pieceIndex) {
152 pieceOrder = append(pieceOrder, piece{
154 endGame: rsp.Priority == PiecePriorityNow,
160 for _, endGameIter := range []bool{false, true} {
161 for _, piece := range pieceOrder {
162 tp := p.t.piece(piece.index)
163 tp.iterUndirtiedChunks(func(cs chunkIndexType) {
164 req := cs + tp.requestIndexOffset()
165 if !piece.endGame && !endGameIter && p.t.pendingRequests[req] > 0 {
169 more = p.setInterested(true)
173 if maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
176 if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(piece.index)) {
180 more, err = p.request(req)
185 if interested && maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
200 p.setInterested(false)
205 next := p.nextRequestState
206 current := p.actualRequestState
207 if !p.setInterested(next.Interested) {
211 current.Requests.Iterate(func(req uint32) bool {
212 if !next.Requests.Contains(req) {
221 next.Requests.Iterate(func(req uint32) bool {
222 // This could happen if the peer chokes us between the next state being generated, and us
223 // trying to transmit the state.
224 if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(req/p.t.chunksPerRegularPiece())) {
228 more, err = p.request(req)