7 "github.com/anacrolix/missinggo/v2/bitmap"
9 "github.com/anacrolix/chansync"
10 request_strategy "github.com/anacrolix/torrent/request-strategy"
// requester is a method on Client. Only fragments of its body are visible in
// this excerpt, so the notes below describe just those lines.
// NOTE(review): presumably this loops, coalescing updateRequests signals
// between the two timers before recomputing requests, and exits when the
// client is closed — confirm against the full body.
13 func (cl *Client) requester() {
// update is a closure that yields the chansync.Signaled value obtained from
// cl.updateRequests.
// NOTE(review): the lines between the closure header and its return are not
// visible here.
15 update := func() chansync.Signaled {
19 return cl.updateRequests.Signaled()
// Two timer channels from time.After: one fires after 100ms, the other after
// 1000ms.
// NOTE(review): looks like a lower and upper bound on the wait between
// updates; the select statements that consume them are not visible.
21 minWait := time.After(100 * time.Millisecond)
22 maxWait := time.After(1000 * time.Millisecond)
// Both visible select cases receive from cl.closed.Done().
// NOTE(review): the case bodies are not visible; presumably they return.
24 case <-cl.closed.Done():
30 case <-cl.closed.Done():
// tickleRequester broadcasts on cl.updateRequests.
// NOTE(review): requester reads cl.updateRequests.Signaled(), so this
// presumably wakes it — confirm against the full requester body.
38 func (cl *Client) tickleRequester() {
39 cl.updateRequests.Broadcast()
// getRequestStrategyInput builds a request_strategy.Input from the Client's
// current state: per torrent it records piece and peer information, and it
// copies MaxUnverifiedBytes from the client config.
//
// Besides building the input it also mutates each visited Peer: it updates
// maxPiecesReceivedBetweenRequestUpdates and resets
// piecesReceivedSinceLastRequestUpdate to zero.
// NOTE(review): several lines of this function are not visible in this
// excerpt; the comments below cover only what is shown.
42 func (cl *Client) getRequestStrategyInput() request_strategy.Input {
// Pre-size for one entry per torrent.
43 ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
44 for _, t := range cl.torrents {
46 // This would be removed if metadata is handled here.
49 rst := request_strategy.Torrent{
// Ceiling division of the usual piece size by the chunk size.
51 ChunksPerPiece: (t.usualPieceSize() + int(t.chunkSize) - 1) / int(t.chunkSize),
// NOTE(review): the condition guarding this assignment, if any, is not
// visible here.
54 rst.Capacity = t.storage.Capacity
// Pre-size for one entry per piece of the torrent.
56 rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
57 for i := range t.pieces {
// NOTE(review): p is bound on a line not visible here; presumably it refers
// to t.pieces[i].
59 rst.Pieces = append(rst.Pieces, request_strategy.Piece{
// Request is the negation of t.ignorePieceForRequests(i).
60 Request: !t.ignorePieceForRequests(i),
61 Priority: p.purePriority(),
62 Partial: t.piecePartiallyDownloaded(i),
63 Availability: p.availability,
64 Length: int64(p.length()),
65 NumPendingChunks: int(t.pieceNumPendingChunks(i)),
// Passed as a method value, not called here.
66 IterPendingChunks: p.iterUndirtiedChunks,
// From here on p is the *Peer callback parameter, not a piece.
69 t.iterPeers(func(p *Peer) {
// Keep the largest count of pieces received between request updates seen so
// far, then reset the running counter.
73 if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates {
74 p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate
76 p.piecesReceivedSinceLastRequestUpdate = 0
77 rst.Peers = append(rst.Peers, request_strategy.Peer{
78 HasPiece: p.peerHasPiece,
79 MaxRequests: p.nominalMaxRequests(),
// Reports whether r is in the peer's actualRequestState.Requests set.
80 HasExistingRequest: func(r RequestIndex) bool {
81 return p.actualRequestState.Requests.Contains(r)
83 Choking: p.peerChoking,
// Reports whether piece i is in the peer's peerAllowedFast set.
84 PieceAllowedFast: func(i pieceIndex) bool {
85 return p.peerAllowedFast.Contains(bitmap.BitIndex(i))
87 DownloadRate: p.downloadRate(),
// Time elapsed since p.completedHandshake.
88 Age: time.Since(p.completedHandshake),
// The *Peer's address converted to an integer.
// NOTE(review): the enclosing literal is not visible; presumably this is the
// peerId whose Uintptr method is declared later in this file.
91 ptr: uintptr(unsafe.Pointer(p)),
97 return request_strategy.Input{
99 MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
// doRequests runs request_strategy.Run over the input built by
// getRequestStrategyInput and passes each returned (peer id, state) pair to
// setPeerNextRequestState.
// NOTE(review): the remainder of the body is not visible in this excerpt.
103 func (cl *Client) doRequests() {
104 nextPeerStates := request_strategy.Run(cl.getRequestStrategyInput())
105 for p, state := range nextPeerStates {
106 setPeerNextRequestState(p, state)
// Uintptr is a value-receiver method on peerId that returns a uintptr.
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns the ptr field populated in getRequestStrategyInput — confirm.
115 func (p peerId) Uintptr() uintptr {
// setPeerNextRequestState stores rp as the next request state of the Peer
// behind _p and then calls that Peer's onNextRequestStateChanged.
//
// _p must hold a peerId: the type assertion is the single-value form, so any
// other dynamic type causes a run-time panic.
119 func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
// Recover the Peer field from the peerId behind the interface value.
120 p := _p.(peerId).Peer
121 p.nextRequestState = rp
122 p.onNextRequestStateChanged()
// RequestIndex is an alias for request_strategy.RequestIndex.
125 type RequestIndex = request_strategy.RequestIndex
// chunkIndexType is an alias for request_strategy.ChunkIndex.
126 type chunkIndexType = request_strategy.ChunkIndex
// applyNextRequestState issues requests to this peer and returns a bool.
// From the visible lines: it collects the requestable pieces of this peer's
// torrent that the peer has, walks their undirtied chunks in two passes, and
// calls p.request for chunks that pass the checks below.
// NOTE(review): many lines, including most branch bodies and the return
// statements, are not visible in this excerpt; the meaning of the returned
// bool should be confirmed against the full body.
128 func (p *Peer) applyNextRequestState() bool {
// Compares the current number of requests with half of nominalMaxRequests
// (integer division).
// NOTE(review): the branch body is not visible; presumably it returns early
// while enough requests are still outstanding.
129 if p.actualRequestState.Requests.GetCardinality() > uint64(p.nominalMaxRequests()/2) {
136 var pieceOrder []piece
137 request_strategy.GetRequestablePieces(
138 p.t.cl.getRequestStrategyInput(),
139 func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
// Tests whether the piece belongs to a torrent other than this peer's.
// NOTE(review): branch body not visible; presumably such pieces are skipped.
140 if t.InfoHash != p.t.infoHash {
// Tests whether the peer lacks the piece.
// NOTE(review): branch body not visible; presumably such pieces are skipped.
143 if !p.peerHasPiece(pieceIndex) {
146 pieceOrder = append(pieceOrder, piece{
// endGame is set when the piece's priority equals PiecePriorityNow.
148 endGame: rsp.Priority == PiecePriorityNow,
// Two passes over the collected pieces: first with endGameIter false, then
// true.
154 for _, endGameIter := range []bool{false, true} {
// Note: the loop variable piece shadows the type name piece inside this loop.
155 for _, piece := range pieceOrder {
156 tp := p.t.piece(piece.index)
157 tp.iterUndirtiedChunks(func(cs chunkIndexType) {
// The request index is the chunk index plus the piece's request index offset.
158 req := cs + tp.requestIndexOffset()
// True only on the first pass, for pieces not flagged endGame, when
// p.t.pendingRequests already has a positive count for this request.
// NOTE(review): branch body not visible; presumably the chunk is skipped so
// duplicate requests are only made for endGame pieces or on the second pass.
159 if !piece.endGame && !endGameIter && p.t.pendingRequests[req] > 0 {
163 more = p.setInterested(true)
// Tests whether the request count has reached nominalMaxRequests.
// NOTE(review): branch body not visible.
167 if maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
// Tests whether the peer is choking us and the piece is not in its
// allowed-fast set.
// NOTE(review): branch body not visible.
170 if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(piece.index)) {
174 more, err = p.request(req)
// NOTE(review): interested is assigned on a line not visible here.
179 if interested && maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
// NOTE(review): the condition under which interest is cleared is not visible.
194 p.setInterested(false)