1 package request_strategy
6 "github.com/anacrolix/multiless"
7 pp "github.com/anacrolix/torrent/peer_protocol"
8 "github.com/anacrolix/torrent/types"
// Aliases into the shared types package so this package can use the
// names unqualified.
12 Request = types.Request
13 pieceIndex = types.PieceIndex
14 piecePriority = types.PiecePriority
15 // This can be made into a type-param later, will be great for testing.
16 ChunkSpec = types.ChunkSpec
19 type ClientPieceOrder struct{}
// filterTorrent is per-torrent working state consulted while filtering and
// ordering pieces in getRequestablePieces. NOTE(review): only a fragment of
// its fields is visible in this view.
21 type filterTorrent struct {
24 // Potentially shared with other torrents.
// sortFilterPieces orders candidate pieces for request allocation: higher
// Priority first (note the reversed j/i argument order), then lower
// Availability (rarer pieces earlier), with the owning torrent's StableId
// participating as a deterministic tie-break in the multiless chain.
28 func sortFilterPieces(pieces []filterPiece) {
29 sort.Slice(pieces, func(_i, _j int) bool {
32 return multiless.New().Int(
33 int(j.Priority), int(i.Priority),
37 i.Availability, j.Availability,
41 i.t.StableId, j.t.StableId,
// requestsPeer accumulates the next request state being computed for a
// single peer during one DoRequests pass.
46 type requestsPeer struct {
// The request set and interest flag this peer should be moved to.
48 nextState PeerNextRequestState
// How many requestable pieces this peer could serve that have not yet
// been processed by allocatePendingChunks; DoRequests panics if this is
// non-zero at the end of a pass.
49 requestablePiecesRemaining int
52 func (rp *requestsPeer) canFitRequest() bool {
53 return len(rp.nextState.Requests) < rp.MaxRequests
56 func (rp *requestsPeer) addNextRequest(r Request) {
57 _, ok := rp.nextState.Requests[r]
59 panic("should only add once")
61 rp.nextState.Requests[r] = struct{}{}
64 type peersForPieceRequests struct {
// addNextRequest delegates to the embedded requestsPeer's duplicate-checked
// insert; the per-piece bookkeeping in the elided remainder of the body is
// not visible in this view.
69 func (me *peersForPieceRequests) addNextRequest(r Request) {
70 me.requestsPeer.addNextRequest(r)
// requestablePiece is a piece that survived the filters in
// getRequestablePieces and whose pending chunks may be allocated to peers.
74 type requestablePiece struct {
// Iterates the chunk specs within this piece that still need requesting.
78 IterPendingChunks ChunksIter
81 type filterPiece struct {
// getRequestablePieces flattens the pieces of all torrents into one list,
// orders it with sortFilterPieces, then walks the ordered pieces applying
// per-torrent budgets — remaining storage capacity and MaxUnverifiedBytes —
// returning the pieces whose chunks should be allocated this pass.
87 func getRequestablePieces(torrents []Torrent) (ret []requestablePiece) {
88 // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
90 storageLeft := make(map[*func() *int64]*int64)
91 var pieces []filterPiece
92 for _, _t := range torrents {
93 // TODO: We could do metainfo requests here.
// Resolve the remaining-capacity counter once per distinct storage, so
// torrents sharing storage draw from the same budget.
100 if _, ok := storageLeft[key]; !ok {
101 storageLeft[key] = (*key)()
103 t.storageLeft = storageLeft[key]
105 for i, tp := range t.Pieces {
106 pieces = append(pieces, filterPiece{
113 sortFilterPieces(pieces)
114 for _, piece := range pieces {
// Skip pieces that no longer fit in the shared storage budget; a nil
// storageLeft means the torrent's storage is uncapped.
115 if left := piece.t.storageLeft; left != nil {
116 if *left < int64(piece.Length) {
119 *left -= int64(piece.Length)
// Only pieces that are wanted and still have pending chunks qualify.
121 if !piece.Request || piece.NumPendingChunks == 0 {
// Respect the per-torrent cap on bytes downloaded but not yet verified
// (a zero cap means unlimited).
124 if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes {
127 piece.t.unverifiedBytes += piece.Length
128 ret = append(ret, requestablePiece{
131 NumPendingChunks: piece.NumPendingChunks,
132 IterPendingChunks: piece.iterPendingChunksWrapper,
138 // TODO: We could do metainfo requests here.
// DoRequests computes, for every peer across the given torrents, the
// request set and interest state that peer should move to next. Pieces are
// filtered and globally ordered first, then each piece's pending chunks are
// allocated among the peers of its torrent.
139 func (requestOrder *ClientPieceOrder) DoRequests(torrents []Torrent) map[PeerId]PeerNextRequestState {
140 requestPieces := getRequestablePieces(torrents)
141 allPeers := make(map[uintptr][]*requestsPeer, len(torrents))
// Build fresh per-peer working state, keyed by the torrent's StableId.
142 for _, t := range torrents {
143 peers := make([]*requestsPeer, 0, len(t.Peers))
144 for _, p := range t.Peers {
145 peers = append(peers, &requestsPeer{
147 nextState: PeerNextRequestState{
148 Requests: make(map[Request]struct{}),
152 allPeers[t.StableId] = peers
// Pre-count how many requestable pieces each peer could serve; this is
// decremented during allocation and checked against zero below.
154 for _, piece := range requestPieces {
155 for _, peer := range allPeers[piece.t.StableId] {
156 if peer.canRequestPiece(piece.index) {
157 peer.requestablePiecesRemaining++
161 for _, piece := range requestPieces {
162 allocatePendingChunks(piece, allPeers[piece.t.StableId])
164 ret := make(map[PeerId]PeerNextRequestState)
165 for _, peers := range allPeers {
166 for _, rp := range peers {
// Invariant: every requestable piece this peer could serve was processed.
167 if rp.requestablePiecesRemaining != 0 {
168 panic(rp.requestablePiecesRemaining)
170 ret[rp.Id] = rp.nextState
// allocatePendingChunks distributes the pending chunks of piece p among the
// given peers in three passes: (1) chunks a peer already has an outstanding
// request for are preallocated to that peer, (2) the remaining chunks are
// handed out to peers in sorted order, and (3) each preallocated chunk is
// re-examined and may be reassigned to a peer that now sorts better.
176 func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
177 peersForPiece := make([]*peersForPieceRequests, 0, len(peers))
178 for _, peer := range peers {
179 peersForPiece = append(peersForPiece, &peersForPieceRequests{
// Account that this piece has been fully handled for every peer that
// could have requested it (pairs with the pre-count in DoRequests).
185 for _, peer := range peersForPiece {
186 if peer.canRequestPiece(p.index) {
187 peer.requestablePiecesRemaining--
// Sort peers for chunk hand-out: fewest requests already in this piece
// first, then most requestable pieces remaining; the reversed j/i argument
// pairs below indicate descending order for DownloadRate and Age. When
// byHasRequest is non-nil, peers already holding that exact request are
// preferred (the Bool(jHas, iHas) term).
191 sortPeersForPiece := func(byHasRequest *Request) {
192 sort.Slice(peersForPiece, func(i, j int) bool {
193 ml := multiless.New().Int(
194 peersForPiece[i].requestsInPiece,
195 peersForPiece[j].requestsInPiece,
197 peersForPiece[i].requestablePiecesRemaining,
198 peersForPiece[j].requestablePiecesRemaining,
200 peersForPiece[j].DownloadRate,
201 peersForPiece[i].DownloadRate,
203 if byHasRequest != nil {
204 _, iHas := peersForPiece[i].nextState.Requests[*byHasRequest]
205 _, jHas := peersForPiece[j].nextState.Requests[*byHasRequest]
206 ml = ml.Bool(jHas, iHas)
209 int64(peersForPiece[j].Age), int64(peersForPiece[i].Age),
210 // TODO: Probably peer priority can come next
// Peer id as a final deterministic tie-break.
212 peersForPiece[i].Id.Uintptr(),
213 peersForPiece[j].Id.Uintptr(),
// Pass 1: keep each chunk with a peer that already has it requested, if
// that peer still has room and can request the piece.
217 preallocated := make(map[ChunkSpec]*peersForPieceRequests, p.NumPendingChunks)
218 p.IterPendingChunks(func(spec ChunkSpec) {
219 req := Request{pp.Integer(p.index), spec}
220 for _, peer := range peersForPiece {
221 if h := peer.HasExistingRequest; h == nil || !h(req) {
224 if !peer.canFitRequest() {
227 if !peer.canRequestPiece(p.index) {
230 preallocated[spec] = peer
231 peer.addNextRequest(req)
// Pass 2: allocate the chunks that weren't preallocated, re-sorting the
// peers before each chunk so hand-out stays balanced.
234 pendingChunksRemaining := int(p.NumPendingChunks)
235 p.IterPendingChunks(func(chunk types.ChunkSpec) {
236 if _, ok := preallocated[chunk]; ok {
239 req := Request{pp.Integer(p.index), chunk}
// Deferred so the count drops however this closure exits.
240 defer func() { pendingChunksRemaining-- }()
241 sortPeersForPiece(nil)
242 for _, peer := range peersForPiece {
243 if !peer.canFitRequest() {
246 if !peer.HasPiece(p.index) {
249 if !peer.pieceAllowedFastOrDefault(p.index) {
250 // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
251 peer.nextState.Interested = true
256 peer.addNextRequest(req)
// Pass 3: re-evaluate each preallocated chunk. The request is removed
// from its holder and re-offered with byHasRequest set, so ties favor the
// original holder but a strictly better peer can take it over.
261 for chunk, prePeer := range preallocated {
262 pendingChunksRemaining--
263 req := Request{pp.Integer(p.index), chunk}
264 prePeer.requestsInPiece--
265 sortPeersForPiece(&req)
266 delete(prePeer.nextState.Requests, req)
267 for _, peer := range peersForPiece {
268 if !peer.canFitRequest() {
271 if !peer.HasPiece(p.index) {
274 if !peer.pieceAllowedFastOrDefault(p.index) {
275 // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
276 peer.nextState.Interested = true
281 peer.addNextRequest(req)
// Sanity check: every pending chunk was accounted for in passes 2 and 3.
285 if pendingChunksRemaining != 0 {
286 panic(pendingChunksRemaining)