1 package request_strategy
6 "github.com/anacrolix/multiless"
7 pp "github.com/anacrolix/torrent/peer_protocol"
8 "github.com/anacrolix/torrent/types"
12 Request = types.Request
13 pieceIndex = types.PieceIndex
14 piecePriority = types.PiecePriority
15 // This can be made into a type-param later, will be great for testing.
16 ChunkSpec = types.ChunkSpec
// ClientPieceOrder is an empty struct type.
// NOTE(review): nothing in the visible part of this file references it — confirm whether it
// is still needed.
19 type ClientPieceOrder struct{}
21 type filterTorrent struct {
24 // Potentially shared with other torrents.
// sortFilterPieces sorts pieces in place using sort.Slice and a multiless comparison chain.
// The keys visible here are, in order: Priority, Availability, and the owning torrent's
// StableId.
// NOTE(review): some comparator lines are not visible in this excerpt; confirm the full key
// order against the complete file.
28 func sortFilterPieces(pieces []filterPiece) {
29 sort.Slice(pieces, func(_i, _j int) bool {
32 return multiless.New().Int(
// Operands are reversed (j before i) for this key only, so presumably higher priority sorts
// first — confirm against multiless semantics.
33 int(j.Priority), int(i.Priority),
37 i.Availability, j.Availability,
41 i.t.StableId, j.t.StableId,
// requestsPeer is the per-peer working state used while Run computes the next requests.
46 type requestsPeer struct {
// The request state being built for this peer; Run returns it keyed by the peer's Id.
48 nextState PeerNextRequestState
// Number of requestable pieces this peer has not yet been considered for. Run increments it
// once per requestable piece the peer can request, allocatePendingChunks decrements it when
// it processes such a piece, and Run panics if it is non-zero once all pieces are processed.
49 requestablePiecesRemaining int
// canFitRequest reports whether the number of requests already recorded in rp.nextState is
// still below rp.MaxRequests.
52 func (rp *requestsPeer) canFitRequest() bool {
53 return len(rp.nextState.Requests) < rp.MaxRequests
// addNextRequest records r in rp.nextState.Requests.
// NOTE(review): the condition guarding the panic is not visible in this excerpt; presumably
// it fires when r is already present in the set — confirm.
56 func (rp *requestsPeer) addNextRequest(r Request) {
// Look up r first so a duplicate add can be detected.
57 _, ok := rp.nextState.Requests[r]
59 panic("should only add once")
61 rp.nextState.Requests[r] = struct{}{}
64 type peersForPieceRequests struct {
// addNextRequest forwards r to the wrapped requestsPeer's addNextRequest.
// NOTE(review): the remainder of this method is not visible in this excerpt; it may also
// update per-piece bookkeeping — confirm against the complete file.
69 func (me *peersForPieceRequests) addNextRequest(r Request) {
70 me.requestsPeer.addNextRequest(r)
// requestablePiece describes a piece that getRequestablePieces selected for requesting.
74 type requestablePiece struct {
// Called by allocatePendingChunks with a callback that receives each pending ChunkSpec.
78 IterPendingChunks ChunksIter
81 type filterPiece struct {
// getRequestablePieces collects the pieces of every torrent in input, orders them with
// sortFilterPieces, and appends to ret the pieces that get past the storage-capacity and
// unverified-bytes checks below.
// NOTE(review): the branches taken when a check fails are not visible in this excerpt;
// presumably they skip the piece — confirm against the complete file.
87 func getRequestablePieces(input Input) (ret []requestablePiece) {
88 // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
90 storageLeft := make(map[*func() *int64]*int64)
91 var pieces []filterPiece
92 for _, _t := range input.Torrents {
93 // TODO: We could do metainfo requests here.
// Call the capacity function only the first time a key is seen; torrents that share a key
// then share the same *int64 counter.
100 if _, ok := storageLeft[key]; !ok {
101 storageLeft[key] = (*key)()
103 t.storageLeft = storageLeft[key]
105 for i, tp := range t.Pieces {
106 pieces = append(pieces, filterPiece{
// Sort before filtering: the budgets below are consumed in the sorted order.
113 sortFilterPieces(pieces)
114 var allTorrentsUnverifiedBytes int64
115 for _, piece := range pieces {
// A nil storageLeft means no capacity counter applies to this piece's torrent.
116 if left := piece.t.storageLeft; left != nil {
117 if *left < int64(piece.Length) {
// Deduct this piece's length from the (possibly shared) remaining capacity.
120 *left -= int64(piece.Length)
122 if !piece.Request || piece.NumPendingChunks == 0 {
123 // TODO: Clarify exactly what is verified. Stuff that's being hashed should be
124 // considered unverified and hold up further requests.
// Per-torrent limit; the check is bypassed when the torrent's MaxUnverifiedBytes is zero.
127 if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes {
// Limit across all torrents; likewise bypassed when input.MaxUnverifiedBytes is zero.
130 if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
// Charge the piece's length to both the per-torrent and the all-torrents totals.
133 piece.t.unverifiedBytes += piece.Length
134 allTorrentsUnverifiedBytes += piece.Length
135 ret = append(ret, requestablePiece{
138 NumPendingChunks: piece.NumPendingChunks,
139 IterPendingChunks: piece.iterPendingChunksWrapper,
147 MaxUnverifiedBytes int64
150 // TODO: We could do metainfo requests here.
// Run computes the next request state for the peers of every torrent in input and returns it
// keyed by peer Id. It obtains the requestable pieces, builds one requestsPeer per peer
// grouped by torrent StableId, counts for each peer how many requestable pieces it can
// request, and then passes each piece with its torrent's peers to allocatePendingChunks.
// It panics if any peer's requestablePiecesRemaining is non-zero after all pieces have been
// processed.
151 func Run(input Input) map[PeerId]PeerNextRequestState {
152 requestPieces := getRequestablePieces(input)
153 torrents := input.Torrents
// Peers grouped by their torrent's StableId.
154 allPeers := make(map[uintptr][]*requestsPeer, len(torrents))
155 for _, t := range torrents {
156 peers := make([]*requestsPeer, 0, len(t.Peers))
157 for _, p := range t.Peers {
158 peers = append(peers, &requestsPeer{
160 nextState: PeerNextRequestState{
// Start every peer with an empty, non-nil request set so it can be written to directly.
161 Requests: make(map[Request]struct{}),
165 allPeers[t.StableId] = peers
// First pass over the pieces: count, per peer, the pieces it can request.
167 for _, piece := range requestPieces {
168 for _, peer := range allPeers[piece.t.StableId] {
169 if peer.canRequestPiece(piece.index) {
170 peer.requestablePiecesRemaining++
// Second pass: allocate each piece's pending chunks among the peers of its torrent.
174 for _, piece := range requestPieces {
175 allocatePendingChunks(piece, allPeers[piece.t.StableId])
177 ret := make(map[PeerId]PeerNextRequestState)
178 for _, peers := range allPeers {
179 for _, rp := range peers {
// Consistency check: the counts added in the first pass must all have been consumed.
180 if rp.requestablePiecesRemaining != 0 {
181 panic(rp.requestablePiecesRemaining)
183 ret[rp.Id] = rp.nextState
// allocatePendingChunks distributes the pending chunks of piece p among peers by adding
// requests to the peers' next request state. The phases visible here are:
//  1. wrap each peer in a peersForPieceRequests;
//  2. decrement requestablePiecesRemaining for every peer that can request this piece;
//  3. iterate the pending chunks and record in preallocated a peer that passes the
//     HasExistingRequest, canFitRequest and canRequestPiece checks, adding the request to it;
//  4. iterate the pending chunks again and, for those not preallocated, sort the peers and add
//     the request to a peer passing the visible checks;
//  5. revisit each preallocated chunk: remove the request from the preallocated peer, sort the
//     peers with that request as an additional key, and add the request again.
//
// It panics if pendingChunksRemaining is non-zero at the end.
// NOTE(review): the bodies of most guarding ifs and the loop exits are not visible in this
// excerpt; presumably failed checks skip the peer and a successful add ends the peer loop —
// confirm against the complete file.
189 func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
190 peersForPiece := make([]*peersForPieceRequests, 0, len(peers))
191 for _, peer := range peers {
192 peersForPiece = append(peersForPiece, &peersForPieceRequests{
// This piece is now being processed, so it no longer counts as remaining for peers that
// can request it.
198 for _, peer := range peersForPiece {
199 if peer.canRequestPiece(p.index) {
200 peer.requestablePiecesRemaining--
// sortPeersForPiece re-sorts peersForPiece in place. When byHasRequest is non-nil, whether a
// peer already holds that request in its next state is included as a sort key.
204 sortPeersForPiece := func(byHasRequest *Request) {
205 sort.Slice(peersForPiece, func(i, j int) bool {
// Visible keys, in order: requestsInPiece and requestablePiecesRemaining (i before j), then
// DownloadRate with the operands reversed (j before i).
206 ml := multiless.New().Int(
207 peersForPiece[i].requestsInPiece,
208 peersForPiece[j].requestsInPiece,
210 peersForPiece[i].requestablePiecesRemaining,
211 peersForPiece[j].requestablePiecesRemaining,
213 peersForPiece[j].DownloadRate,
214 peersForPiece[i].DownloadRate,
216 if byHasRequest != nil {
217 _, iHas := peersForPiece[i].nextState.Requests[*byHasRequest]
218 _, jHas := peersForPiece[j].nextState.Requests[*byHasRequest]
// Operands reversed (j before i), as with DownloadRate above.
219 ml = ml.Bool(jHas, iHas)
// Age is also compared with the operands reversed; the peer Id comes after it in the chain.
222 int64(peersForPiece[j].Age), int64(peersForPiece[i].Age),
223 // TODO: Probably peer priority can come next
225 peersForPiece[i].Id.Uintptr(),
226 peersForPiece[j].Id.Uintptr(),
// Chunks handled in the first pass, mapped to the peer the request was added to.
230 preallocated := make(map[ChunkSpec]*peersForPieceRequests, p.NumPendingChunks)
231 p.IterPendingChunks(func(spec ChunkSpec) {
232 req := Request{pp.Integer(p.index), spec}
233 for _, peer := range peersForPiece {
// A peer with no HasExistingRequest hook, or whose hook returns false for req, takes this
// branch.
234 if h := peer.HasExistingRequest; h == nil || !h(req) {
237 if !peer.canFitRequest() {
240 if !peer.canRequestPiece(p.index) {
243 preallocated[spec] = peer
244 peer.addNextRequest(req)
// Counts down as chunks are handled; checked against zero at the end of the function.
247 pendingChunksRemaining := int(p.NumPendingChunks)
248 p.IterPendingChunks(func(chunk types.ChunkSpec) {
// Chunks recorded in preallocated are handled by the final loop instead.
249 if _, ok := preallocated[chunk]; ok {
252 req := Request{pp.Integer(p.index), chunk}
// Deferred so the count drops when this callback returns, whichever path it takes from here.
253 defer func() { pendingChunksRemaining-- }()
254 sortPeersForPiece(nil)
255 for _, peer := range peersForPiece {
256 if !peer.canFitRequest() {
259 if !peer.HasPiece(p.index) {
262 if !peer.pieceAllowedFastOrDefault(p.index) {
263 // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
264 peer.nextState.Interested = true
269 peer.addNextRequest(req)
// NOTE(review): Go does not specify map iteration order, so the order in which preallocated
// chunks are revisited can differ between runs.
274 for chunk, prePeer := range preallocated {
275 pendingChunksRemaining--
276 req := Request{pp.Integer(p.index), chunk}
// Take back the preallocated peer's per-piece count before sorting; the request itself
// is only deleted after the sort, so the sort can still see which peer holds it.
277 prePeer.requestsInPiece--
278 sortPeersForPiece(&req)
279 delete(prePeer.nextState.Requests, req)
280 for _, peer := range peersForPiece {
281 if !peer.canFitRequest() {
284 if !peer.HasPiece(p.index) {
287 if !peer.pieceAllowedFastOrDefault(p.index) {
288 // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
289 peer.nextState.Interested = true
294 peer.addNextRequest(req)
// Every pending chunk must have been accounted for by one of the two loops above.
298 if pendingChunksRemaining != 0 {
299 panic(pendingChunksRemaining)