Drop support for go 1.20
diff --git a/request-strategy/order.go b/request-strategy/order.go
index 1e4228680c303e56cc087c0281e0fa9ac508d43a..df656f6db06c941d0ba4e442cee472b2ec038784 100644
--- a/request-strategy/order.go
+++ b/request-strategy/order.go
@@ -1,19 +1,19 @@
-package request_strategy
+package requestStrategy
 
 import (
        "bytes"
-       "sort"
-       "sync"
+       "expvar"
 
+       g "github.com/anacrolix/generics"
        "github.com/anacrolix/multiless"
-       "github.com/anacrolix/torrent/storage"
 
+       "github.com/anacrolix/torrent/metainfo"
        "github.com/anacrolix/torrent/types"
 )
 
 type (
-       RequestIndex  uint32
-       ChunkIndex    = uint32
+       RequestIndex  uint32
+       ChunkIndex    = RequestIndex
        Request       = types.Request
        pieceIndex    = types.PieceIndex
        piecePriority = types.PiecePriority
@@ -21,336 +21,79 @@ type (
        ChunkSpec = types.ChunkSpec
 )
 
-type ClientPieceOrder struct{}
-
-type filterTorrent struct {
-       *Torrent
-       unverifiedBytes int64
-       // Potentially shared with other torrents.
-       storageLeft *int64
-}
-
-func sortFilterPieces(pieces []filterPiece) {
-       sort.Slice(pieces, func(_i, _j int) bool {
-               i := &pieces[_i]
-               j := &pieces[_j]
-               return multiless.New().Int(
-                       int(j.Priority), int(i.Priority),
-               ).Bool(
-                       j.Partial, i.Partial,
-               ).Int64(
-                       i.Availability, j.Availability,
-               ).Int(
-                       i.index, j.index,
-               ).Lazy(func() multiless.Computation {
-                       return multiless.New().Cmp(bytes.Compare(
-                               i.t.InfoHash[:],
-                               j.t.InfoHash[:],
-                       ))
-               }).MustLess()
+func pieceOrderLess(i, j *pieceRequestOrderItem) multiless.Computation {
+       return multiless.New().Int(
+               int(j.state.Priority), int(i.state.Priority),
+               // TODO: Should we match on complete here to prevent churn when availability changes?
+       ).Bool(
+               j.state.Partial, i.state.Partial,
+       ).Int(
+               // If this is done with relative availability, do we lose some determinism? If completeness
+               // is used, would that push this far enough down?
+               i.state.Availability, j.state.Availability,
+       ).Int(
+               i.key.Index, j.key.Index,
+       ).Lazy(func() multiless.Computation {
+               return multiless.New().Cmp(bytes.Compare(
+                       i.key.InfoHash[:],
+                       j.key.InfoHash[:],
+               ))
        })
 }
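
For readers unfamiliar with anacrolix/multiless, here is a minimal, self-contained sketch (the item type is hypothetical, not from this repo) of the comparator pattern used in pieceOrderLess above: each key takes a (left, right) pair, so passing (j.x, i.x) instead of (i.x, j.x) sorts that key in descending order, which is how higher-priority and partial pieces end up first.

package main

import (
        "bytes"
        "fmt"
        "sort"

        "github.com/anacrolix/multiless"
)

// item is a hypothetical stand-in for pieceRequestOrderItem.
type item struct {
        priority int
        partial  bool
        infoHash [20]byte
}

// itemLess mirrors the pieceOrderLess pattern: keys are tried in order, and the
// first key that differs decides the result.
func itemLess(i, j *item) bool {
        return multiless.New().Int(
                j.priority, i.priority, // arguments reversed: higher priority first
        ).Bool(
                j.partial, i.partial, // partially-downloaded items first
        ).Cmp(bytes.Compare(
                i.infoHash[:], j.infoHash[:], // ascending, final tie break
        )).MustLess()
}

func main() {
        items := []item{
                {priority: 1, infoHash: [20]byte{1}},
                {priority: 2, partial: true, infoHash: [20]byte{2}},
                {priority: 2, infoHash: [20]byte{3}},
        }
        sort.Slice(items, func(a, b int) bool { return itemLess(&items[a], &items[b]) })
        for _, it := range items {
                fmt.Println(it.priority, it.partial) // 2 true, 2 false, 1 false
        }
}
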
 
-type requestsPeer struct {
-       Peer
-       nextState                  PeerNextRequestState
-       requestablePiecesRemaining int
-}
-
-func (rp *requestsPeer) canFitRequest() bool {
-       return int(rp.nextState.Requests.GetCardinality()) < rp.MaxRequests
-}
-
-func (rp *requestsPeer) addNextRequest(r RequestIndex) {
-       if !rp.nextState.Requests.CheckedAdd(r) {
-               panic("should only add once")
-       }
-}
-
-type peersForPieceRequests struct {
-       requestsInPiece int
-       *requestsPeer
-}
-
-func (me *peersForPieceRequests) addNextRequest(r RequestIndex) {
-       me.requestsPeer.addNextRequest(r)
-       me.requestsInPiece++
-}
-
-type requestablePiece struct {
-       index             pieceIndex
-       t                 *Torrent
-       alwaysReallocate  bool
-       NumPendingChunks  int
-       IterPendingChunks ChunksIterFunc
-}
-
-func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex {
-       return p.t.ChunksPerPiece*uint32(p.index) + c
-}
-
-type filterPiece struct {
-       t     *filterTorrent
-       index pieceIndex
-       *Piece
-}
+var packageExpvarMap = expvar.NewMap("request-strategy")
 
 // Calls f with requestable pieces in order.
-func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex int)) {
-       maxPieces := 0
-       for i := range input.Torrents {
-               maxPieces += len(input.Torrents[i].Pieces)
-       }
-       pieces := make([]filterPiece, 0, maxPieces)
+func GetRequestablePieces(
+       input Input, pro *PieceRequestOrder,
+       f func(ih metainfo.Hash, pieceIndex int, orderState PieceRequestOrderState),
+) {
        // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
        // TorrentImpl. A nil value means no capacity limit.
-       storageLeft := make(map[storage.TorrentCapacity]*int64)
-       for _t := range input.Torrents {
-               // TODO: We could do metainfo requests here.
-               t := &filterTorrent{
-                       Torrent:         &input.Torrents[_t],
-                       unverifiedBytes: 0,
-               }
-               key := t.Capacity
-               if key != nil {
-                       if _, ok := storageLeft[key]; !ok {
-                               capacity, ok := (*key)()
-                               if ok {
-                                       storageLeft[key] = &capacity
-                               } else {
-                                       storageLeft[key] = nil
-                               }
-                       }
-                       t.storageLeft = storageLeft[key]
-               }
-               for i := range t.Pieces {
-                       pieces = append(pieces, filterPiece{
-                               t:     t,
-                               index: i,
-                               Piece: &t.Pieces[i],
-                       })
-               }
+       var storageLeft *int64
+       if cap, ok := input.Capacity(); ok {
+               storageLeft = &cap
        }
-       sortFilterPieces(pieces)
        var allTorrentsUnverifiedBytes int64
-       for _, piece := range pieces {
-               if left := piece.t.storageLeft; left != nil {
-                       if *left < int64(piece.Length) {
-                               continue
+       var lastItem g.Option[pieceRequestOrderItem]
+       pro.tree.Scan(func(_i pieceRequestOrderItem) bool {
+               // Check that scan emits pieces in priority order.
+               if lastItem.Ok {
+                       if _i.Less(&lastItem.Value) {
+                               panic("scan not in order")
                        }
-                       *left -= int64(piece.Length)
                }
-               if !piece.Request || piece.NumPendingChunks == 0 {
+               lastItem.Set(_i)
+
+               ih := _i.key.InfoHash
+               t := input.Torrent(ih)
+               pieceLength := t.PieceLength()
+               if storageLeft != nil {
+                       if *storageLeft < pieceLength {
+                               return false
+                       }
+                       *storageLeft -= pieceLength
+               }
+               if t.IgnorePiece(_i.key.Index) {
                        // TODO: Clarify exactly what is verified. Stuff that's being hashed should be
                        // considered unverified and hold up further requests.
-                       continue
+                       return true
                }
-               if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes {
-                       continue
+               if input.MaxUnverifiedBytes() != 0 && allTorrentsUnverifiedBytes+pieceLength > input.MaxUnverifiedBytes() {
+                       return true
                }
-               if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
-                       continue
-               }
-               piece.t.unverifiedBytes += piece.Length
-               allTorrentsUnverifiedBytes += piece.Length
-               f(piece.t.Torrent, piece.Piece, piece.index)
-       }
+               allTorrentsUnverifiedBytes += pieceLength
+               f(ih, _i.key.Index, _i.state)
+               return true
+       })
        return
 }
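
The Scan callback above follows the usual ordered-iteration contract: returning false aborts the whole scan (used when the shared storage capacity is exhausted), while returning true only advances to the next item (used for ignored pieces or pieces that would exceed the unverified-bytes cap). A small self-contained sketch of that contract, with a hand-rolled scan standing in for the tree's method:

package main

import "fmt"

// scan is a stand-in for the ordered tree's Scan method: it visits items in
// order and stops as soon as the callback returns false.
func scan[T any](items []T, iter func(T) bool) {
        for _, it := range items {
                if !iter(it) {
                        return
                }
        }
}

func main() {
        var storageLeft int64 = 5
        pieceLengths := []int64{2, 3, 4} // already in request-priority order
        scan(pieceLengths, func(length int64) bool {
                if storageLeft < length {
                        return false // stop the whole scan: no capacity left
                }
                storageLeft -= length
                fmt.Println("requestable piece of length", length)
                return true // keep scanning
        })
}
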
 
-type Input struct {
-       Torrents           []Torrent
-       MaxUnverifiedBytes int64
-}
-
-// Checks that a sorted peersForPiece slice makes sense.
-func ensureValidSortedPeersForPieceRequests(peers *peersForPieceSorter) {
-       if !sort.IsSorted(peers) {
-               panic("not sorted")
-       }
-       peerMap := make(map[*peersForPieceRequests]struct{}, peers.Len())
-       for _, p := range peers.peersForPiece {
-               if _, ok := peerMap[p]; ok {
-                       panic(p)
-               }
-               peerMap[p] = struct{}{}
-       }
-}
-
-var peersForPiecesPool sync.Pool
-
-func makePeersForPiece(cap int) []*peersForPieceRequests {
-       got := peersForPiecesPool.Get()
-       if got == nil {
-               return make([]*peersForPieceRequests, 0, cap)
-       }
-       return got.([]*peersForPieceRequests)[:0]
-}
-
-type peersForPieceSorter struct {
-       peersForPiece []*peersForPieceRequests
-       req           *RequestIndex
-       p             requestablePiece
-}
-
-func (me *peersForPieceSorter) Len() int {
-       return len(me.peersForPiece)
-}
-
-func (me *peersForPieceSorter) Swap(i, j int) {
-       me.peersForPiece[i], me.peersForPiece[j] = me.peersForPiece[j], me.peersForPiece[i]
-}
-
-func (me *peersForPieceSorter) Less(_i, _j int) bool {
-       i := me.peersForPiece[_i]
-       j := me.peersForPiece[_j]
-       req := me.req
-       p := &me.p
-       byHasRequest := func() multiless.Computation {
-               ml := multiless.New()
-               if req != nil {
-                       iHas := i.nextState.Requests.Contains(*req)
-                       jHas := j.nextState.Requests.Contains(*req)
-                       ml = ml.Bool(jHas, iHas)
-               }
-               return ml
-       }()
-       ml := multiless.New()
-       // We always "reallocate", that is force even striping amongst peers that are either on
-       // the last piece they can contribute too, or for pieces marked for this behaviour.
-       // Striping prevents starving peers of requests, and will always re-balance to the
-       // fastest known peers.
-       if !p.alwaysReallocate {
-               ml = ml.Bool(
-                       j.requestablePiecesRemaining == 1,
-                       i.requestablePiecesRemaining == 1)
-       }
-       if p.alwaysReallocate || j.requestablePiecesRemaining == 1 {
-               ml = ml.Int(
-                       i.requestsInPiece,
-                       j.requestsInPiece)
-       } else {
-               ml = ml.AndThen(byHasRequest)
-       }
-       ml = ml.Int(
-               i.requestablePiecesRemaining,
-               j.requestablePiecesRemaining,
-       ).Float64(
-               j.DownloadRate,
-               i.DownloadRate,
-       )
-       if ml.Ok() {
-               return ml.Less()
-       }
-       ml = ml.AndThen(byHasRequest)
-       return ml.Int64(
-               int64(j.Age), int64(i.Age),
-               // TODO: Probably peer priority can come next
-       ).Uintptr(
-               i.Id.Uintptr(),
-               j.Id.Uintptr(),
-       ).MustLess()
-}
-
-func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
-       peersForPiece := makePeersForPiece(len(peers))
-       for _, peer := range peers {
-               if !peer.canRequestPiece(p.index) {
-                       continue
-               }
-               if !peer.canFitRequest() {
-                       peer.requestablePiecesRemaining--
-                       continue
-               }
-               peersForPiece = append(peersForPiece, &peersForPieceRequests{
-                       requestsInPiece: 0,
-                       requestsPeer:    peer,
-               })
-       }
-       defer func() {
-               for _, peer := range peersForPiece {
-                       peer.requestablePiecesRemaining--
-               }
-               peersForPiecesPool.Put(peersForPiece)
-       }()
-       peersForPieceSorter := peersForPieceSorter{
-               peersForPiece: peersForPiece,
-               p:             p,
-       }
-       sortPeersForPiece := func(req *RequestIndex) {
-               peersForPieceSorter.req = req
-               sort.Sort(&peersForPieceSorter)
-               // ensureValidSortedPeersForPieceRequests(&peersForPieceSorter)
-       }
-       // Chunks can be preassigned several times, if peers haven't been able to update their "actual"
-       // with "next" request state before another request strategy run occurs.
-       preallocated := make([][]*peersForPieceRequests, p.t.ChunksPerPiece)
-       p.IterPendingChunks(func(spec ChunkIndex) {
-               req := p.chunkIndexToRequestIndex(spec)
-               for _, peer := range peersForPiece {
-                       if !peer.ExistingRequests.Contains(req) {
-                               continue
-                       }
-                       if !peer.canFitRequest() {
-                               continue
-                       }
-                       preallocated[spec] = append(preallocated[spec], peer)
-                       peer.addNextRequest(req)
-               }
-       })
-       pendingChunksRemaining := int(p.NumPendingChunks)
-       p.IterPendingChunks(func(chunk ChunkIndex) {
-               if len(preallocated[chunk]) != 0 {
-                       return
-               }
-               req := p.chunkIndexToRequestIndex(chunk)
-               defer func() { pendingChunksRemaining-- }()
-               sortPeersForPiece(nil)
-               for _, peer := range peersForPiece {
-                       if !peer.canFitRequest() {
-                               continue
-                       }
-                       if !peer.PieceAllowedFast.ContainsInt(p.index) {
-                               // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
-                               peer.nextState.Interested = true
-                               if peer.Choking {
-                                       continue
-                               }
-                       }
-                       peer.addNextRequest(req)
-                       break
-               }
-       })
-chunk:
-       for chunk, prePeers := range preallocated {
-               if len(prePeers) == 0 {
-                       continue
-               }
-               pendingChunksRemaining--
-               req := p.chunkIndexToRequestIndex(ChunkIndex(chunk))
-               for _, pp := range prePeers {
-                       pp.requestsInPiece--
-               }
-               sortPeersForPiece(&req)
-               for _, pp := range prePeers {
-                       pp.nextState.Requests.Remove(req)
-               }
-               for _, peer := range peersForPiece {
-                       if !peer.canFitRequest() {
-                               continue
-                       }
-                       if !peer.PieceAllowedFast.ContainsInt(p.index) {
-                               // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
-                               peer.nextState.Interested = true
-                               if peer.Choking {
-                                       continue
-                               }
-                       }
-                       peer.addNextRequest(req)
-                       continue chunk
-               }
-       }
-       if pendingChunksRemaining != 0 {
-               panic(pendingChunksRemaining)
-       }
+type Input interface {
+       Torrent(metainfo.Hash) Torrent
+       // Storage capacity, shared among all Torrents with the same storage.TorrentCapacity pointer in
+       // their storage.Torrent references.
+       Capacity() (cap int64, capped bool)
+       // Across all the Torrents. This might be partitioned by storage capacity key now.
+       MaxUnverifiedBytes() int64
 }
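
A minimal sketch of what an Input implementation could look like, assuming it sits inside this package (so the package's Torrent interface and the metainfo import are in scope); the mapInput type and its fields are hypothetical and not part of this diff:

// mapInput is a hypothetical example Input, not part of this change.
type mapInput struct {
        torrents           map[metainfo.Hash]Torrent
        capacity           *int64 // nil: no shared storage capacity limit
        maxUnverifiedBytes int64
}

func (me mapInput) Torrent(ih metainfo.Hash) Torrent { return me.torrents[ih] }

func (me mapInput) Capacity() (int64, bool) {
        if me.capacity == nil {
                return 0, false
        }
        return *me.capacity, true
}

func (me mapInput) MaxUnverifiedBytes() int64 { return me.maxUnverifiedBytes }

Capacity returning capped == false corresponds to the nil storageLeft branch in GetRequestablePieces, i.e. no shared capacity limit is applied during the scan.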