Use interfaces to lazily expose the bare minimum inputs to GetRequestablePieces
diff --git a/request-strategy/order.go b/request-strategy/order.go
index adbd29e4e5f8776e0588ef1d4960f59bf9dbf53c..4a86aae97c20d69e4bb62fcc063630b88892af11 100644
--- a/request-strategy/order.go
+++ b/request-strategy/order.go
@@ -3,9 +3,6 @@ package request_strategy
 import (
        "bytes"
        "expvar"
-       "runtime"
-       "sort"
-       "sync"
 
        "github.com/anacrolix/multiless"
        "github.com/anacrolix/torrent/metainfo"
@@ -24,46 +21,6 @@ type (
        ChunkSpec = types.ChunkSpec
 )
 
-type ClientPieceOrder struct{}
-
-func equalFilterPieces(l, r []filterPiece) bool {
-       if len(l) != len(r) {
-               return false
-       }
-       for i := range l {
-               lp := &l[i]
-               rp := &r[i]
-               if lp.Priority != rp.Priority ||
-                       lp.Partial != rp.Partial ||
-                       lp.Availability != rp.Availability ||
-                       lp.index != rp.index ||
-                       lp.t.InfoHash != rp.t.InfoHash {
-                       return false
-               }
-       }
-       return true
-}
-
-type pieceSorter struct {
-       swap func(i, j int)
-       get  func(i int) *filterPiece
-       len  int
-}
-
-func (me pieceSorter) Len() int {
-       return me.len
-}
-
-func (me pieceSorter) Swap(i, j int) {
-       me.swap(i, j)
-}
-
-func (me pieceSorter) Less(_i, _j int) bool {
-       i := me.get(_i)
-       j := me.get(_j)
-       return pieceOrderLess(i.toPieceOrderInput(), j.toPieceOrderInput()).MustLess()
-}
-
 type pieceOrderInput struct {
        PieceRequestOrderState
        PieceRequestOrderKey
@@ -112,339 +69,49 @@ func (me *peersForPieceRequests) addNextRequest(r RequestIndex) {
        me.requestsInPiece++
 }
 
-type requestablePiece struct {
-       index             pieceIndex
-       t                 *Torrent
-       alwaysReallocate  bool
-       NumPendingChunks  int
-       IterPendingChunks ChunksIterFunc
-}
-
-func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex {
-       return p.t.ChunksPerPiece*uint32(p.index) + c
-}
-
-type filterPiece struct {
-       t     *Torrent
-       index pieceIndex
-       *Piece
-}
-
-func (fp *filterPiece) toPieceOrderInput() (ret pieceOrderInput) {
-       ret.Partial = fp.Partial
-       ret.InfoHash = fp.t.InfoHash
-       ret.Availability = fp.Availability
-       ret.Priority = fp.Priority
-       ret.Index = fp.index
-       return
-}
-
-var (
-       sortsMu sync.Mutex
-       sorts   = map[*[]filterPiece][]int{}
-)
-
-func reorderedFilterPieces(pieces []filterPiece, indices []int) (ret []filterPiece) {
-       ret = make([]filterPiece, len(indices))
-       for i, j := range indices {
-               ret[i] = pieces[j]
-       }
-       return
-}
-
 var packageExpvarMap = expvar.NewMap("request-strategy")
 
-func getSortedFilterPieces(unsorted []filterPiece) []filterPiece {
-       const cachePieceSorts = false
-       if !cachePieceSorts {
-               sort.Sort(pieceSorter{
-                       len: len(unsorted),
-                       swap: func(i, j int) {
-                               unsorted[i], unsorted[j] = unsorted[j], unsorted[i]
-                       },
-                       get: func(i int) *filterPiece {
-                               return &unsorted[i]
-                       },
-               })
-               return unsorted
-       }
-       sortsMu.Lock()
-       defer sortsMu.Unlock()
-       for key, order := range sorts {
-               if equalFilterPieces(*key, unsorted) {
-                       packageExpvarMap.Add("reused filter piece ordering", 1)
-                       return reorderedFilterPieces(unsorted, order)
-               }
-       }
-       indices := make([]int, len(unsorted))
-       for i := 0; i < len(indices); i++ {
-               indices[i] = i
-       }
-       sort.Sort(pieceSorter{
-               len: len(unsorted),
-               swap: func(i, j int) {
-                       indices[i], indices[j] = indices[j], indices[i]
-               },
-               get: func(i int) *filterPiece {
-                       return &unsorted[indices[i]]
-               },
-       })
-       packageExpvarMap.Add("added filter piece ordering", 1)
-       sorts[&unsorted] = indices
-       runtime.SetFinalizer(&pieceOrderingFinalizer{unsorted: &unsorted}, func(me *pieceOrderingFinalizer) {
-               packageExpvarMap.Add("finalized filter piece ordering", 1)
-               sortsMu.Lock()
-               defer sortsMu.Unlock()
-               delete(sorts, me.unsorted)
-       })
-       return reorderedFilterPieces(unsorted, indices)
-}
-
-type pieceOrderingFinalizer struct {
-       unsorted *[]filterPiece
-}
-
 // Calls f with requestable pieces in order.
-func GetRequestablePieces(input Input, pro *PieceRequestOrder, f func(t *Torrent, p *Piece, pieceIndex int)) {
+func GetRequestablePieces(input Input, pro *PieceRequestOrder, f func(ih metainfo.Hash, pieceIndex int)) {
        // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
        // TorrentImpl. A nil value means no capacity limit.
        var storageLeft *int64
-       if input.Capacity != nil {
-               storageLeft = new(int64)
-               *storageLeft = *input.Capacity
+       if cap, ok := input.Capacity(); ok {
+               storageLeft = &cap
        }
        var allTorrentsUnverifiedBytes int64
-       torrentUnverifiedBytes := map[metainfo.Hash]int64{}
        pro.tree.Ascend(func(i btree.Item) bool {
                _i := i.(pieceRequestOrderItem)
-               var t Torrent = input.Torrents[_i.key.InfoHash]
-               var piece *Piece = &t.Pieces[_i.key.Index]
-               if left := storageLeft; left != nil {
-                       if *left < piece.Length {
+               ih := _i.key.InfoHash
+               var t Torrent = input.Torrent(ih)
+               var piece Piece = t.Piece(_i.key.Index)
+               pieceLength := t.PieceLength()
+               if storageLeft != nil {
+                       if *storageLeft < pieceLength {
                                return true
                        }
-                       *left -= piece.Length
+                       *storageLeft -= pieceLength
                }
-               if !piece.Request || piece.NumPendingChunks == 0 {
+               if !piece.Request() || piece.NumPendingChunks() == 0 {
                        // TODO: Clarify exactly what is verified. Stuff that's being hashed should be
                        // considered unverified and hold up further requests.
-
-                       return true
-               }
-               if t.MaxUnverifiedBytes != 0 && torrentUnverifiedBytes[t.InfoHash]+piece.Length > t.MaxUnverifiedBytes {
                        return true
                }
-               if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
+               if input.MaxUnverifiedBytes() != 0 && allTorrentsUnverifiedBytes+pieceLength > input.MaxUnverifiedBytes() {
                        return true
                }
-               torrentUnverifiedBytes[t.InfoHash] += piece.Length
-               allTorrentsUnverifiedBytes += piece.Length
-               f(&t, piece, _i.key.Index)
+               allTorrentsUnverifiedBytes += pieceLength
+               f(ih, _i.key.Index)
                return true
        })
        return
 }
 
-type Input struct {
-       // This is all torrents that share the same capacity below (or likely a single torrent if there
-       // is infinite capacity, since you could just run it separately for each Torrent if that's the
-       // case).
-       Torrents map[metainfo.Hash]Torrent
-       // Must not be modified. Non-nil if capacity is not infinite, meaning that pieces of torrents
-       // that share the same capacity key must be incorporated in piece ordering.
-       Capacity *int64
+type Input interface {
+       Torrent(metainfo.Hash) Torrent
+       // Storage capacity, shared among all Torrents with the same storage.TorrentCapacity pointer in
+       // their storage.Torrent references.
+       Capacity() (cap int64, capped bool)
        // Across all the Torrents. This might be partitioned by storage capacity key now.
-       MaxUnverifiedBytes int64
-}
-
-// Checks that a sorted peersForPiece slice makes sense.
-func ensureValidSortedPeersForPieceRequests(peers *peersForPieceSorter) {
-       if !sort.IsSorted(peers) {
-               panic("not sorted")
-       }
-       peerMap := make(map[*peersForPieceRequests]struct{}, peers.Len())
-       for _, p := range peers.peersForPiece {
-               if _, ok := peerMap[p]; ok {
-                       panic(p)
-               }
-               peerMap[p] = struct{}{}
-       }
-}
-
-var peersForPiecesPool sync.Pool
-
-func makePeersForPiece(cap int) []*peersForPieceRequests {
-       got := peersForPiecesPool.Get()
-       if got == nil {
-               return make([]*peersForPieceRequests, 0, cap)
-       }
-       return got.([]*peersForPieceRequests)[:0]
-}
-
-type peersForPieceSorter struct {
-       peersForPiece []*peersForPieceRequests
-       req           *RequestIndex
-       p             requestablePiece
-}
-
-func (me *peersForPieceSorter) Len() int {
-       return len(me.peersForPiece)
-}
-
-func (me *peersForPieceSorter) Swap(i, j int) {
-       me.peersForPiece[i], me.peersForPiece[j] = me.peersForPiece[j], me.peersForPiece[i]
-}
-
-func (me *peersForPieceSorter) Less(_i, _j int) bool {
-       i := me.peersForPiece[_i]
-       j := me.peersForPiece[_j]
-       req := me.req
-       p := &me.p
-       byHasRequest := func() multiless.Computation {
-               ml := multiless.New()
-               if req != nil {
-                       iHas := i.nextState.Requests.Contains(*req)
-                       jHas := j.nextState.Requests.Contains(*req)
-                       ml = ml.Bool(jHas, iHas)
-               }
-               return ml
-       }()
-       ml := multiless.New()
-       // We always "reallocate", that is force even striping amongst peers that are either on
-       // the last piece they can contribute to, or for pieces marked for this behaviour.
-       // Striping prevents starving peers of requests, and will always re-balance to the
-       // fastest known peers.
-       if !p.alwaysReallocate {
-               ml = ml.Bool(
-                       j.requestablePiecesRemaining == 1,
-                       i.requestablePiecesRemaining == 1)
-       }
-       if p.alwaysReallocate || j.requestablePiecesRemaining == 1 {
-               ml = ml.Int(
-                       i.requestsInPiece,
-                       j.requestsInPiece)
-       } else {
-               ml = ml.AndThen(byHasRequest)
-       }
-       ml = ml.Int(
-               i.requestablePiecesRemaining,
-               j.requestablePiecesRemaining,
-       ).Float64(
-               j.DownloadRate,
-               i.DownloadRate,
-       )
-       if ml.Ok() {
-               return ml.Less()
-       }
-       ml = ml.AndThen(byHasRequest)
-       return ml.Int64(
-               int64(j.Age), int64(i.Age),
-               // TODO: Probably peer priority can come next
-       ).Uintptr(
-               i.Id.Uintptr(),
-               j.Id.Uintptr(),
-       ).MustLess()
-}
-
-func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
-       peersForPiece := makePeersForPiece(len(peers))
-       for _, peer := range peers {
-               if !peer.canRequestPiece(p.index) {
-                       continue
-               }
-               if !peer.canFitRequest() {
-                       peer.requestablePiecesRemaining--
-                       continue
-               }
-               peersForPiece = append(peersForPiece, &peersForPieceRequests{
-                       requestsInPiece: 0,
-                       requestsPeer:    peer,
-               })
-       }
-       defer func() {
-               for _, peer := range peersForPiece {
-                       peer.requestablePiecesRemaining--
-               }
-               peersForPiecesPool.Put(peersForPiece)
-       }()
-       peersForPieceSorter := peersForPieceSorter{
-               peersForPiece: peersForPiece,
-               p:             p,
-       }
-       sortPeersForPiece := func(req *RequestIndex) {
-               peersForPieceSorter.req = req
-               sort.Sort(&peersForPieceSorter)
-               // ensureValidSortedPeersForPieceRequests(&peersForPieceSorter)
-       }
-       // Chunks can be preassigned several times, if peers haven't been able to update their "actual"
-       // with "next" request state before another request strategy run occurs.
-       preallocated := make([][]*peersForPieceRequests, p.t.ChunksPerPiece)
-       p.IterPendingChunks(func(spec ChunkIndex) {
-               req := p.chunkIndexToRequestIndex(spec)
-               for _, peer := range peersForPiece {
-                       if !peer.ExistingRequests.Contains(req) {
-                               continue
-                       }
-                       if !peer.canFitRequest() {
-                               continue
-                       }
-                       preallocated[spec] = append(preallocated[spec], peer)
-                       peer.addNextRequest(req)
-               }
-       })
-       pendingChunksRemaining := int(p.NumPendingChunks)
-       p.IterPendingChunks(func(chunk ChunkIndex) {
-               if len(preallocated[chunk]) != 0 {
-                       return
-               }
-               req := p.chunkIndexToRequestIndex(chunk)
-               defer func() { pendingChunksRemaining-- }()
-               sortPeersForPiece(nil)
-               for _, peer := range peersForPiece {
-                       if !peer.canFitRequest() {
-                               continue
-                       }
-                       if !peer.PieceAllowedFast.ContainsInt(p.index) {
-                               // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
-                               peer.nextState.Interested = true
-                               if peer.Choking {
-                                       continue
-                               }
-                       }
-                       peer.addNextRequest(req)
-                       break
-               }
-       })
-chunk:
-       for chunk, prePeers := range preallocated {
-               if len(prePeers) == 0 {
-                       continue
-               }
-               pendingChunksRemaining--
-               req := p.chunkIndexToRequestIndex(ChunkIndex(chunk))
-               for _, pp := range prePeers {
-                       pp.requestsInPiece--
-               }
-               sortPeersForPiece(&req)
-               for _, pp := range prePeers {
-                       pp.nextState.Requests.Remove(req)
-               }
-               for _, peer := range peersForPiece {
-                       if !peer.canFitRequest() {
-                               continue
-                       }
-                       if !peer.PieceAllowedFast.ContainsInt(p.index) {
-                               // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
-                               peer.nextState.Interested = true
-                               if peer.Choking {
-                                       continue
-                               }
-                       }
-                       peer.addNextRequest(req)
-                       continue chunk
-               }
-       }
-       if pendingChunksRemaining != 0 {
-               panic(pendingChunksRemaining)
-       }
+       MaxUnverifiedBytes() int64
 }
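
For illustration, a minimal sketch of how a caller might satisfy the new Input interface. The lazyInput type, its fields, and the requester package are hypothetical, not part of the library; only the three methods declared in the Input interface above and the package's exported Torrent type are assumed.

package requester // hypothetical caller package

import (
	"github.com/anacrolix/torrent/metainfo"
	request_strategy "github.com/anacrolix/torrent/request-strategy"
)

// lazyInput is a hypothetical adapter: it exposes only what
// GetRequestablePieces asks for, resolved on demand rather than
// precomputed into a map the way the removed Input struct required.
type lazyInput struct {
	torrents           map[metainfo.Hash]request_strategy.Torrent
	capacity           *int64 // nil means the shared storage is uncapped
	maxUnverifiedBytes int64
}

// Torrent resolves per-torrent state lazily, once per piece-order item.
func (me lazyInput) Torrent(ih metainfo.Hash) request_strategy.Torrent {
	return me.torrents[ih]
}

// Capacity reports the storage capacity shared by all torrents in this
// input, and whether such a cap exists at all.
func (me lazyInput) Capacity() (cap int64, capped bool) {
	if me.capacity == nil {
		return 0, false
	}
	return *me.capacity, true
}

// MaxUnverifiedBytes bounds outstanding unverified data across all torrents.
func (me lazyInput) MaxUnverifiedBytes() int64 {
	return me.maxUnverifiedBytes
}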
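
A usage sketch under the same assumptions, continuing the hypothetical requester package: pro is a *request_strategy.PieceRequestOrder maintained elsewhere by the caller (its construction is outside this diff). Since the callback now receives only an infohash and a piece index instead of *Torrent and *Piece pointers, the caller maps those identifiers back to its own state.

// requestablePieceId is a hypothetical pair the caller can resolve back
// to its own torrent and piece state.
type requestablePieceId struct {
	ih    metainfo.Hash
	index int
}

// collectRequestable gathers requestable pieces in piece request order,
// after the capacity and unverified-bytes limits applied inside
// GetRequestablePieces.
func collectRequestable(
	input request_strategy.Input,
	pro *request_strategy.PieceRequestOrder,
) (out []requestablePieceId) {
	request_strategy.GetRequestablePieces(input, pro, func(ih metainfo.Hash, pieceIndex int) {
		out = append(out, requestablePieceId{ih, pieceIndex})
	})
	return
}

This is the point of the change: the strategy loop no longer receives whole Torrent snapshots up front, it just walks the btree order and hands back identifiers that the caller resolves lazily.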