-package request_strategy
+package requestStrategy
import (
- "sort"
+ "bytes"
+ "expvar"
+ g "github.com/anacrolix/generics"
"github.com/anacrolix/multiless"
- pp "github.com/anacrolix/torrent/peer_protocol"
+
+ "github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/types"
)
type (
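+ // RequestIndex identifies a chunk torrent-wide; ChunkIndex below aliases it.
+ // (Assumption: the index is piece index times chunks per piece plus the chunk's
+ // offset within its piece, matching how the rest of the package computes it.)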
+ RequestIndex uint32
+ ChunkIndex = RequestIndex
Request = types.Request
pieceIndex = types.PieceIndex
piecePriority = types.PiecePriority
ChunkSpec = types.ChunkSpec
)
-type ClientPieceOrder struct{}
-
-type filterTorrent struct {
- Torrent
- unverifiedBytes int64
- // Potentially shared with other torrents.
- storageLeft *int64
-}
-
-func sortFilterPieces(pieces []filterPiece) {
- sort.Slice(pieces, func(_i, _j int) bool {
- i := pieces[_i]
- j := pieces[_j]
- return multiless.New().Int(
- int(j.Priority), int(i.Priority),
- ).Bool(
- j.Partial, i.Partial,
- ).Int64(
- i.Availability, j.Availability,
- ).Int(
- i.index, j.index,
- ).Uintptr(
- i.t.StableId, j.t.StableId,
- ).MustLess()
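+// pieceOrderLess compares two piece request order items: higher priority first,
+// then partial pieces first, then lower availability, then piece index, and
+// finally the infohash bytes, making the order total across torrents.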
+func pieceOrderLess(i, j *pieceRequestOrderItem) multiless.Computation {
+ return multiless.New().Int(
+ int(j.state.Priority), int(i.state.Priority),
+ // TODO: Should we match on completeness here to prevent churn when availability changes?
+ ).Bool(
+ j.state.Partial, i.state.Partial,
+ ).Int(
+ // If this is done with relative availability, do we lose some determinism? If completeness
+ // is used, would that push this far enough down?
+ i.state.Availability, j.state.Availability,
+ ).Int(
+ i.key.Index, j.key.Index,
+ ).Lazy(func() multiless.Computation {
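+ // Lazy defers the comparatively expensive infohash comparison until every
+ // prior comparison has tied.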
+ return multiless.New().Cmp(bytes.Compare(
+ i.key.InfoHash[:],
+ j.key.InfoHash[:],
+ ))
})
}
-type requestsPeer struct {
- Peer
- nextState PeerNextRequestState
- requestablePiecesRemaining int
-}
-
-func (rp *requestsPeer) canFitRequest() bool {
- return len(rp.nextState.Requests) < rp.MaxRequests
-}
-
-func (rp *requestsPeer) addNextRequest(r Request) {
- _, ok := rp.nextState.Requests[r]
- if ok {
- panic("should only add once")
- }
- rp.nextState.Requests[r] = struct{}{}
-}
-
-type peersForPieceRequests struct {
- requestsInPiece int
- *requestsPeer
-}
-
-func (me *peersForPieceRequests) addNextRequest(r Request) {
- me.requestsPeer.addNextRequest(r)
- me.requestsInPiece++
-}
-
-type requestablePiece struct {
- index pieceIndex
- t Torrent
- NumPendingChunks int
- IterPendingChunks ChunksIter
-}
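+// Counters added to this map are published through the standard expvar mechanism
+// under the "request-strategy" key (e.g. at /debug/vars when that handler is served).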
+var packageExpvarMap = expvar.NewMap("request-strategy")
-type filterPiece struct {
- t *filterTorrent
- index pieceIndex
- Piece
-}
-
-func getRequestablePieces(input Input) (ret []requestablePiece) {
+// Calls f with each requestable piece in piece request order. The scan stops
+// early once the remaining storage capacity can't fit the next piece.
+func GetRequestablePieces(
+ input Input, pro *PieceRequestOrder,
+ f func(ih metainfo.Hash, pieceIndex int, orderState PieceRequestOrderState),
+) {
- // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
- // TorrentImpl.
- storageLeft := make(map[*func() *int64]*int64)
- var pieces []filterPiece
- for _, _t := range input.Torrents {
- // TODO: We could do metainfo requests here.
- t := &filterTorrent{
- Torrent: _t,
- unverifiedBytes: 0,
- }
- key := t.Capacity
- if key != nil {
- if _, ok := storageLeft[key]; !ok {
- storageLeft[key] = (*key)()
- }
- t.storageLeft = storageLeft[key]
- }
- for i, tp := range t.Pieces {
- pieces = append(pieces, filterPiece{
- t: t,
- index: i,
- Piece: tp,
- })
- }
+ // Storage capacity left for this run. Torrents sharing a storage.TorrentCapacity
+ // pointer share one capacity; a nil storageLeft means no limit.
+ var storageLeft *int64
+ if capacity, ok := input.Capacity(); ok {
+ storageLeft = &capacity
}
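+ // capacity above is a local copy, so decrementing *storageLeft budgets only
+ // this scan; it doesn't mutate whatever the Input reads capacity from.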
- sortFilterPieces(pieces)
var allTorrentsUnverifiedBytes int64
- for _, piece := range pieces {
- if left := piece.t.storageLeft; left != nil {
- if *left < int64(piece.Length) {
- continue
+ var lastItem g.Option[pieceRequestOrderItem]
+ pro.tree.Scan(func(_i pieceRequestOrderItem) bool {
+ // Check that scan emits pieces in priority order.
+ if lastItem.Ok {
+ if _i.Less(&lastItem.Value) {
+ panic("scan not in order")
+ }
+ }
+ lastItem.Set(_i)
+
+ ih := _i.key.InfoHash
+ t := input.Torrent(ih)
+ pieceLength := t.PieceLength()
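+ // The tree scan visits pieces best-first, so when the next piece doesn't fit
+ // the remaining capacity we stop the whole scan (return false), presumably so
+ // lower-priority pieces can't consume the budget first.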
+ if storageLeft != nil {
+ if *storageLeft < pieceLength {
+ return false
}
- *left -= int64(piece.Length)
+ *storageLeft -= pieceLength
}
- if !piece.Request || piece.NumPendingChunks == 0 {
+ if t.IgnorePiece(_i.key.Index) {
// TODO: Clarify exactly what is verified. Stuff that's being hashed should be
// considered unverified and hold up further requests.
- continue
+ return true
}
- if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes {
- continue
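+ // Past the global unverified-bytes budget: skip this piece but keep scanning,
+ // since a smaller piece from another torrent might still fit.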
+ if maxUnverified := input.MaxUnverifiedBytes(); maxUnverified != 0 && allTorrentsUnverifiedBytes+pieceLength > maxUnverified {
+ return true
}
- if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
- continue
- }
- piece.t.unverifiedBytes += piece.Length
- allTorrentsUnverifiedBytes += piece.Length
- ret = append(ret, requestablePiece{
- index: piece.index,
- t: piece.t.Torrent,
- NumPendingChunks: piece.NumPendingChunks,
- IterPendingChunks: piece.iterPendingChunksWrapper,
- })
- }
+ allTorrentsUnverifiedBytes += pieceLength
+ f(ih, _i.key.Index, _i.state)
+ return true
+ })
- return
}
-type Input struct {
- Torrents []Torrent
- MaxUnverifiedBytes int64
-}
-
-// TODO: We could do metainfo requests here.
-func Run(input Input) map[PeerId]PeerNextRequestState {
- requestPieces := getRequestablePieces(input)
- torrents := input.Torrents
- allPeers := make(map[uintptr][]*requestsPeer, len(torrents))
- for _, t := range torrents {
- peers := make([]*requestsPeer, 0, len(t.Peers))
- for _, p := range t.Peers {
- peers = append(peers, &requestsPeer{
- Peer: p,
- nextState: PeerNextRequestState{
- Requests: make(map[Request]struct{}),
- },
- })
- }
- allPeers[t.StableId] = peers
- }
- for _, piece := range requestPieces {
- for _, peer := range allPeers[piece.t.StableId] {
- if peer.canRequestPiece(piece.index) {
- peer.requestablePiecesRemaining++
- }
- }
- }
- for _, piece := range requestPieces {
- allocatePendingChunks(piece, allPeers[piece.t.StableId])
- }
- ret := make(map[PeerId]PeerNextRequestState)
- for _, peers := range allPeers {
- for _, rp := range peers {
- if rp.requestablePiecesRemaining != 0 {
- panic(rp.requestablePiecesRemaining)
- }
- ret[rp.Id] = rp.nextState
- }
- }
- return ret
-}
-
-func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
- peersForPiece := make([]*peersForPieceRequests, 0, len(peers))
- for _, peer := range peers {
- peersForPiece = append(peersForPiece, &peersForPieceRequests{
- requestsInPiece: 0,
- requestsPeer: peer,
- })
- }
- defer func() {
- for _, peer := range peersForPiece {
- if peer.canRequestPiece(p.index) {
- peer.requestablePiecesRemaining--
- }
- }
- }()
- sortPeersForPiece := func(byHasRequest *Request) {
- sort.Slice(peersForPiece, func(i, j int) bool {
- ml := multiless.New().Int(
- peersForPiece[i].requestsInPiece,
- peersForPiece[j].requestsInPiece,
- ).Int(
- peersForPiece[i].requestablePiecesRemaining,
- peersForPiece[j].requestablePiecesRemaining,
- ).Float64(
- peersForPiece[j].DownloadRate,
- peersForPiece[i].DownloadRate,
- )
- if byHasRequest != nil {
- _, iHas := peersForPiece[i].nextState.Requests[*byHasRequest]
- _, jHas := peersForPiece[j].nextState.Requests[*byHasRequest]
- ml = ml.Bool(jHas, iHas)
- }
- return ml.Int64(
- int64(peersForPiece[j].Age), int64(peersForPiece[i].Age),
- // TODO: Probably peer priority can come next
- ).Uintptr(
- peersForPiece[i].Id.Uintptr(),
- peersForPiece[j].Id.Uintptr(),
- ).MustLess()
- })
- }
- preallocated := make(map[ChunkSpec]*peersForPieceRequests, p.NumPendingChunks)
- p.IterPendingChunks(func(spec ChunkSpec) {
- req := Request{pp.Integer(p.index), spec}
- for _, peer := range peersForPiece {
- if h := peer.HasExistingRequest; h == nil || !h(req) {
- continue
- }
- if !peer.canFitRequest() {
- continue
- }
- if !peer.canRequestPiece(p.index) {
- continue
- }
- preallocated[spec] = peer
- peer.addNextRequest(req)
- }
- })
- pendingChunksRemaining := int(p.NumPendingChunks)
- p.IterPendingChunks(func(chunk types.ChunkSpec) {
- if _, ok := preallocated[chunk]; ok {
- return
- }
- req := Request{pp.Integer(p.index), chunk}
- defer func() { pendingChunksRemaining-- }()
- sortPeersForPiece(nil)
- for _, peer := range peersForPiece {
- if !peer.canFitRequest() {
- continue
- }
- if !peer.HasPiece(p.index) {
- continue
- }
- if !peer.pieceAllowedFastOrDefault(p.index) {
- // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
- peer.nextState.Interested = true
- if peer.Choking {
- continue
- }
- }
- peer.addNextRequest(req)
- break
- }
- })
-chunk:
- for chunk, prePeer := range preallocated {
- pendingChunksRemaining--
- req := Request{pp.Integer(p.index), chunk}
- prePeer.requestsInPiece--
- sortPeersForPiece(&req)
- delete(prePeer.nextState.Requests, req)
- for _, peer := range peersForPiece {
- if !peer.canFitRequest() {
- continue
- }
- if !peer.HasPiece(p.index) {
- continue
- }
- if !peer.pieceAllowedFastOrDefault(p.index) {
- // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
- peer.nextState.Interested = true
- if peer.Choking {
- continue
- }
- }
- peer.addNextRequest(req)
- continue chunk
- }
- }
- if pendingChunksRemaining != 0 {
- panic(pendingChunksRemaining)
- }
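+// Input supplies the torrent state the strategy reads. A minimal sketch of an
+// implementation, for illustration only (clientInput and its fields are not part
+// of this package):
+//
+//	type clientInput struct {
+//		torrents map[metainfo.Hash]Torrent
+//		capacity *int64 // nil means uncapped
+//	}
+//
+//	func (in clientInput) Torrent(ih metainfo.Hash) Torrent { return in.torrents[ih] }
+//
+//	func (in clientInput) Capacity() (int64, bool) {
+//		if in.capacity == nil {
+//			return 0, false
+//		}
+//		return *in.capacity, true
+//	}
+//
+//	func (in clientInput) MaxUnverifiedBytes() int64 { return 64 << 20 } // e.g. 64 MiB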
+type Input interface {
+ Torrent(metainfo.Hash) Torrent
+ // Storage capacity, shared among all Torrents with the same storage.TorrentCapacity pointer in
+ // their storage.Torrent references.
+ Capacity() (cap int64, capped bool)
+ // The unverified-bytes cap applies across all the Torrents combined. This might
+ // now be partitioned by storage capacity key instead.
+ MaxUnverifiedBytes() int64
}