1 package request_strategy
7 "github.com/anacrolix/multiless"
8 pp "github.com/anacrolix/torrent/peer_protocol"
9 "github.com/anacrolix/torrent/types"
13 Request = types.Request
14 pieceIndex = types.PieceIndex
15 piecePriority = types.PiecePriority
16 // This can be made into a type-param later, will be great for testing.
17 ChunkSpec = types.ChunkSpec
20 type ClientPieceOrder struct {
21 pieces []pieceRequestOrderPiece
24 type pieceRequestOrderPiece struct {
30 func (me *ClientPieceOrder) Len() int {
34 func (me ClientPieceOrder) sort() {
35 sort.Slice(me.pieces, me.less)
38 func (me ClientPieceOrder) less(_i, _j int) bool {
41 return multiless.New().Int(
42 int(j.Priority), int(i.Priority),
45 ).Int64(i.Availability, j.Availability).Int(i.index, j.index).Less()
48 type requestsPeer struct {
50 nextState PeerNextRequestState
51 requestablePiecesRemaining int
54 func (rp *requestsPeer) canFitRequest() bool {
55 return len(rp.nextState.Requests) < rp.MaxRequests
58 // Returns true if it is added and wasn't there before.
59 func (rp *requestsPeer) addNextRequest(r Request) bool {
60 _, ok := rp.nextState.Requests[r]
64 rp.nextState.Requests[r] = struct{}{}
68 type peersForPieceRequests struct {
73 func (me *peersForPieceRequests) addNextRequest(r Request) {
74 if me.requestsPeer.addNextRequest(r) {
81 Capacity *func() *int64
82 Peers []Peer // not closed.
85 func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId]PeerNextRequestState {
86 requestOrder.pieces = requestOrder.pieces[:0]
87 allPeers := make(map[*Torrent][]*requestsPeer)
88 // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
90 storageLeft := make(map[*func() *int64]*int64)
91 for _, t := range torrents {
92 // TODO: We could do metainfo requests here.
95 if _, ok := storageLeft[key]; !ok {
96 storageLeft[key] = (*key)()
99 var peers []*requestsPeer
100 for _, p := range t.Peers {
101 peers = append(peers, &requestsPeer{
103 nextState: PeerNextRequestState{
104 Requests: make(map[Request]struct{}),
108 for i, tp := range t.Pieces {
109 requestOrder.pieces = append(requestOrder.pieces, pieceRequestOrderPiece{
115 for _, p := range peers {
116 if p.canRequestPiece(i) {
117 p.requestablePiecesRemaining++
125 for _, p := range requestOrder.pieces {
127 if left := storageLeft[p.t.Capacity]; left != nil {
128 if *left < int64(torrentPiece.Length) {
131 *left -= int64(torrentPiece.Length)
136 allocatePendingChunks(p, allPeers[p.t])
138 ret := make(map[PeerId]PeerNextRequestState)
139 for _, peers := range allPeers {
140 for _, rp := range peers {
141 if rp.requestablePiecesRemaining != 0 {
142 panic(rp.requestablePiecesRemaining)
144 ret[rp.Id] = rp.nextState
150 func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) {
151 peersForPiece := make([]*peersForPieceRequests, 0, len(peers))
152 for _, peer := range peers {
153 peersForPiece = append(peersForPiece, &peersForPieceRequests{
158 sortPeersForPiece := func() {
159 sort.Slice(peersForPiece, func(i, j int) bool {
160 return multiless.New().Int(
161 peersForPiece[i].requestsInPiece,
162 peersForPiece[j].requestsInPiece,
164 peersForPiece[i].requestablePiecesRemaining,
165 peersForPiece[j].requestablePiecesRemaining,
167 peersForPiece[j].DownloadRate,
168 peersForPiece[i].DownloadRate,
170 int64(peersForPiece[j].Age), int64(peersForPiece[i].Age),
171 // TODO: Probably peer priority can come next
173 peersForPiece[i].Id.Uintptr(),
174 peersForPiece[j].Id.Uintptr(),
178 pendingChunksRemaining := int(p.NumPendingChunks)
179 if f := p.IterPendingChunks; f != nil {
180 f(func(chunk types.ChunkSpec) {
181 req := Request{pp.Integer(p.index), chunk}
182 defer func() { pendingChunksRemaining-- }()
185 // Try up to the number of peers that could legitimately receive the request equal to
186 // the number of chunks left. This should ensure that only the best peers serve the last
187 // few chunks in a piece.
188 lowestNumRequestsInPiece := math.MaxInt16
189 for _, peer := range peersForPiece {
190 if !peer.canFitRequest() || !peer.HasPiece(p.index) || (!peer.pieceAllowedFastOrDefault(p.index) && peer.Choking) {
193 if skipped+1 >= pendingChunksRemaining {
196 if f := peer.HasExistingRequest; f == nil || !f(req) {
198 lowestNumRequestsInPiece = peer.requestsInPiece
201 if peer.requestsInPiece > lowestNumRequestsInPiece {
204 if !peer.pieceAllowedFastOrDefault(p.index) {
205 // We must stay interested for this.
206 peer.nextState.Interested = true
208 peer.addNextRequest(req)
211 for _, peer := range peersForPiece {
212 if !peer.canFitRequest() {
215 if !peer.HasPiece(p.index) {
218 if !peer.pieceAllowedFastOrDefault(p.index) {
219 // TODO: Verify that's okay to stay uninterested if we request allowed fast
221 peer.nextState.Interested = true
226 peer.addNextRequest(req)
231 if pendingChunksRemaining != 0 {
232 panic(pendingChunksRemaining)
234 for _, peer := range peersForPiece {
235 if peer.canRequestPiece(p.index) {
236 peer.requestablePiecesRemaining--