7 "github.com/anacrolix/missinggo/v2/bitmap"
8 pp "github.com/anacrolix/torrent/peer_protocol"
10 "github.com/anacrolix/chansync"
11 request_strategy "github.com/anacrolix/torrent/request-strategy"
// requester works with cl.updateRequests, a pair of timers (100ms and
// 1000ms) and cl.closed. Only part of its body is visible in this view, so the
// notes below describe the visible statements only.
14 func (cl *Client) requester() {
// update is a local closure that hands back the Signaled handle of
// cl.updateRequests.
// NOTE(review): whatever it does before the return is not visible here —
// presumably it performs a request pass first; confirm.
16 update := func() chansync.Signaled {
20 return cl.updateRequests.Signaled()
// Two one-shot timer channels. The names suggest a lower and an upper bound on
// the time between request updates — TODO confirm against the select
// statements that consume them.
22 minWait := time.After(100 * time.Millisecond)
23 maxWait := time.After(1000 * time.Millisecond)
// cl.closed.Done() appears as a case in two places; presumably both exit the
// requester when the client is closed — verify the case bodies.
25 case <-cl.closed.Done():
31 case <-cl.closed.Done():
// tickleRequester broadcasts on cl.updateRequests. This is the same signal
// whose Signaled handle is returned by the update closure inside requester.
39 func (cl *Client) tickleRequester() {
40 cl.updateRequests.Broadcast()
// getRequestStrategyInput assembles a request_strategy.Input from the client's
// torrents: for every torrent it builds a request_strategy.Torrent carrying
// that torrent's pieces and peers, and the returned Input also carries
// cl.config.MaxUnverifiedBytes.
//
// Note that this is not a pure read: while iterating peers it updates and
// resets per-peer "pieces received since last request update" counters.
43 func (cl *Client) getRequestStrategyInput() request_strategy.Input {
// Pre-size for one entry per torrent.
44 ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
45 for _, t := range cl.torrents {
46 rst := request_strategy.Torrent{
// Capacity is taken from the torrent's storage.
// NOTE(review): the condition guarding this assignment, if any, is not visible.
50 rst.Capacity = t.storage.Capacity
// One request_strategy.Piece per entry in t.pieces, in index order.
52 rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
53 for i := range t.pieces {
// NOTE(review): p below is declared on a line not visible here — presumably
// it refers to t.pieces[i]; confirm.
55 rst.Pieces = append(rst.Pieces, request_strategy.Piece{
// Request is the negation of t.ignorePieceForRequests(i).
56 Request: !t.ignorePieceForRequests(i),
57 Priority: p.purePriority(),
58 Partial: t.piecePartiallyDownloaded(i),
59 Availability: p.availability,
60 Length: int64(p.length()),
61 NumPendingChunks: int(t.pieceNumPendingChunks(i)),
// Passed as a method value, so the strategy can iterate the piece's undirtied
// chunks itself.
62 IterPendingChunks: p.iterUndirtiedChunks,
// From here on p is the *Peer callback argument, not the piece.
65 t.iterPeers(func(p *Peer) {
// Record a new high-water mark for pieces received between request updates,
// then reset the running counter.
69 if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates {
70 p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate
72 p.piecesReceivedSinceLastRequestUpdate = 0
73 rst.Peers = append(rst.Peers, request_strategy.Peer{
74 HasPiece: p.peerHasPiece,
75 MaxRequests: p.nominalMaxRequests(),
// Looks the request up in the peer's actual (current) request state.
76 HasExistingRequest: func(r request_strategy.Request) bool {
77 _, ok := p.actualRequestState.Requests[r]
80 Choking: p.peerChoking,
// Reports whether the piece index is in the peer's allowed-fast set.
81 PieceAllowedFast: func(i pieceIndex) bool {
82 return p.peerAllowedFast.Contains(bitmap.BitIndex(i))
84 DownloadRate: p.downloadRate(),
// Age is measured from the time recorded in p.completedHandshake.
85 Age: time.Since(p.completedHandshake),
// The peer's address is stored as a uintptr.
// NOTE(review): presumably this is the value peerId.Uintptr returns — confirm.
88 ptr: uintptr(unsafe.Pointer(p)),
94 return request_strategy.Input{
96 MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
// doRequests runs the request strategy over the current client input and
// hands each resulting per-peer state to setPeerNextRequestState.
100 func (cl *Client) doRequests() {
101 nextPeerStates := request_strategy.Run(cl.getRequestStrategyInput())
// Keys of nextPeerStates are passed as the peer id, values as the next state.
102 for p, state := range nextPeerStates {
103 setPeerNextRequestState(p, state)
// Uintptr returns a uintptr for this peerId.
// NOTE(review): the body is not visible here — presumably it returns the ptr
// value populated in getRequestStrategyInput; confirm.
112 func (p peerId) Uintptr() uintptr {
// setPeerNextRequestState stores rp as the next request state of the Peer
// behind _p and then notifies the peer via onNextRequestStateChanged.
//
// _p must hold a peerId: the type assertion is unchecked and panics otherwise.
116 func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
117 p := _p.(peerId).Peer
118 p.nextRequestState = rp
119 p.onNextRequestStateChanged()
// applyNextRequestState gathers the requestable pieces of this peer's torrent
// that the peer has, then walks their undirtied chunks in two passes, issuing
// requests through p.request. It returns a bool.
// NOTE(review): several branch bodies and the return statements are not
// visible here; comments on those branches are hedged accordingly.
122 func (p *Peer) applyNextRequestState() bool {
// Checked when more than half of the nominal request budget is already
// outstanding — presumably an early return to avoid refilling too eagerly;
// confirm.
123 if len(p.actualRequestState.Requests) > p.nominalMaxRequests()/2 {
130 var pieceOrder []piece
// Builds the strategy input for the whole client (all torrents), then filters
// down to this peer's torrent inside the callback.
131 request_strategy.GetRequestablePieces(
132 p.t.cl.getRequestStrategyInput(),
133 func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
// Pieces of other torrents are matched here — presumably skipped.
134 if t.InfoHash != p.t.infoHash {
// Pieces the peer does not have are matched here — presumably skipped.
137 if !p.peerHasPiece(pieceIndex) {
140 pieceOrder = append(pieceOrder, piece{
// A piece is flagged endGame when its priority is PiecePriorityNow.
142 endGame: rsp.Priority == PiecePriorityNow,
// Two passes over the same piece order: first with endGameIter false, then true.
148 for _, endGameIter := range []bool{false, true} {
// The loop variable shadows the piece type name within this loop body.
149 for _, piece := range pieceOrder {
150 tp := p.t.piece(piece.index)
151 tp.iterUndirtiedChunks(func(cs ChunkSpec) {
152 req := Request{pp.Integer(piece.index), cs}
// In the first pass, for pieces not flagged endGame, chunks that already have
// a pending request on the torrent are matched — presumably skipped so that
// duplicates are only requested in the second pass; confirm.
153 if !piece.endGame && !endGameIter && p.t.pendingRequests[req] > 0 {
157 more = p.setInterested(true)
// Matched once the peer's outstanding requests reach the nominal maximum.
161 if len(p.actualRequestState.Requests) >= p.nominalMaxRequests() {
// Matched when the peer is choking us and the piece is not in its
// allowed-fast set.
164 if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(req.Index)) {
168 more, err = p.request(req)
173 if interested && len(p.actualRequestState.Requests) >= p.nominalMaxRequests() {
188 p.setInterested(false)