1 package request_strategy
8 "github.com/anacrolix/multiless"
9 "github.com/anacrolix/torrent/storage"
11 pp "github.com/anacrolix/torrent/peer_protocol"
12 "github.com/anacrolix/torrent/types"
16 Request = types.Request
17 pieceIndex = types.PieceIndex
18 piecePriority = types.PiecePriority
19 // This can be made into a type-param later, will be great for testing.
20 ChunkSpec = types.ChunkSpec
// ClientPieceOrder is an empty marker type; any ordering state lives elsewhere.
23 type ClientPieceOrder struct{}
// filterTorrent carries per-torrent working state used while filtering pieces
// for requesting (several field lines are elided from this listing; see
// getRequestablePieces for the fields it populates, e.g. storageLeft and
// unverifiedBytes).
25 type filterTorrent struct {
// The comment below refers to a field shared across torrents — presumably the
// storage capacity pointer; TODO confirm against the full source.
28 // Potentially shared with other torrents.
// sortFilterPieces orders candidate pieces for requesting. Visible comparison
// keys, in order: higher Priority first (note i/j are swapped for descending
// order), then lower Availability, then torrent StableId. Additional tiebreak
// lines are elided from this listing — presumably piece index; TODO confirm.
32 func sortFilterPieces(pieces []filterPiece) {
33 sort.Slice(pieces, func(_i, _j int) bool {
36 return multiless.New().Int(
// Descending priority: j before i.
37 int(j.Priority), int(i.Priority),
// Ascending availability: rarer pieces first.
41 i.Availability, j.Availability,
// Stable tiebreak across torrents.
45 i.t.StableId, j.t.StableId,
// requestsPeer wraps a peer (embedded field elided from this listing) with the
// request state being built for the next strategy run.
50 type requestsPeer struct {
// nextState accumulates the requests and interest to be applied to the peer.
52 nextState PeerNextRequestState
// requestablePiecesRemaining counts pieces this peer can still be asked for;
// Run panics if it is nonzero after allocation (invariant check).
53 requestablePiecesRemaining int
// canFitRequest reports whether the peer's next-state request set is still
// below its MaxRequests limit.
56 func (rp *requestsPeer) canFitRequest() bool {
57 return len(rp.nextState.Requests) < rp.MaxRequests
// addNextRequest records r in the peer's next request state, panicking if the
// same request is added twice (the ok-check line between 61 and 65 is elided).
60 func (rp *requestsPeer) addNextRequest(r Request) {
61 _, ok := rp.nextState.Requests[r]
63 panic("should only add once")
65 rp.nextState.Requests[r] = struct{}{}
// peersForPieceRequests augments requestsPeer with per-piece allocation state
// (fields elided from this listing; its addNextRequest wrapper suggests a
// per-piece request counter — TODO confirm).
68 type peersForPieceRequests struct {
73 func (me *peersForPieceRequests) addNextRequest(r Request) {
74 me.requestsPeer.addNextRequest(r)
// requestablePiece is a piece selected for requesting by getRequestablePieces
// (index, torrent, and other fields are elided from this listing).
78 type requestablePiece struct {
// IterPendingChunks visits each pending ChunkSpec of the piece.
83 IterPendingChunks ChunksIter
// filterPiece pairs a piece with its filterTorrent for sorting/filtering
// (fields elided from this listing).
86 type filterPiece struct {
// getRequestablePieces flattens all torrents' pieces into a single list,
// sorts them by request priority, then selects pieces subject to three
// budgets: remaining storage capacity (shared per capacity key), each
// torrent's MaxUnverifiedBytes, and the input-wide MaxUnverifiedBytes.
92 func getRequestablePieces(input Input) (ret []requestablePiece) {
// Pre-size both slices to the total piece count across torrents.
94 for i := range input.Torrents {
95 maxPieces += len(input.Torrents[i].Pieces)
97 pieces := make([]filterPiece, 0, maxPieces)
98 ret = make([]requestablePiece, 0, maxPieces)
99 // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
100 // TorrentImpl. A nil value means no capacity limit.
101 storageLeft := make(map[storage.TorrentCapacity]*int64)
102 for _t := range input.Torrents {
103 // TODO: We could do metainfo requests here.
// Take a pointer into the slice so per-torrent state mutations persist.
105 Torrent: &input.Torrents[_t],
// Resolve this torrent's capacity budget once per distinct capacity key.
110 if _, ok := storageLeft[key]; !ok {
111 capacity, ok := (*key)()
113 storageLeft[key] = &capacity
// Capacity callback reported no limit (elided branch) — nil means unlimited.
115 storageLeft[key] = nil
118 t.storageLeft = storageLeft[key]
120 for i := range t.Pieces {
121 pieces = append(pieces, filterPiece{
128 sortFilterPieces(pieces)
129 var allTorrentsUnverifiedBytes int64
// Walk pieces in priority order, charging budgets as pieces are accepted.
130 for _, piece := range pieces {
131 if left := piece.t.storageLeft; left != nil {
// Skip (elided continue?) pieces that no longer fit in remaining capacity.
132 if *left < int64(piece.Length) {
135 *left -= int64(piece.Length)
// Unrequestable or fully-pending-free pieces are not selected.
137 if !piece.Request || piece.NumPendingChunks == 0 {
138 // TODO: Clarify exactly what is verified. Stuff that's being hashed should be
139 // considered unverified and hold up further requests.
// Per-torrent unverified-bytes cap (0 means no cap).
142 if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes {
// Global unverified-bytes cap across all torrents (0 means no cap).
145 if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
148 piece.t.unverifiedBytes += piece.Length
149 allTorrentsUnverifiedBytes += piece.Length
150 ret = append(ret, requestablePiece{
153 NumPendingChunks: piece.NumPendingChunks,
154 IterPendingChunks: piece.iterPendingChunksWrapper,
// Pieces at priority Next or above always re-stripe across peers; see Less.
155 alwaysReallocate: piece.Priority >= types.PiecePriorityNext,
// MaxUnverifiedBytes caps unverified bytes across ALL torrents for one run;
// 0 disables the cap (enclosing Input struct header is elided here).
163 MaxUnverifiedBytes int64
166 // TODO: We could do metainfo requests here.
// Run computes the next request state for every peer across all input
// torrents: it selects requestable pieces, counts how many pieces each peer
// could serve, allocates pending chunks piece by piece, and returns the
// resulting state keyed by PeerId. It panics on internal invariant
// violations (unconsumed piece counts, duplicate peer ids).
167 func Run(input Input) map[PeerId]PeerNextRequestState {
168 requestPieces := getRequestablePieces(input)
169 torrents := input.Torrents
// Peers grouped by torrent StableId (a uintptr identity).
170 allPeers := make(map[uintptr][]*requestsPeer, len(torrents))
171 for _, t := range torrents {
172 peers := make([]*requestsPeer, 0, len(t.Peers))
173 for _, p := range t.Peers {
174 peers = append(peers, &requestsPeer{
176 nextState: PeerNextRequestState{
177 Requests: make(map[Request]struct{}, p.MaxRequests),
181 allPeers[t.StableId] = peers
// First pass: count, per peer, how many selected pieces it can serve.
183 for _, piece := range requestPieces {
184 for _, peer := range allPeers[piece.t.StableId] {
185 if peer.canRequestPiece(piece.index) {
186 peer.requestablePiecesRemaining++
// Second pass: distribute each piece's pending chunks among its peers.
190 for _, piece := range requestPieces {
191 allocatePendingChunks(piece, allPeers[piece.t.StableId])
193 ret := make(map[PeerId]PeerNextRequestState)
194 for _, peers := range allPeers {
195 for _, rp := range peers {
// Every counted piece must have been consumed by allocatePendingChunks.
196 if rp.requestablePiecesRemaining != 0 {
197 panic(rp.requestablePiecesRemaining)
199 if _, ok := ret[rp.Id]; ok {
200 panic(fmt.Sprintf("duplicate peer id: %v", rp.Id))
202 ret[rp.Id] = rp.nextState
208 // Checks that a sorted peersForPiece slice makes sense.
// Debug-only assertion (its call site is commented out in
// allocatePendingChunks): verifies the sorter reports sorted order and that
// no peer appears twice. The panic lines for both failures are elided here.
209 func ensureValidSortedPeersForPieceRequests(peers *peersForPieceSorter) {
210 if !sort.IsSorted(peers) {
213 peerMap := make(map[*peersForPieceRequests]struct{}, peers.Len())
214 for _, p := range peers.peersForPiece {
215 if _, ok := peerMap[p]; ok {
218 peerMap[p] = struct{}{}
// peersForPiecesPool recycles the scratch []*peersForPieceRequests slices used
// by allocatePendingChunks (obtained via makePeersForPiece, returned via Put).
222 var peersForPiecesPool sync.Pool
// makePeersForPiece returns an empty slice with at least the given capacity,
// reusing a pooled slice when available (the nil-check branch between Get and
// the fallback make is elided). NOTE(review): the parameter shadows the
// builtin cap — harmless here, but worth renaming in the full source.
224 func makePeersForPiece(cap int) []*peersForPieceRequests {
225 got := peersForPiecesPool.Get()
227 return make([]*peersForPieceRequests, 0, cap)
// Reuse pooled backing storage, truncated to zero length.
229 return got.([]*peersForPieceRequests)[:0]
// peersForPieceSorter implements sort.Interface over candidate peers for a
// single piece; further fields (the piece and an optional req used as a
// tiebreak in Less) are elided from this listing.
232 type peersForPieceSorter struct {
233 peersForPiece []*peersForPieceRequests
// Len implements sort.Interface.
238 func (me *peersForPieceSorter) Len() int {
239 return len(me.peersForPiece)
// Swap implements sort.Interface.
242 func (me *peersForPieceSorter) Swap(i, j int) {
243 me.peersForPiece[i], me.peersForPiece[j] = me.peersForPiece[j], me.peersForPiece[i]
// Less implements sort.Interface. It ranks peers for receiving a chunk of the
// current piece. When me.req is set (re-sorting preallocated chunks), peers
// that already hold that request sort earlier via byHasRequest. Otherwise the
// ordering balances "striping" (even distribution) against keeping requests
// where they are — see the inline comment below. Several comparator lines
// (download rate, peer priority, etc.) are elided from this listing.
246 func (me *peersForPieceSorter) Less(_i, _j int) bool {
247 i := me.peersForPiece[_i]
248 j := me.peersForPiece[_j]
// Deferred comparison: prefer the peer that already has me.req in its
// next-state request set (jHas before iHas => has-request sorts earlier).
251 byHasRequest := func() multiless.Computation {
252 ml := multiless.New()
254 _, iHas := i.nextState.Requests[*req]
255 _, jHas := j.nextState.Requests[*req]
256 ml = ml.Bool(jHas, iHas)
260 ml := multiless.New()
261 // We always "reallocate", that is force even striping amongst peers that are either on
262 // the last piece they can contribute too, or for pieces marked for this behaviour.
263 // Striping prevents starving peers of requests, and will always re-balance to the
264 // fastest known peers.
265 if !p.alwaysReallocate {
// Peers on their last contributable piece sort earlier.
267 j.requestablePiecesRemaining == 1,
268 i.requestablePiecesRemaining == 1)
// In reallocation mode, existing-request preference applies first…
270 if p.alwaysReallocate || j.requestablePiecesRemaining == 1 {
275 ml = ml.AndThen(byHasRequest)
// …then peers with fewer remaining pieces get priority for this one.
278 i.requestablePiecesRemaining,
279 j.requestablePiecesRemaining,
287 ml = ml.AndThen(byHasRequest)
// Older peers (larger Age) sort earlier — note the swapped operands.
289 int64(j.Age), int64(i.Age),
290 // TODO: Probably peer priority can come next
// allocatePendingChunks distributes piece p's pending chunks among the given
// peers, mutating each peer's nextState. Phases visible in this listing:
//  1. Build the per-piece candidate list, skipping peers that cannot request
//     the piece and immediately consuming the piece-count of peers that are
//     already full.
//  2. (In what appears to be a deferred cleanup — TODO confirm) decrement
//     requestablePiecesRemaining for remaining candidates and return the
//     scratch slice to peersForPiecesPool.
//  3. Preallocate each chunk to every peer that already has the matching
//     request (HasExistingRequest), so in-flight requests are kept stable.
//  4. For chunks with no preallocation, sort candidates (req == nil) and hand
//     the chunk to the first peer that fits, marking it Interested.
//  5. For preallocated chunks, re-sort with the request as a tiebreak, clear
//     the tentative assignments, and reassign to the best-ranked peer.
//  6. Panic if any pending chunk was left unhandled (invariant check).
// Many guard/brace lines are elided from this listing.
297 func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
298 peersForPiece := makePeersForPiece(len(peers))
299 for _, peer := range peers {
300 if !peer.canRequestPiece(p.index) {
// A full peer can never take a chunk of this piece: consume its count now.
303 if !peer.canFitRequest() {
304 peer.requestablePiecesRemaining--
307 peersForPiece = append(peersForPiece, &peersForPieceRequests{
// Presumably inside a defer: settle counts and recycle the scratch slice.
313 for _, peer := range peersForPiece {
314 if peer.canRequestPiece(p.index) {
315 peer.requestablePiecesRemaining--
318 peersForPiecesPool.Put(peersForPiece)
320 peersForPieceSorter := peersForPieceSorter{
321 peersForPiece: peersForPiece,
// req steers Less: nil for fresh allocation, the chunk's request when
// re-ranking preallocated assignments.
324 sortPeersForPiece := func(req *Request) {
325 peersForPieceSorter.req = req
326 sort.Sort(&peersForPieceSorter)
327 //ensureValidSortedPeersForPieceRequests(&peersForPieceSorter)
329 // Chunks can be preassigned several times, if peers haven't been able to update their "actual"
330 // with "next" request state before another request strategy run occurs.
331 preallocated := make(map[ChunkSpec][]*peersForPieceRequests, p.NumPendingChunks)
332 p.IterPendingChunks(func(spec ChunkSpec) {
333 req := Request{pp.Integer(p.index), spec}
334 for _, peer := range peersForPiece {
// Skip peers without this exact in-flight request (elided continue?).
335 if h := peer.HasExistingRequest; h == nil || !h(req) {
338 if !peer.canFitRequest() {
341 if !peer.canRequestPiece(p.index) {
344 preallocated[spec] = append(preallocated[spec], peer)
345 peer.addNextRequest(req)
348 pendingChunksRemaining := int(p.NumPendingChunks)
// Fresh allocation pass for chunks with no existing request holder.
349 p.IterPendingChunks(func(chunk types.ChunkSpec) {
350 if _, ok := preallocated[chunk]; ok {
353 req := Request{pp.Integer(p.index), chunk}
354 defer func() { pendingChunksRemaining-- }()
355 sortPeersForPiece(nil)
356 for _, peer := range peersForPiece {
357 if !peer.canFitRequest() {
360 if !peer.HasPiece(p.index) {
// Without allowed-fast, we must express interest to request.
363 if !peer.pieceAllowedFastOrDefault(p.index) {
364 // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
365 peer.nextState.Interested = true
370 peer.addNextRequest(req)
// Resolution pass: each preallocated chunk keeps exactly one final holder.
375 for chunk, prePeers := range preallocated {
376 pendingChunksRemaining--
377 req := Request{pp.Integer(p.index), chunk}
// NOTE(review): the loop variable pp shadows the peer_protocol import alias
// within these loops.
378 for _, pp := range prePeers {
381 sortPeersForPiece(&req)
// Drop the tentative assignments before re-adding to the winner.
382 for _, pp := range prePeers {
383 delete(pp.nextState.Requests, req)
385 for _, peer := range peersForPiece {
386 if !peer.canFitRequest() {
389 if !peer.HasPiece(p.index) {
392 if !peer.pieceAllowedFastOrDefault(p.index) {
393 // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
394 peer.nextState.Interested = true
399 peer.addNextRequest(req)
// Every pending chunk must have been handled exactly once.
403 if pendingChunksRemaining != 0 {
404 panic(pendingChunksRemaining)