]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add optimized in-memory PieceCompletion.GetRange
authorMatt Joiner <anacrolix@gmail.com>
Mon, 26 May 2025 14:29:07 +0000 (00:29 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 26 May 2025 14:29:07 +0000 (00:29 +1000)
storage/file-piece.go
storage/map-piece-completion.go
storage/piece-completion.go

index da23c4868959b27fa28a11ef2659152b82d709f0..f4ab77a15b011291f1acd3413644e8b01834acc6 100644 (file)
@@ -9,6 +9,9 @@ import (
        "log/slog"
        "os"
 
+       g "github.com/anacrolix/generics"
+       "github.com/anacrolix/missinggo/v2/panicif"
+
        "github.com/anacrolix/torrent/metainfo"
        "github.com/anacrolix/torrent/segments"
 )
@@ -113,19 +116,14 @@ func (me *filePieceImpl) MarkComplete() (err error) {
        if err != nil {
                return
        }
-nextFile:
-       for i, f := range me.pieceFiles() {
-               for p := f.beginPieceIndex; p < f.endPieceIndex; p++ {
-                       _ = i
-                       //fmt.Printf("%v %#v %v\n", i, f, p)
-                       cmpl := me.t.getCompletion(p)
-                       if cmpl.Err != nil {
-                               err = fmt.Errorf("error getting completion for piece %d: %w", p, cmpl.Err)
-                               return
-                       }
-                       if !cmpl.Ok || !cmpl.Complete {
-                               continue nextFile
-                       }
+       for _, f := range me.pieceFiles() {
+               res := me.allFilePiecesComplete(f)
+               if res.Err != nil {
+                       err = res.Err
+                       return
+               }
+               if !res.Ok {
+                       continue
                }
                err = me.promotePartFile(f)
                if err != nil {
@@ -136,6 +134,31 @@ nextFile:
        return
 }
 
+func (me *filePieceImpl) allFilePiecesComplete(f file) (ret g.Result[bool]) {
+       next, stop := iter.Pull(GetPieceCompletionRange(
+               me.t.pieceCompletion(),
+               me.t.infoHash,
+               f.beginPieceIndex,
+               f.endPieceIndex,
+       ))
+       defer stop()
+       for p := f.beginPieceIndex; p < f.endPieceIndex; p++ {
+               cmpl, ok := next()
+               panicif.False(ok)
+               if cmpl.Err != nil {
+                       ret.Err = fmt.Errorf("error getting completion for piece %d: %w", p, cmpl.Err)
+                       return
+               }
+               if !cmpl.Ok || !cmpl.Complete {
+                       return
+               }
+       }
+       _, ok := next()
+       panicif.True(ok)
+       ret.SetOk(true)
+       return
+}
+
 func (me *filePieceImpl) MarkNotComplete() (err error) {
        err = me.pieceCompletion().Set(me.pieceKey(), false)
        if err != nil {
index afb1e97b64d6a1310f0235a5e19cb516f0fbfadd..9db53e3df96f0fd2c1d57ac8fa4f2583adeab90f 100644 (file)
 package storage
 
 import (
+       "iter"
        "sync"
 
+       g "github.com/anacrolix/generics"
+
        "github.com/anacrolix/torrent/metainfo"
+       "github.com/anacrolix/torrent/types/infohash"
 )
 
 type mapPieceCompletion struct {
-       // TODO: Generics
+       // TODO: Generics. map of InfoHash to *memoryTorrentJustComplete.
        m sync.Map
 }
 
-var _ PieceCompletion = (*mapPieceCompletion)(nil)
+type (
+       justComplete              = g.Option[bool]
+       memoryTorrentJustComplete struct {
+               mu    sync.RWMutex
+               state []justComplete
+       }
+)
+
+func (me *memoryTorrentJustComplete) Get(i int) justComplete {
+       me.mu.RLock()
+       defer me.mu.RUnlock()
+       return me.getLocked(i)
+}
+
+func (me *memoryTorrentJustComplete) getLocked(i int) justComplete {
+       if i >= len(me.state) {
+               return g.None[bool]()
+       }
+       return me.state[i]
+}
+
+func (me *memoryTorrentJustComplete) Set(i int, complete bool) {
+       me.mu.Lock()
+       defer me.mu.Unlock()
+       for i >= len(me.state) {
+               me.state = append(me.state, g.None[bool]())
+       }
+       me.state[i].Set(complete)
+}
+
+func (me *memoryTorrentJustComplete) GetRange(begin, end int) iter.Seq[justComplete] {
+       me.mu.RLock()
+       defer me.mu.RUnlock()
+       return func(yield func(justComplete) bool) {
+               for i := begin; i < end; i++ {
+                       if !yield(me.getLocked(i)) {
+                               return
+                       }
+               }
+       }
+}
+
+var _ interface {
+       PieceCompletion
+       PieceCompletionGetRanger
+} = (*mapPieceCompletion)(nil)
 
 func NewMapPieceCompletion() PieceCompletion {
        return &mapPieceCompletion{}
 }
 
-func (*mapPieceCompletion) Close() error { return nil }
+func (me *mapPieceCompletion) Close() error {
+       me.m.Clear()
+       return nil
+}
 
 func (me *mapPieceCompletion) Get(pk metainfo.PieceKey) (c Completion, err error) {
-       v, ok := me.m.Load(pk)
-       if ok {
-               c.Complete = v.(bool)
+       v, ok := me.m.Load(pk.InfoHash)
+       if !ok {
+               return
        }
-       c.Ok = ok
+       jcs := v.(*memoryTorrentJustComplete)
+       c.Complete, c.Ok = jcs.Get(pk.Index).AsTuple()
        return
 }
 
-func (me *mapPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
-       me.m.Store(pk, b)
+func (me *mapPieceCompletion) Set(pk metainfo.PieceKey, complete bool) error {
+       v, ok := me.m.Load(pk.InfoHash)
+       if !ok {
+               v, _ = me.m.LoadOrStore(pk.InfoHash, &memoryTorrentJustComplete{})
+       }
+       t := v.(*memoryTorrentJustComplete)
+       t.Set(pk.Index, complete)
        return nil
 }
+
+func (me *mapPieceCompletion) GetRange(ih infohash.T, begin, end int) iter.Seq[Completion] {
+       return func(yield func(Completion) bool) {
+               v, ok := me.m.Load(ih)
+               if !ok {
+                       return
+               }
+               t := v.(*memoryTorrentJustComplete)
+               for jc := range t.GetRange(begin, end) {
+                       if !yield(Completion{
+                               Err:      nil,
+                               Ok:       jc.Ok,
+                               Complete: jc.Value,
+                       }) {
+                               return
+                       }
+               }
+       }
+}
index bf6dd41b1a6cf4f57361f0cf6e77a58e063819ad..2750321b597beaf2969c536f51d6bea7e0e0a77a 100644 (file)
@@ -1,11 +1,14 @@
 package storage
 
 import (
+       "cmp"
+       "iter"
        "os"
 
        "github.com/anacrolix/log"
 
        "github.com/anacrolix/torrent/metainfo"
+       "github.com/anacrolix/torrent/types/infohash"
 )
 
 type PieceCompletionGetSetter interface {
@@ -21,6 +24,32 @@ type PieceCompletion interface {
        Close() error
 }
 
+// Optional interface with optimized Get for ranges. Use GetPieceCompletionRange wrapper to abstract
+// over it not being implemented.
+type PieceCompletionGetRanger interface {
+       GetRange(_ infohash.T, begin, end int) iter.Seq[Completion]
+}
+
+// Get piece completion as an iterator. Should be faster for long sequences of Gets. Uses optional
+// interface PieceCompletionGetRanger if implemented.
+func GetPieceCompletionRange(pc PieceCompletion, ih infohash.T, begin, end int) iter.Seq[Completion] {
+       if a, ok := pc.(PieceCompletionGetRanger); ok {
+               return a.GetRange(ih, begin, end)
+       }
+       return func(yield func(Completion) bool) {
+               for i := begin; i < end; i++ {
+                       c, err := pc.Get(metainfo.PieceKey{
+                               InfoHash: ih,
+                               Index:    i,
+                       })
+                       c.Err = cmp.Or(c.Err, err)
+                       if !yield(c) {
+                               return
+                       }
+               }
+       }
+}
+
 func pieceCompletionForDir(dir string) (ret PieceCompletion) {
        // This should be happening before sqlite attempts to open a database in the intended directory.
        os.MkdirAll(dir, 0o700)