1 package request_strategy
9 "github.com/anacrolix/multiless"
10 "github.com/anacrolix/torrent/metainfo"
11 "github.com/anacrolix/torrent/storage"
13 "github.com/anacrolix/torrent/types"
// Short local aliases into the shared types package.
// NOTE(review): the enclosing `type (...)` declaration header is not visible in
// this chunk — these lines are alias fragments.
19 Request = types.Request
20 pieceIndex = types.PieceIndex
21 piecePriority = types.PiecePriority
22 // This can be made into a type-param later, will be great for testing.
23 ChunkSpec = types.ChunkSpec
// ClientPieceOrder is an empty marker type; no methods are visible in this chunk.
26 type ClientPieceOrder struct{}
// filterTorrent wraps a Torrent with per-run bookkeeping used while filtering
// requestable pieces (see GetRequestablePieces). Fields are elided in this
// chunk; the visible comment refers to storage shared across torrents.
28 type filterTorrent struct {
31 	// Potentially shared with other torrents.
// sortFilterPieces orders the candidate pieces for a request-strategy run.
// Visible comparison keys (via multiless): piece priority — arguments are
// (j, i), so higher priority sorts earlier — then availability with (i, j),
// i.e. rarer pieces first, with a lazy byte-compare tie-break.
// NOTE(review): several interior lines of this function are missing from this
// chunk, so the complete key list cannot be confirmed here.
35 func sortFilterPieces(pieces []filterPiece) {
36 sort.Slice(pieces, func(_i, _j int) bool {
39 return multiless.New().Int(
40 int(j.Priority), int(i.Priority),
44 i.Availability, j.Availability,
47 ).Lazy(func() multiless.Computation {
48 return multiless.New().Cmp(bytes.Compare(
// requestsPeer tracks, for one peer, the request state being built for the
// next strategy run (nextState) and how many requestable pieces this peer can
// still contribute to (requestablePiecesRemaining). Other fields are elided in
// this chunk.
56 type requestsPeer struct {
58 nextState PeerNextRequestState
59 requestablePiecesRemaining int
// canFitRequest reports whether the peer's pending next-state request set is
// still below its MaxRequests budget.
62 func (rp *requestsPeer) canFitRequest() bool {
63 return int(rp.nextState.Requests.GetCardinality()) < rp.MaxRequests
// addNextRequest records r in the peer's next request state. CheckedAdd
// returns false if r was already present, which is treated as a programming
// error (each request index must be added at most once per run).
66 func (rp *requestsPeer) addNextRequest(r RequestIndex) {
67 if !rp.nextState.Requests.CheckedAdd(r) {
68 panic("should only add once")
// peersForPieceRequests embeds requestsPeer (per the wrapper method below) with
// extra per-piece state; its fields are elided in this chunk.
72 type peersForPieceRequests struct {
// addNextRequest delegates to the embedded requestsPeer; the remainder of the
// body (presumably per-piece accounting) is missing from this chunk.
77 func (me *peersForPieceRequests) addNextRequest(r RequestIndex) {
78 me.requestsPeer.addNextRequest(r)
// requestablePiece is a piece selected by GetRequestablePieces for allocation.
// IterPendingChunks iterates the piece's chunks still needing requests; other
// fields (t, index, NumPendingChunks, alwaysReallocate — see Run) are elided
// in this chunk.
82 type requestablePiece struct {
87 IterPendingChunks ChunksIterFunc
// chunkIndexToRequestIndex maps a chunk index within this piece to a global
// request index: pieces are laid out contiguously, ChunksPerPiece chunks each.
90 func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex {
91 return p.t.ChunksPerPiece*uint32(p.index) + c
// filterPiece pairs a piece with its filterTorrent for sorting/filtering in
// GetRequestablePieces; fields are elided in this chunk.
94 type filterPiece struct {
// NOTE(review): interior lines of this function are missing from this chunk;
// comments below describe only the visible logic.
100 // Calls f with requestable pieces in order.
101 func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex int)) {
// Pre-size the pieces slice to the total piece count across all torrents.
103 for i := range input.Torrents {
104 maxPieces += len(input.Torrents[i].Pieces)
106 pieces := make([]filterPiece, 0, maxPieces)
107 // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
108 // TorrentImpl. A nil value means no capacity limit.
109 storageLeft := make(map[storage.TorrentCapacity]*int64)
110 for _t := range input.Torrents {
111 // TODO: We could do metainfo requests here.
113 Torrent: &input.Torrents[_t],
// First sight of a capacity key: query it once and cache the remaining
// capacity (or nil for unlimited) for the rest of the run.
118 if _, ok := storageLeft[key]; !ok {
119 capacity, ok := (*key)()
121 storageLeft[key] = &capacity
123 storageLeft[key] = nil
126 t.storageLeft = storageLeft[key]
128 for i := range t.Pieces {
129 pieces = append(pieces, filterPiece{
// Order all candidate pieces globally, then admit them subject to the
// per-storage and unverified-bytes budgets below.
136 sortFilterPieces(pieces)
137 var allTorrentsUnverifiedBytes int64
138 for _, piece := range pieces {
// Enforce remaining storage capacity; pieces that fit consume it.
139 if left := piece.t.storageLeft; left != nil {
140 if *left < int64(piece.Length) {
143 *left -= int64(piece.Length)
// Skip pieces not marked for request or with nothing pending.
145 if !piece.Request || piece.NumPendingChunks == 0 {
146 // TODO: Clarify exactly what is verified. Stuff that's being hashed should be
147 // considered unverified and hold up further requests.
// Per-torrent, then client-wide caps on outstanding unverified bytes
// (0 means no cap in both cases).
150 if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes {
153 if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
// Piece accepted: account its bytes and hand it to the callback.
156 piece.t.unverifiedBytes += piece.Length
157 allTorrentsUnverifiedBytes += piece.Length
158 f(piece.t.Torrent, piece.Piece, piece.index)
// Client-wide cap on unverified bytes across all torrents; 0 disables the cap
// (see GetRequestablePieces). NOTE(review): the enclosing Input struct header
// is not visible in this chunk.
165 MaxUnverifiedBytes int64
// NOTE(review): interior lines of this function are missing from this chunk;
// comments below describe only the visible logic.
168 // TODO: We could do metainfo requests here.
// Run computes the next request state for every peer across all input
// torrents, returning it keyed by PeerId.
169 func Run(input Input) map[PeerId]PeerNextRequestState {
// Collect the globally-ordered requestable pieces.
170 var requestPieces []requestablePiece
171 GetRequestablePieces(input, func(t *Torrent, piece *Piece, pieceIndex int) {
172 requestPieces = append(requestPieces, requestablePiece{
175 NumPendingChunks: piece.NumPendingChunks,
176 IterPendingChunks: piece.iterPendingChunksWrapper,
// Pieces at PiecePriorityNext or above are always re-striped across peers.
177 alwaysReallocate: piece.Priority >= types.PiecePriorityNext,
// Build the per-torrent peer working set, keyed by infohash.
180 torrents := input.Torrents
181 allPeers := make(map[metainfo.Hash][]*requestsPeer, len(torrents))
182 for _, t := range torrents {
183 peers := make([]*requestsPeer, 0, len(t.Peers))
184 for _, p := range t.Peers {
185 peers = append(peers, &requestsPeer{
189 allPeers[t.InfoHash] = peers
// Count, per peer, how many of the selected pieces it could serve; this is
// decremented during allocation and checked to hit zero below.
191 for _, piece := range requestPieces {
192 for _, peer := range allPeers[piece.t.InfoHash] {
193 if peer.canRequestPiece(piece.index) {
194 peer.requestablePiecesRemaining++
// Assign pending chunks piece by piece.
198 for _, piece := range requestPieces {
199 allocatePendingChunks(piece, allPeers[piece.t.InfoHash])
// Assemble the result, panicking on broken invariants: every peer's
// requestable-piece counter must be exhausted, and peer ids must be unique.
201 ret := make(map[PeerId]PeerNextRequestState)
202 for _, peers := range allPeers {
203 for _, rp := range peers {
204 if rp.requestablePiecesRemaining != 0 {
205 panic(rp.requestablePiecesRemaining)
207 if _, ok := ret[rp.Id]; ok {
208 panic(fmt.Sprintf("duplicate peer id: %v", rp.Id))
210 ret[rp.Id] = rp.nextState
216 // Checks that a sorted peersForPiece slice makes sense.
// Debug-only invariant check (it is invoked from a commented-out call site in
// allocatePendingChunks): verifies the slice satisfies its sort.Interface
// ordering and contains no duplicate peer pointers. The failure branches are
// elided in this chunk.
217 func ensureValidSortedPeersForPieceRequests(peers *peersForPieceSorter) {
218 if !sort.IsSorted(peers) {
221 peerMap := make(map[*peersForPieceRequests]struct{}, peers.Len())
222 for _, p := range peers.peersForPiece {
223 if _, ok := peerMap[p]; ok {
226 peerMap[p] = struct{}{}
// Pool of []*peersForPieceRequests scratch slices reused across
// allocatePendingChunks calls to avoid per-piece allocations.
230 var peersForPiecesPool sync.Pool
// makePeersForPiece returns an empty slice with at least the given capacity,
// reusing a pooled slice when one is available (the nil-check branch between
// Get and the fallback make is elided in this chunk).
232 func makePeersForPiece(cap int) []*peersForPieceRequests {
233 got := peersForPiecesPool.Get()
235 return make([]*peersForPieceRequests, 0, cap)
237 return got.([]*peersForPieceRequests)[:0]
// peersForPieceSorter implements sort.Interface over the peers competing for
// one piece; remaining fields (including the req pointer set by
// sortPeersForPiece) are elided in this chunk.
240 type peersForPieceSorter struct {
241 peersForPiece []*peersForPieceRequests
// Len implements sort.Interface.
246 func (me *peersForPieceSorter) Len() int {
247 return len(me.peersForPiece)
// Swap implements sort.Interface.
250 func (me *peersForPieceSorter) Swap(i, j int) {
251 me.peersForPiece[i], me.peersForPiece[j] = me.peersForPiece[j], me.peersForPiece[i]
// Less implements sort.Interface, ranking peers for a single piece via
// multiless. NOTE(review): interior lines are missing from this chunk; only
// the visible keys are annotated below.
254 func (me *peersForPieceSorter) Less(_i, _j int) bool {
255 i := me.peersForPiece[_i]
256 j := me.peersForPiece[_j]
// Tie-break favouring the peer that already holds the request under
// consideration (me.req); only evaluated when earlier keys tie.
259 byHasRequest := func() multiless.Computation {
260 ml := multiless.New()
262 iHas := i.nextState.Requests.Contains(*req)
263 jHas := j.nextState.Requests.Contains(*req)
// Bool(jHas, iHas): the peer that has the request sorts earlier.
264 ml = ml.Bool(jHas, iHas)
268 ml := multiless.New()
269 // We always "reallocate", that is force even striping amongst peers that are either on
270 // the last piece they can contribute too, or for pieces marked for this behaviour.
271 // Striping prevents starving peers of requests, and will always re-balance to the
272 // fastest known peers.
273 if !p.alwaysReallocate {
// Peers on their last contributable piece sort earlier.
275 j.requestablePiecesRemaining == 1,
276 i.requestablePiecesRemaining == 1)
278 if p.alwaysReallocate || j.requestablePiecesRemaining == 1 {
283 ml = ml.AndThen(byHasRequest)
286 i.requestablePiecesRemaining,
287 j.requestablePiecesRemaining,
295 ml = ml.AndThen(byHasRequest)
// (j.Age, i.Age): older entries sort earlier — presumably longer-known
// peers are preferred; TODO confirm against the elided surrounding keys.
297 int64(j.Age), int64(i.Age),
298 // TODO: Probably peer priority can come next
// allocatePendingChunks distributes piece p's pending chunks among the given
// peers, mutating each peer's nextState. NOTE(review): interior lines are
// missing from this chunk; comments describe only the visible logic.
305 func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
// Build the working set of peers that can serve this piece, using a pooled
// scratch slice.
306 peersForPiece := makePeersForPiece(len(peers))
307 for _, peer := range peers {
308 if !peer.canRequestPiece(p.index) {
// A peer with no request budget left still consumes one of its
// requestable-piece credits for this piece.
311 if !peer.canFitRequest() {
312 peer.requestablePiecesRemaining--
315 peersForPiece = append(peersForPiece, &peersForPieceRequests{
// Deferred cleanup (presumably — the defer wrapper line is elided): decrement
// each participating peer's counter and return the scratch slice to the pool.
321 for _, peer := range peersForPiece {
322 peer.requestablePiecesRemaining--
324 peersForPiecesPool.Put(peersForPiece)
326 peersForPieceSorter := peersForPieceSorter{
327 peersForPiece: peersForPiece,
// Re-sort the candidate peers, optionally with a specific request index as a
// tie-break key (see peersForPieceSorter.Less).
330 sortPeersForPiece := func(req *RequestIndex) {
331 peersForPieceSorter.req = req
332 sort.Sort(&peersForPieceSorter)
333 // ensureValidSortedPeersForPieceRequests(&peersForPieceSorter)
335 // Chunks can be preassigned several times, if peers haven't been able to update their "actual"
336 // with "next" request state before another request strategy run occurs.
// Phase 1: preassign each pending chunk to every peer that already has it in
// its existing request state and still has budget.
337 preallocated := make([][]*peersForPieceRequests, p.t.ChunksPerPiece)
338 p.IterPendingChunks(func(spec ChunkIndex) {
339 req := p.chunkIndexToRequestIndex(spec)
340 for _, peer := range peersForPiece {
341 if !peer.ExistingRequests.Contains(req) {
344 if !peer.canFitRequest() {
347 preallocated[spec] = append(preallocated[spec], peer)
348 peer.addNextRequest(req)
// Phase 2: assign each chunk that was NOT preallocated to the best-ranked
// peer with budget.
351 pendingChunksRemaining := int(p.NumPendingChunks)
352 p.IterPendingChunks(func(chunk ChunkIndex) {
353 if len(preallocated[chunk]) != 0 {
356 req := p.chunkIndexToRequestIndex(chunk)
357 defer func() { pendingChunksRemaining-- }()
358 sortPeersForPiece(nil)
359 for _, peer := range peersForPiece {
360 if !peer.canFitRequest() {
363 if !peer.PieceAllowedFast.ContainsInt(p.index) {
364 // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
365 peer.nextState.Interested = true
370 peer.addNextRequest(req)
// Phase 3: for chunks that were preallocated (possibly to several peers),
// re-sort with the request as tie-break, strip the duplicates, and re-assign
// to the single best eligible peer.
375 for chunk, prePeers := range preallocated {
376 if len(prePeers) == 0 {
379 pendingChunksRemaining--
380 req := p.chunkIndexToRequestIndex(ChunkIndex(chunk))
381 for _, pp := range prePeers {
384 sortPeersForPiece(&req)
385 for _, pp := range prePeers {
386 pp.nextState.Requests.Remove(req)
388 for _, peer := range peersForPiece {
389 if !peer.canFitRequest() {
392 if !peer.PieceAllowedFast.ContainsInt(p.index) {
393 // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
394 peer.nextState.Interested = true
399 peer.addNextRequest(req)
// Invariant: every pending chunk must have been handled exactly once.
403 if pendingChunksRemaining != 0 {
404 panic(pendingChunksRemaining)