Sergey Matveev's repositories - btrtrc.git/commitdiff
Optimize request strategy for torrent storage with unlimited capacity
author: Matt Joiner <anacrolix@gmail.com>
Thu, 8 Feb 2024 13:15:18 +0000 (00:15 +1100)
committer: Matt Joiner <anacrolix@gmail.com>
Mon, 12 Feb 2024 03:28:57 +0000 (14:28 +1100)
piece.go
request-strategy-impls.go
request-strategy-impls_test.go
request-strategy/order.go
request-strategy/piece-request-order.go
request-strategy/piece-request-order_test.go
requesting.go
torrent-piece-request-order.go
torrent.go
undirtied-chunks-iter.go

index e08b2609690e385663c4df716d22d05ab3c7808b..4fd2d309b94634879c32d4c8ee2a7e9a3a920319 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -82,13 +82,13 @@ func (p *Piece) numDirtyChunks() chunkIndexType {
 
 func (p *Piece) unpendChunkIndex(i chunkIndexType) {
        p.t.dirtyChunks.Add(p.requestIndexOffset() + i)
-       p.t.updatePieceRequestOrder(p.index)
+       p.t.updatePieceRequestOrderPiece(p.index)
        p.readerCond.Broadcast()
 }
 
 func (p *Piece) pendChunkIndex(i RequestIndex) {
        p.t.dirtyChunks.Remove(p.requestIndexOffset() + i)
-       p.t.updatePieceRequestOrder(p.index)
+       p.t.updatePieceRequestOrderPiece(p.index)
 }
 
 func (p *Piece) numChunks() chunkIndexType {
index ae5054507d516686bd64e25383de9c95a4d57298..9d779ded01c06c3937f1a76d98f9e2b166ba622a 100644 (file)
@@ -2,38 +2,66 @@ package torrent
 
 import (
        g "github.com/anacrolix/generics"
+
        "github.com/anacrolix/torrent/metainfo"
        request_strategy "github.com/anacrolix/torrent/request-strategy"
        "github.com/anacrolix/torrent/storage"
 )
 
-type requestStrategyInput struct {
-       cl      *Client
-       capFunc storage.TorrentCapacity
+type requestStrategyInputCommon struct {
+       maxUnverifiedBytes int64
 }
 
-func (r requestStrategyInput) Torrent(ih metainfo.Hash) request_strategy.Torrent {
-       return requestStrategyTorrent{g.MapMustGet(r.cl.torrents, ih)}
+func (r requestStrategyInputCommon) MaxUnverifiedBytes() int64 {
+       return r.maxUnverifiedBytes
 }
 
-func (r requestStrategyInput) Capacity() (int64, bool) {
-       if r.capFunc == nil {
-               return 0, false
-       }
+type requestStrategyInputMultiTorrent struct {
+       requestStrategyInputCommon
+       torrents map[metainfo.Hash]*Torrent
+       capFunc  storage.TorrentCapacity
+}
+
+func (r requestStrategyInputMultiTorrent) Torrent(ih metainfo.Hash) request_strategy.Torrent {
+       return requestStrategyTorrent{g.MapMustGet(r.torrents, ih)}
+}
+
+func (r requestStrategyInputMultiTorrent) Capacity() (int64, bool) {
        return (*r.capFunc)()
 }
 
-func (r requestStrategyInput) MaxUnverifiedBytes() int64 {
-       return r.cl.config.MaxUnverifiedBytes
+type requestStrategyInputSingleTorrent struct {
+       requestStrategyInputCommon
+       t *Torrent
 }
 
-var _ request_strategy.Input = requestStrategyInput{}
+func (r requestStrategyInputSingleTorrent) Torrent(_ metainfo.Hash) request_strategy.Torrent {
+       return requestStrategyTorrent{r.t}
+}
+
+func (r requestStrategyInputSingleTorrent) Capacity() (cap int64, capped bool) {
+       return 0, false
+}
+
+var _ request_strategy.Input = requestStrategyInputSingleTorrent{}
+
+func (cl *Client) getRequestStrategyInputCommon() requestStrategyInputCommon {
+       return requestStrategyInputCommon{cl.config.MaxUnverifiedBytes}
+}
 
 // Returns what is necessary to run request_strategy.GetRequestablePieces for primaryTorrent.
 func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input request_strategy.Input) {
-       return requestStrategyInput{
-               cl:      cl,
-               capFunc: primaryTorrent.storage.Capacity,
+       if primaryTorrent.storage.Capacity == nil {
+               return requestStrategyInputSingleTorrent{
+                       requestStrategyInputCommon: cl.getRequestStrategyInputCommon(),
+                       t:                          primaryTorrent,
+               }
+       } else {
+               return requestStrategyInputMultiTorrent{
+                       requestStrategyInputCommon: cl.getRequestStrategyInputCommon(),
+                       torrents:                   cl.torrents,
+                       capFunc:                    primaryTorrent.storage.Capacity,
+               }
        }
 }
 
index f0d3fa183fdbf488e5052cbf3f7c19599e095665..ad15166259fe962e8063f397d1b5275788a0e6ca 100644 (file)
@@ -5,10 +5,13 @@ import (
        "runtime"
        "testing"
 
+       "github.com/anacrolix/missinggo/v2/iter"
        "github.com/davecgh/go-spew/spew"
        qt "github.com/frankban/quicktest"
 
+       "github.com/anacrolix/torrent/metainfo"
        request_strategy "github.com/anacrolix/torrent/request-strategy"
+       "github.com/anacrolix/torrent/storage"
 )
 
 func makeRequestStrategyPiece(t request_strategy.Torrent) request_strategy.Piece {
@@ -27,3 +30,103 @@ func TestRequestStrategyPieceDoesntAlloc(t *testing.T) {
        // We have to use p, or it gets optimized away.
        spew.Fdump(io.Discard, p)
 }
+
+type storagePiece struct {
+       complete bool
+}
+
+func (s storagePiece) ReadAt(p []byte, off int64) (n int, err error) {
+       //TODO implement me
+       panic("implement me")
+}
+
+func (s storagePiece) WriteAt(p []byte, off int64) (n int, err error) {
+       //TODO implement me
+       panic("implement me")
+}
+
+func (s storagePiece) MarkComplete() error {
+       //TODO implement me
+       panic("implement me")
+}
+
+func (s storagePiece) MarkNotComplete() error {
+       //TODO implement me
+       panic("implement me")
+}
+
+func (s storagePiece) Completion() storage.Completion {
+       return storage.Completion{Ok: true, Complete: s.complete}
+}
+
+var _ storage.PieceImpl = storagePiece{}
+
+type storageClient struct {
+       completed int
+}
+
+func (s *storageClient) OpenTorrent(
+       info *metainfo.Info,
+       infoHash metainfo.Hash,
+) (storage.TorrentImpl, error) {
+       return storage.TorrentImpl{
+               Piece: func(p metainfo.Piece) storage.PieceImpl {
+                       return storagePiece{complete: p.Index() < s.completed}
+               },
+       }, nil
+}
+
+func BenchmarkRequestStrategy(b *testing.B) {
+       c := qt.New(b)
+       cl := newTestingClient(b)
+       storageClient := storageClient{}
+       tor, new := cl.AddTorrentOpt(AddTorrentOpts{
+               Storage: &storageClient,
+       })
+       tor.disableTriggers = true
+       c.Assert(new, qt.IsTrue)
+       const pieceLength = 1 << 8 << 10
+       const numPieces = 30_000
+       err := tor.setInfo(&metainfo.Info{
+               Pieces:      make([]byte, numPieces*metainfo.HashSize),
+               PieceLength: pieceLength,
+               Length:      pieceLength * numPieces,
+       })
+       c.Assert(err, qt.IsNil)
+       tor.onSetInfo()
+       peer := cl.newConnection(nil, newConnectionOpts{
+               network: "test",
+       })
+       peer.setTorrent(tor)
+       c.Assert(tor.storage, qt.IsNotNil)
+       const chunkSize = defaultChunkSize
+       peer.onPeerHasAllPiecesNoTriggers()
+       for i := 0; i < tor.numPieces(); i++ {
+               tor.pieces[i].priority.Raise(PiecePriorityNormal)
+               tor.updatePiecePriorityNoTriggers(i)
+       }
+       peer.peerChoking = false
+       //b.StopTimer()
+       b.ResetTimer()
+       //b.ReportAllocs()
+       for _ = range iter.N(b.N) {
+               storageClient.completed = 0
+               for pieceIndex := range iter.N(numPieces) {
+                       tor.updatePieceCompletion(pieceIndex)
+               }
+               for completed := 0; completed <= numPieces; completed += 1 {
+                       storageClient.completed = completed
+                       if completed > 0 {
+                               tor.updatePieceCompletion(completed - 1)
+                       }
+                       // Starting and stopping timers around this part causes lots of GC overhead.
+                       rs := peer.getDesiredRequestState()
+                       tor.cacheNextRequestIndexesForReuse(rs.Requests.requestIndexes)
+                       // End of part that should be timed.
+                       remainingChunks := (numPieces - completed) * (pieceLength / chunkSize)
+                       c.Assert(rs.Requests.requestIndexes, qt.HasLen, minInt(
+                               remainingChunks,
+                               int(cl.config.MaxUnverifiedBytes/chunkSize)))
+               }
+       }
+}
index eb4d1b7f7296b166bd4d704e00ca5c907381c0e6..67adc8587b9c5a8a0d1c2a2afb06bebd089ad5b8 100644 (file)
@@ -68,7 +68,7 @@ func GetRequestablePieces(
                        return true
                }
                if input.MaxUnverifiedBytes() != 0 && allTorrentsUnverifiedBytes+pieceLength > input.MaxUnverifiedBytes() {
-                       return true
+                       return false
                }
                allTorrentsUnverifiedBytes += pieceLength
                f(ih, _i.key.Index, _i.state)
index 3056741db389a453de42e87899125ae563a5d658..54b5a6ee995ffc7dce73cf930bcc8c4613d16e64 100644 (file)
@@ -1,6 +1,9 @@
 package requestStrategy
 
-import "github.com/anacrolix/torrent/metainfo"
+import (
+       g "github.com/anacrolix/generics"
+       "github.com/anacrolix/torrent/metainfo"
+)
 
 type Btree interface {
        Delete(pieceRequestOrderItem)
@@ -21,14 +24,14 @@ type PieceRequestOrder struct {
 }
 
 type PieceRequestOrderKey struct {
-       InfoHash metainfo.Hash
        Index    int
+       InfoHash metainfo.Hash
 }
 
 type PieceRequestOrderState struct {
+       Availability int
        Priority     piecePriority
        Partial      bool
-       Availability int
 }
 
 type pieceRequestOrderItem struct {
@@ -40,28 +43,29 @@ func (me *pieceRequestOrderItem) Less(otherConcrete *pieceRequestOrderItem) bool
        return pieceOrderLess(me, otherConcrete).Less()
 }
 
-func (me *PieceRequestOrder) Add(key PieceRequestOrderKey, state PieceRequestOrderState) {
-       if _, ok := me.keys[key]; ok {
-               panic(key)
+// Returns the old state if the key was already present.
+func (me *PieceRequestOrder) Add(
+       key PieceRequestOrderKey,
+       state PieceRequestOrderState,
+) (old g.Option[PieceRequestOrderState]) {
+       if old.Value, old.Ok = me.keys[key]; old.Ok {
+               if state == old.Value {
+                       return
+               }
+               me.tree.Delete(pieceRequestOrderItem{key, old.Value})
        }
        me.tree.Add(pieceRequestOrderItem{key, state})
        me.keys[key] = state
+       return
 }
 
 func (me *PieceRequestOrder) Update(
        key PieceRequestOrderKey,
        state PieceRequestOrderState,
 ) {
-       oldState, ok := me.keys[key]
-       if !ok {
+       if !me.Add(key, state).Ok {
                panic("key should have been added already")
        }
-       if state == oldState {
-               return
-       }
-       me.tree.Delete(pieceRequestOrderItem{key, oldState})
-       me.tree.Add(pieceRequestOrderItem{key, state})
-       me.keys[key] = state
 }
 
 func (me *PieceRequestOrder) existingItemForKey(key PieceRequestOrderKey) pieceRequestOrderItem {
@@ -71,9 +75,14 @@ func (me *PieceRequestOrder) existingItemForKey(key PieceRequestOrderKey) pieceR
        }
 }
 
-func (me *PieceRequestOrder) Delete(key PieceRequestOrderKey) {
-       me.tree.Delete(pieceRequestOrderItem{key, me.keys[key]})
+func (me *PieceRequestOrder) Delete(key PieceRequestOrderKey) bool {
+       state, ok := me.keys[key]
+       if !ok {
+               return false
+       }
+       me.tree.Delete(pieceRequestOrderItem{key, state})
        delete(me.keys, key)
+       return true
 }
 
 func (me *PieceRequestOrder) Len() int {
index ee5fb39ae5f02580d982305ea137682ae1bd3343..818b2414076b0f15fbbc5e1d204366beca6b93be 100644 (file)
@@ -19,7 +19,7 @@ func benchmarkPieceRequestOrder[B Btree](
        for range iter.N(b.N) {
                pro := NewPieceOrder(newBtree(), numPieces)
                state := PieceRequestOrderState{}
-               doPieces := func(m func(PieceRequestOrderKey)) {
+               doPieces := func(m func(PieceRequestOrderKey) bool) {
                        for i := range iter.N(numPieces) {
                                key := PieceRequestOrderKey{
                                        Index: i,
@@ -28,34 +28,38 @@ func benchmarkPieceRequestOrder[B Btree](
                                m(key)
                        }
                }
-               doPieces(func(key PieceRequestOrderKey) {
-                       pro.Add(key, state)
+               doPieces(func(key PieceRequestOrderKey) bool {
+                       return !pro.Add(key, state).Ok
                })
                state.Availability++
-               doPieces(func(key PieceRequestOrderKey) {
+               doPieces(func(key PieceRequestOrderKey) bool {
                        pro.Update(key, state)
+                       return true
                })
                pro.tree.Scan(func(item pieceRequestOrderItem) bool {
                        return true
                })
-               doPieces(func(key PieceRequestOrderKey) {
+               doPieces(func(key PieceRequestOrderKey) bool {
                        state.Priority = piecePriority(key.Index / 4)
                        pro.Update(key, state)
+                       return true
                })
                pro.tree.Scan(func(item pieceRequestOrderItem) bool {
                        return item.key.Index < 1000
                })
                state.Priority = 0
                state.Availability++
-               doPieces(func(key PieceRequestOrderKey) {
+               doPieces(func(key PieceRequestOrderKey) bool {
                        pro.Update(key, state)
+                       return true
                })
                pro.tree.Scan(func(item pieceRequestOrderItem) bool {
                        return item.key.Index < 1000
                })
                state.Availability--
-               doPieces(func(key PieceRequestOrderKey) {
+               doPieces(func(key PieceRequestOrderKey) bool {
                        pro.Update(key, state)
+                       return true
                })
                doPieces(pro.Delete)
                if pro.Len() != 0 {
index fdb0ad79109feff0383e3df15099460d0098741e..1440207d52382b4221eeb9522c0484ff00e8c52c 100644 (file)
@@ -212,7 +212,8 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
                                                return
                                        }
                                }
-                               if p.requestState.Cancelled.Contains(r) {
+                               cancelled := &p.requestState.Cancelled
+                               if !cancelled.IsEmpty() && cancelled.Contains(r) {
                                        // Can't re-request while awaiting acknowledgement.
                                        return
                                }
@@ -244,11 +245,19 @@ func (p *Peer) maybeUpdateActualRequestState() {
                func(_ context.Context) {
                        next := p.getDesiredRequestState()
                        p.applyRequestState(next)
-                       p.t.requestIndexes = next.Requests.requestIndexes[:0]
+                       p.t.cacheNextRequestIndexesForReuse(next.Requests.requestIndexes)
                },
        )
 }
 
+func (t *Torrent) cacheNextRequestIndexesForReuse(slice []RequestIndex) {
+       // The incoming slice can be smaller when getDesiredRequestState short circuits on some
+       // conditions.
+       if cap(slice) > cap(t.requestIndexes) {
+               t.requestIndexes = slice[:0]
+       }
+}
+
 // Whether we should allow sending not interested ("losing interest") to the peer. I noticed
 // qBitTorrent seems to punish us for sending not interested when we're streaming and don't
 // currently need anything.
@@ -278,7 +287,11 @@ func (p *Peer) applyRequestState(next desiredRequestState) {
                return
        }
        more := true
-       requestHeap := heap.InterfaceForSlice(&next.Requests.requestIndexes, next.Requests.lessByValue)
+       orig := next.Requests.requestIndexes
+       requestHeap := heap.InterfaceForSlice(
+               &next.Requests.requestIndexes,
+               next.Requests.lessByValue,
+       )
        heap.Init(requestHeap)
 
        t := p.t
@@ -292,6 +305,9 @@ func (p *Peer) applyRequestState(next desiredRequestState) {
                        break
                }
                req := heap.Pop(requestHeap)
+               if cap(next.Requests.requestIndexes) != cap(orig) {
+                       panic("changed")
+               }
                existing := t.requestingPeer(req)
                if existing != nil && existing != p {
                        // Don't steal from the poor.
index 10623da0b5d83594c396a14a11a2599123b44151..03943ed04a1d71445925d82584afafa353d32484 100644 (file)
@@ -1,17 +1,28 @@
 package torrent
 
 import (
+       g "github.com/anacrolix/generics"
        request_strategy "github.com/anacrolix/torrent/request-strategy"
 )
 
-func (t *Torrent) updatePieceRequestOrder(pieceIndex int) {
+func (t *Torrent) updatePieceRequestOrderPiece(pieceIndex int) {
        if t.storage == nil {
                return
        }
-       if ro, ok := t.cl.pieceRequestOrder[t.clientPieceRequestOrderKey()]; ok {
-               ro.Update(
-                       t.pieceRequestOrderKey(pieceIndex),
-                       t.requestStrategyPieceOrderState(pieceIndex))
+       pro, ok := t.cl.pieceRequestOrder[t.clientPieceRequestOrderKey()]
+       if !ok {
+               return
+       }
+       key := t.pieceRequestOrderKey(pieceIndex)
+       if t.hasStorageCap() {
+               pro.Update(key, t.requestStrategyPieceOrderState(pieceIndex))
+               return
+       }
+       pending := !t.ignorePieceForRequests(pieceIndex)
+       if pending {
+               pro.Add(key, t.requestStrategyPieceOrderState(pieceIndex))
+       } else {
+               pro.Delete(key)
        }
 }
 
@@ -41,9 +52,7 @@ func (t *Torrent) initPieceRequestOrder() {
        if t.storage == nil {
                return
        }
-       if t.cl.pieceRequestOrder == nil {
-               t.cl.pieceRequestOrder = make(map[interface{}]*request_strategy.PieceRequestOrder)
-       }
+       g.MakeMapIfNil(&t.cl.pieceRequestOrder)
        key := t.clientPieceRequestOrderKey()
        cpro := t.cl.pieceRequestOrder
        if cpro[key] == nil {
@@ -55,9 +64,11 @@ func (t *Torrent) addRequestOrderPiece(i int) {
        if t.storage == nil {
                return
        }
-       t.cl.pieceRequestOrder[t.clientPieceRequestOrderKey()].Add(
-               t.pieceRequestOrderKey(i),
-               t.requestStrategyPieceOrderState(i))
+       pro := t.getPieceRequestOrder()
+       key := t.pieceRequestOrderKey(i)
+       if t.hasStorageCap() || !t.ignorePieceForRequests(i) {
+               pro.Add(key, t.requestStrategyPieceOrderState(i))
+       }
 }
 
 func (t *Torrent) getPieceRequestOrder() *request_strategy.PieceRequestOrder {
index 8ea2acde041ff65a7c2839a8aa5e96e21da62b27..2643dd6a69237a394196166f1f9dca22b305b56d 100644 (file)
@@ -170,6 +170,8 @@ type Torrent struct {
        // Large allocations reused between request state updates.
        requestPieceStates []request_strategy.PieceRequestOrderState
        requestIndexes     []RequestIndex
+
+       disableTriggers bool
 }
 
 type outgoingConnAttemptKey = *PeerInfo
@@ -200,7 +202,7 @@ func (t *Torrent) decPieceAvailability(i pieceIndex) {
                panic(p.relativeAvailability)
        }
        p.relativeAvailability--
-       t.updatePieceRequestOrder(i)
+       t.updatePieceRequestOrderPiece(i)
 }
 
 func (t *Torrent) incPieceAvailability(i pieceIndex) {
@@ -208,7 +210,7 @@ func (t *Torrent) incPieceAvailability(i pieceIndex) {
        if t.haveInfo() {
                p := t.piece(i)
                p.relativeAvailability++
-               t.updatePieceRequestOrder(i)
+               t.updatePieceRequestOrderPiece(i)
        }
 }
 
@@ -1099,7 +1101,7 @@ func chunkIndexFromChunkSpec(cs ChunkSpec, chunkSize pp.Integer) chunkIndexType
 }
 
 func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
-       return t._pendingPieces.Contains(uint32(index))
+       return !t._pendingPieces.IsEmpty() && t._pendingPieces.Contains(uint32(index))
 }
 
 // A pool of []*PeerConn, to reduce allocations in functions that need to index or sort Torrent
@@ -1256,7 +1258,7 @@ func (t *Torrent) updatePiecePriorityNoTriggers(piece pieceIndex) (pendingChange
        if !t.closed.IsSet() {
                // It would be possible to filter on pure-priority changes here to avoid churning the piece
                // request order.
-               t.updatePieceRequestOrder(piece)
+               t.updatePieceRequestOrderPiece(piece)
        }
        p := &t.pieces[piece]
        newPrio := p.uncachedPriority()
@@ -1269,9 +1271,10 @@ func (t *Torrent) updatePiecePriorityNoTriggers(piece pieceIndex) (pendingChange
 }
 
 func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) {
-       if t.updatePiecePriorityNoTriggers(piece) {
+       if t.updatePiecePriorityNoTriggers(piece) && !t.disableTriggers {
                t.onPiecePendingTriggers(piece, reason)
        }
+       t.updatePieceRequestOrderPiece(piece)
 }
 
 func (t *Torrent) updateAllPiecePriorities(reason string) {
@@ -1415,13 +1418,17 @@ func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
        } else {
                t._completedPieces.Remove(x)
        }
-       p.t.updatePieceRequestOrder(piece)
+       p.t.updatePieceRequestOrderPiece(piece)
        t.updateComplete()
        if complete && len(p.dirtiers) != 0 {
                t.logger.Printf("marked piece %v complete but still has dirtiers", piece)
        }
        if changed {
-               t.logger.Levelf(log.Debug, "piece %d completion changed: %+v -> %+v", piece, cached, uncached)
+               //slog.Debug(
+               //      "piece completion changed",
+               //      slog.Int("piece", piece),
+               //      slog.Any("from", cached),
+               //      slog.Any("to", uncached))
                t.pieceCompletionChanged(piece, "Torrent.updatePieceCompletion")
        }
        return changed
index de0cce0a5e699837153e02646e197bd3f0b1fbd8..8d480c2e9a4962807be42ca7ef91721600d538aa 100644 (file)
@@ -1,10 +1,14 @@
 package torrent
 
 import (
-       "github.com/anacrolix/torrent/typed-roaring"
+       typedRoaring "github.com/anacrolix/torrent/typed-roaring"
 )
 
-func iterBitmapUnsetInRange[T typedRoaring.BitConstraint](it *typedRoaring.Iterator[T], start, end T, f func(T)) {
+func iterBitmapUnsetInRange[T typedRoaring.BitConstraint](
+       it *typedRoaring.Iterator[T],
+       start, end T,
+       f func(T),
+) {
        it.AdvanceIfNeeded(start)
        lastDirty := start - 1
        for it.HasNext() {