9 "github.com/RoaringBitmap/roaring"
10 "github.com/anacrolix/chansync/events"
11 "github.com/anacrolix/log"
12 "github.com/anacrolix/missinggo/v2/bitmap"
14 request_strategy "github.com/anacrolix/torrent/request-strategy"
17 // peerRequesting, per its original note, is the switch for calculating requests individually for each peer; it is set to false here. NOTE(review): the code that reads this flag is not visible in this excerpt — confirm its effect.
18 const peerRequesting = false
20 func (cl *Client) requester() {
22 update := func() events.Signaled {
26 return cl.updateRequests.Signaled()
28 minWait := time.After(100 * time.Millisecond)
29 maxWait := time.After(1000 * time.Millisecond)
31 case <-cl.closed.Done():
37 case <-cl.closed.Done():
45 func (cl *Client) tickleRequester() {
46 cl.updateRequests.Broadcast()
49 func (cl *Client) getRequestStrategyInput() request_strategy.Input {
50 ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
51 for _, t := range cl.torrents {
53 // This would be removed if metadata is handled here. We have to guard against not
54 // knowing the piece size. If we have no info, we have no pieces too, so the end result
58 rst := request_strategy.Torrent{
60 ChunksPerPiece: (t.usualPieceSize() + int(t.chunkSize) - 1) / int(t.chunkSize),
63 rst.Capacity = t.storage.Capacity
65 rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
66 for i := range t.pieces {
68 rst.Pieces = append(rst.Pieces, request_strategy.Piece{
69 Request: !t.ignorePieceForRequests(i),
70 Priority: p.purePriority(),
71 Partial: t.piecePartiallyDownloaded(i),
72 Availability: p.availability,
73 Length: int64(p.length()),
74 NumPendingChunks: int(t.pieceNumPendingChunks(i)),
75 IterPendingChunks: p.undirtiedChunksIter(),
78 t.iterPeers(func(p *Peer) {
82 if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates {
83 p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate
85 p.piecesReceivedSinceLastRequestUpdate = 0
86 rst.Peers = append(rst.Peers, request_strategy.Peer{
87 Pieces: *p.newPeerPieces(),
88 MaxRequests: p.nominalMaxRequests(),
89 ExistingRequests: p.actualRequestState.Requests,
90 Choking: p.peerChoking,
91 PieceAllowedFast: p.peerAllowedFast,
92 DownloadRate: p.downloadRate(),
93 Age: time.Since(p.completedHandshake),
96 ptr: uintptr(unsafe.Pointer(p)),
102 return request_strategy.Input{
104 MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
108 func (cl *Client) doRequests() {
109 input := cl.getRequestStrategyInput()
110 nextPeerStates := request_strategy.Run(input)
111 for p, state := range nextPeerStates {
112 setPeerNextRequestState(p, state)
117 gob.Register(peerId{})
125 func (p peerId) Uintptr() uintptr {
129 func (p peerId) GobEncode() (b []byte, _ error) {
130 *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
131 Data: uintptr(unsafe.Pointer(&p.ptr)),
132 Len: int(unsafe.Sizeof(p.ptr)),
133 Cap: int(unsafe.Sizeof(p.ptr)),
138 func (p *peerId) GobDecode(b []byte) error {
139 if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
142 ptr := unsafe.Pointer(&b[0])
143 p.ptr = *(*uintptr)(ptr)
144 log.Printf("%p", ptr)
145 dst := reflect.SliceHeader{
146 Data: uintptr(unsafe.Pointer(&p.Peer)),
147 Len: int(unsafe.Sizeof(p.Peer)),
148 Cap: int(unsafe.Sizeof(p.Peer)),
150 copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
154 func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
155 p := _p.(peerId).Peer
156 p.nextRequestState = rp
157 p.onNextRequestStateChanged()
// RequestIndex is an alias for request_strategy.RequestIndex.
160 type RequestIndex = request_strategy.RequestIndex
// chunkIndexType is an alias for request_strategy.ChunkIndex.
161 type chunkIndexType = request_strategy.ChunkIndex
163 func (p *Peer) applyNextRequestState() bool {
165 if p.actualRequestState.Requests.GetCardinality() > uint64(p.nominalMaxRequests()/2) {
172 var pieceOrder []piece
173 request_strategy.GetRequestablePieces(
174 p.t.cl.getRequestStrategyInput(),
175 func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
176 if t.InfoHash != p.t.infoHash {
179 if !p.peerHasPiece(pieceIndex) {
182 pieceOrder = append(pieceOrder, piece{
184 endGame: rsp.Priority == PiecePriorityNow,
190 for _, endGameIter := range []bool{false, true} {
191 for _, piece := range pieceOrder {
192 tp := p.t.piece(piece.index)
193 tp.iterUndirtiedChunks(func(cs chunkIndexType) {
194 req := cs + tp.requestIndexOffset()
195 if !piece.endGame && !endGameIter && p.t.pendingRequests[req] > 0 {
199 more = p.setInterested(true)
203 if maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
206 if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(piece.index)) {
210 more, err = p.request(req)
215 if interested && maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
230 p.setInterested(false)
235 next := p.nextRequestState
236 current := p.actualRequestState
237 if !p.setInterested(next.Interested) {
241 cancel := roaring.AndNot(&current.Requests, &next.Requests)
242 cancel.Iterate(func(req uint32) bool {
249 next.Requests.Iterate(func(req uint32) bool {
250 // This could happen if the peer chokes us between the next state being generated, and us
251 // trying to transmit the state.
252 if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(req/p.t.chunksPerRegularPiece())) {
256 more, err = p.request(req)