From 6ae9e00e9377cf1ad20ac70e0e0851ce8e216718 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 27 May 2025 00:29:07 +1000 Subject: [PATCH] Add optimized in-memory PieceCompletion.GetRange --- storage/file-piece.go | 49 ++++++++++++----- storage/map-piece-completion.go | 95 +++++++++++++++++++++++++++++---- storage/piece-completion.go | 29 ++++++++++ 3 files changed, 151 insertions(+), 22 deletions(-) diff --git a/storage/file-piece.go b/storage/file-piece.go index da23c486..f4ab77a1 100644 --- a/storage/file-piece.go +++ b/storage/file-piece.go @@ -9,6 +9,9 @@ import ( "log/slog" "os" + g "github.com/anacrolix/generics" + "github.com/anacrolix/missinggo/v2/panicif" + "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/segments" ) @@ -113,19 +116,14 @@ func (me *filePieceImpl) MarkComplete() (err error) { if err != nil { return } -nextFile: - for i, f := range me.pieceFiles() { - for p := f.beginPieceIndex; p < f.endPieceIndex; p++ { - _ = i - //fmt.Printf("%v %#v %v\n", i, f, p) - cmpl := me.t.getCompletion(p) - if cmpl.Err != nil { - err = fmt.Errorf("error getting completion for piece %d: %w", p, cmpl.Err) - return - } - if !cmpl.Ok || !cmpl.Complete { - continue nextFile - } + for _, f := range me.pieceFiles() { + res := me.allFilePiecesComplete(f) + if res.Err != nil { + err = res.Err + return + } + if !res.Ok { + continue } err = me.promotePartFile(f) if err != nil { @@ -136,6 +134,31 @@ nextFile: return } +func (me *filePieceImpl) allFilePiecesComplete(f file) (ret g.Result[bool]) { + next, stop := iter.Pull(GetPieceCompletionRange( + me.t.pieceCompletion(), + me.t.infoHash, + f.beginPieceIndex, + f.endPieceIndex, + )) + defer stop() + for p := f.beginPieceIndex; p < f.endPieceIndex; p++ { + cmpl, ok := next() + panicif.False(ok) + if cmpl.Err != nil { + ret.Err = fmt.Errorf("error getting completion for piece %d: %w", p, cmpl.Err) + return + } + if !cmpl.Ok || !cmpl.Complete { + return + } + } + _, ok := next() + panicif.True(ok) + ret.SetOk(true) + return +} + func (me *filePieceImpl) MarkNotComplete() (err error) { err = me.pieceCompletion().Set(me.pieceKey(), false) if err != nil { diff --git a/storage/map-piece-completion.go b/storage/map-piece-completion.go index afb1e97b..9db53e3d 100644 --- a/storage/map-piece-completion.go +++ b/storage/map-piece-completion.go @@ -1,34 +1,111 @@ package storage import ( + "iter" "sync" + g "github.com/anacrolix/generics" + "github.com/anacrolix/torrent/metainfo" + "github.com/anacrolix/torrent/types/infohash" ) type mapPieceCompletion struct { - // TODO: Generics + // TODO: Generics. map of InfoHash to *memoryTorrentJustComplete. m sync.Map } -var _ PieceCompletion = (*mapPieceCompletion)(nil) +type ( + justComplete = g.Option[bool] + memoryTorrentJustComplete struct { + mu sync.RWMutex + state []justComplete + } +) + +func (me *memoryTorrentJustComplete) Get(i int) justComplete { + me.mu.RLock() + defer me.mu.RUnlock() + return me.getLocked(i) +} + +func (me *memoryTorrentJustComplete) getLocked(i int) justComplete { + if i >= len(me.state) { + return g.None[bool]() + } + return me.state[i] +} + +func (me *memoryTorrentJustComplete) Set(i int, complete bool) { + me.mu.Lock() + defer me.mu.Unlock() + for i >= len(me.state) { + me.state = append(me.state, g.None[bool]()) + } + me.state[i].Set(complete) +} + +func (me *memoryTorrentJustComplete) GetRange(begin, end int) iter.Seq[justComplete] { + me.mu.RLock() + defer me.mu.RUnlock() + return func(yield func(justComplete) bool) { + for i := begin; i < end; i++ { + if !yield(me.getLocked(i)) { + return + } + } + } +} + +var _ interface { + PieceCompletion + PieceCompletionGetRanger +} = (*mapPieceCompletion)(nil) func NewMapPieceCompletion() PieceCompletion { return &mapPieceCompletion{} } -func (*mapPieceCompletion) Close() error { return nil } +func (me *mapPieceCompletion) Close() error { + me.m.Clear() + return nil +} func (me *mapPieceCompletion) Get(pk metainfo.PieceKey) (c Completion, err error) { - v, ok := me.m.Load(pk) - if ok { - c.Complete = v.(bool) + v, ok := me.m.Load(pk.InfoHash) + if !ok { + return } - c.Ok = ok + jcs := v.(*memoryTorrentJustComplete) + c.Complete, c.Ok = jcs.Get(pk.Index).AsTuple() return } -func (me *mapPieceCompletion) Set(pk metainfo.PieceKey, b bool) error { - me.m.Store(pk, b) +func (me *mapPieceCompletion) Set(pk metainfo.PieceKey, complete bool) error { + v, ok := me.m.Load(pk.InfoHash) + if !ok { + v, _ = me.m.LoadOrStore(pk.InfoHash, &memoryTorrentJustComplete{}) + } + t := v.(*memoryTorrentJustComplete) + t.Set(pk.Index, complete) return nil } + +func (me *mapPieceCompletion) GetRange(ih infohash.T, begin, end int) iter.Seq[Completion] { + return func(yield func(Completion) bool) { + v, ok := me.m.Load(ih) + if !ok { + return + } + t := v.(*memoryTorrentJustComplete) + for jc := range t.GetRange(begin, end) { + if !yield(Completion{ + Err: nil, + Ok: jc.Ok, + Complete: jc.Value, + }) { + return + } + } + } +} diff --git a/storage/piece-completion.go b/storage/piece-completion.go index bf6dd41b..2750321b 100644 --- a/storage/piece-completion.go +++ b/storage/piece-completion.go @@ -1,11 +1,14 @@ package storage import ( + "cmp" + "iter" "os" "github.com/anacrolix/log" "github.com/anacrolix/torrent/metainfo" + "github.com/anacrolix/torrent/types/infohash" ) type PieceCompletionGetSetter interface { @@ -21,6 +24,32 @@ type PieceCompletion interface { Close() error } +// Optional interface with optimized Get for ranges. Use GetPieceCompletionRange wrapper to abstract +// over it not being implemented. +type PieceCompletionGetRanger interface { + GetRange(_ infohash.T, begin, end int) iter.Seq[Completion] +} + +// Get piece completion as an iterator. Should be faster for long sequences of Gets. Uses optional +// interface PieceCompletionGetRanger if implemented. +func GetPieceCompletionRange(pc PieceCompletion, ih infohash.T, begin, end int) iter.Seq[Completion] { + if a, ok := pc.(PieceCompletionGetRanger); ok { + return a.GetRange(ih, begin, end) + } + return func(yield func(Completion) bool) { + for i := begin; i < end; i++ { + c, err := pc.Get(metainfo.PieceKey{ + InfoHash: ih, + Index: i, + }) + c.Err = cmp.Or(c.Err, err) + if !yield(c) { + return + } + } + } +} + func pieceCompletionForDir(dir string) (ret PieceCompletion) { // This should be happening before sqlite attempts to open a database in the intended directory. os.MkdirAll(dir, 0o700) -- 2.51.0