8 "github.com/anacrolix/multiless"
9 pp "github.com/anacrolix/torrent/peer_protocol"
10 "github.com/bradfitz/iter"
13 type clientPieceRequestOrder struct {
14 pieces []pieceRequestOrderPiece
17 type pieceRequestOrderPiece struct {
26 func (me *clientPieceRequestOrder) Len() int {
30 func (me clientPieceRequestOrder) sort() {
31 sort.Slice(me.pieces, me.less)
34 func (me clientPieceRequestOrder) less(_i, _j int) bool {
37 return multiless.New().Int(
38 int(j.prio), int(i.prio),
41 ).Int64(i.availability, j.availability).Int(i.index, j.index).Less()
44 func (cl *Client) requester() {
52 case <-cl.closed.LockedChan(cl.locker()):
54 case <-time.After(100 * time.Millisecond):
59 type requestsPeer struct {
61 nextRequests map[Request]struct{}
63 requestablePiecesRemaining int
66 func (rp *requestsPeer) canRequestPiece(p pieceIndex) bool {
67 return rp.hasPiece(p) && (!rp.choking() || rp.pieceAllowedFast(p))
70 func (rp *requestsPeer) hasPiece(i pieceIndex) bool {
71 return rp.cur.peerHasPiece(i)
74 func (rp *requestsPeer) pieceAllowedFast(p pieceIndex) bool {
75 return rp.cur.peerAllowedFast.Contains(p)
78 func (rp *requestsPeer) choking() bool {
79 return rp.cur.peerChoking
82 func (rp *requestsPeer) hasExistingRequest(r Request) bool {
83 _, ok := rp.cur.requests[r]
87 func (rp *requestsPeer) canFitRequest() bool {
88 return len(rp.nextRequests) < rp.cur.nominalMaxRequests()
91 // Returns true if it is added and wasn't there before.
92 func (rp *requestsPeer) addNextRequest(r Request) bool {
93 _, ok := rp.nextRequests[r]
97 rp.nextRequests[r] = struct{}{}
101 type peersForPieceRequests struct {
106 func (me *peersForPieceRequests) addNextRequest(r Request) {
107 if me.requestsPeer.addNextRequest(r) {
113 func (cl *Client) doRequests() {
114 requestOrder := &cl.pieceRequestOrder
115 requestOrder.pieces = requestOrder.pieces[:0]
116 allPeers := make(map[*Torrent][]*requestsPeer)
117 // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
119 storageLeft := make(map[*func() *int64]*int64)
120 for _, t := range cl.torrents {
121 // TODO: We could do metainfo requests here.
125 key := t.storage.Capacity
127 if _, ok := storageLeft[key]; !ok {
128 storageLeft[key] = (*key)()
131 var peers []*requestsPeer
132 t.iterPeers(func(p *Peer) {
133 if !p.closed.IsSet() {
134 peers = append(peers, &requestsPeer{
136 nextRequests: make(map[Request]struct{}),
140 for i := range iter.N(t.numPieces()) {
142 pp := tp.purePriority()
143 request := !t.ignorePieceForRequests(i)
144 requestOrder.pieces = append(requestOrder.pieces, pieceRequestOrderPiece{
148 partial: t.piecePartiallyDownloaded(i),
149 availability: tp.availability,
153 for _, p := range peers {
154 if p.canRequestPiece(i) {
155 p.requestablePiecesRemaining++
163 for _, p := range requestOrder.pieces {
164 torrentPiece := p.t.piece(p.index)
165 if left := storageLeft[p.t.storage.Capacity]; left != nil {
166 if *left < int64(torrentPiece.length()) {
169 *left -= int64(torrentPiece.length())
174 peersForPiece := make([]*peersForPieceRequests, 0, len(allPeers[p.t]))
175 for _, peer := range allPeers[p.t] {
176 peersForPiece = append(peersForPiece, &peersForPieceRequests{
181 sortPeersForPiece := func() {
182 sort.Slice(peersForPiece, func(i, j int) bool {
183 return multiless.New().Bool(
184 peersForPiece[j].canFitRequest(),
185 peersForPiece[i].canFitRequest(),
187 peersForPiece[i].requestsInPiece,
188 peersForPiece[j].requestsInPiece,
190 peersForPiece[i].requestablePiecesRemaining,
191 peersForPiece[j].requestablePiecesRemaining,
193 peersForPiece[j].cur.downloadRate(),
194 peersForPiece[i].cur.downloadRate(),
196 peersForPiece[i].cur.completedHandshake.Equal(peersForPiece[j].cur.completedHandshake),
197 peersForPiece[i].cur.completedHandshake.Before(peersForPiece[j].cur.completedHandshake),
198 // TODO: Probably peer priority can come next
200 uintptr(unsafe.Pointer(peersForPiece[j].cur)),
201 uintptr(unsafe.Pointer(peersForPiece[i].cur)),
205 pendingChunksRemaining := int(p.t.pieceNumPendingChunks(p.index))
206 torrentPiece.iterUndirtiedChunks(func(chunk ChunkSpec) bool {
207 req := Request{pp.Integer(p.index), chunk}
208 pendingChunksRemaining--
210 for i, peer := range peersForPiece {
211 if i > pendingChunksRemaining {
214 if peer.hasExistingRequest(req) && peer.canFitRequest() {
215 peer.addNextRequest(req)
219 for _, peer := range peersForPiece {
220 if !peer.canFitRequest() {
223 if !peer.hasPiece(p.index) {
226 if !peer.pieceAllowedFast(p.index) {
227 // TODO: Verify that's okay to stay uninterested if we request allowed fast
229 peer.nextInterest = true
234 peer.addNextRequest(req)
239 for _, peer := range peersForPiece {
240 if peer.canRequestPiece(p.index) {
241 peer.requestablePiecesRemaining--
245 for _, peers := range allPeers {
246 for _, rp := range peers {
247 if rp.requestablePiecesRemaining != 0 {
248 panic(rp.requestablePiecesRemaining)
250 applyPeerNextRequests(rp)
255 func applyPeerNextRequests(rp *requestsPeer) {
257 p.setInterested(rp.nextInterest)
258 for req := range p.requests {
259 if _, ok := rp.nextRequests[req]; !ok {
263 for req := range rp.nextRequests {
264 err := p.request(req)