"log/slog"
"os"
+ g "github.com/anacrolix/generics"
+ "github.com/anacrolix/missinggo/v2/panicif"
+
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/segments"
)
if err != nil {
return
}
-nextFile:
- for i, f := range me.pieceFiles() {
- for p := f.beginPieceIndex; p < f.endPieceIndex; p++ {
- _ = i
- //fmt.Printf("%v %#v %v\n", i, f, p)
- cmpl := me.t.getCompletion(p)
- if cmpl.Err != nil {
- err = fmt.Errorf("error getting completion for piece %d: %w", p, cmpl.Err)
- return
- }
- if !cmpl.Ok || !cmpl.Complete {
- continue nextFile
- }
+ for _, f := range me.pieceFiles() {
+ res := me.allFilePiecesComplete(f)
+ if res.Err != nil {
+ err = res.Err
+ return
+ }
+ if !res.Ok {
+ continue
}
err = me.promotePartFile(f)
if err != nil {
return
}
+func (me *filePieceImpl) allFilePiecesComplete(f file) (ret g.Result[bool]) {
+ next, stop := iter.Pull(GetPieceCompletionRange(
+ me.t.pieceCompletion(),
+ me.t.infoHash,
+ f.beginPieceIndex,
+ f.endPieceIndex,
+ ))
+ defer stop()
+ for p := f.beginPieceIndex; p < f.endPieceIndex; p++ {
+ cmpl, ok := next()
+ panicif.False(ok)
+ if cmpl.Err != nil {
+ ret.Err = fmt.Errorf("error getting completion for piece %d: %w", p, cmpl.Err)
+ return
+ }
+ if !cmpl.Ok || !cmpl.Complete {
+ return
+ }
+ }
+ _, ok := next()
+ panicif.True(ok)
+ ret.SetOk(true)
+ return
+}
+
func (me *filePieceImpl) MarkNotComplete() (err error) {
err = me.pieceCompletion().Set(me.pieceKey(), false)
if err != nil {
package storage
import (
+ "iter"
"sync"
+ g "github.com/anacrolix/generics"
+
"github.com/anacrolix/torrent/metainfo"
+ "github.com/anacrolix/torrent/types/infohash"
)
type mapPieceCompletion struct {
- // TODO: Generics
+ // TODO: Generics. map of InfoHash to *memoryTorrentJustComplete.
m sync.Map
}
-var _ PieceCompletion = (*mapPieceCompletion)(nil)
+type (
+ justComplete = g.Option[bool]
+ memoryTorrentJustComplete struct {
+ mu sync.RWMutex
+ state []justComplete
+ }
+)
+
+func (me *memoryTorrentJustComplete) Get(i int) justComplete {
+ me.mu.RLock()
+ defer me.mu.RUnlock()
+ return me.getLocked(i)
+}
+
+func (me *memoryTorrentJustComplete) getLocked(i int) justComplete {
+ if i >= len(me.state) {
+ return g.None[bool]()
+ }
+ return me.state[i]
+}
+
+func (me *memoryTorrentJustComplete) Set(i int, complete bool) {
+ me.mu.Lock()
+ defer me.mu.Unlock()
+ for i >= len(me.state) {
+ me.state = append(me.state, g.None[bool]())
+ }
+ me.state[i].Set(complete)
+}
+
+func (me *memoryTorrentJustComplete) GetRange(begin, end int) iter.Seq[justComplete] {
+ me.mu.RLock()
+ defer me.mu.RUnlock()
+ return func(yield func(justComplete) bool) {
+ for i := begin; i < end; i++ {
+ if !yield(me.getLocked(i)) {
+ return
+ }
+ }
+ }
+}
+
+var _ interface {
+ PieceCompletion
+ PieceCompletionGetRanger
+} = (*mapPieceCompletion)(nil)
func NewMapPieceCompletion() PieceCompletion {
return &mapPieceCompletion{}
}
-func (*mapPieceCompletion) Close() error { return nil }
+func (me *mapPieceCompletion) Close() error {
+ me.m.Clear()
+ return nil
+}
func (me *mapPieceCompletion) Get(pk metainfo.PieceKey) (c Completion, err error) {
- v, ok := me.m.Load(pk)
- if ok {
- c.Complete = v.(bool)
+ v, ok := me.m.Load(pk.InfoHash)
+ if !ok {
+ return
}
- c.Ok = ok
+ jcs := v.(*memoryTorrentJustComplete)
+ c.Complete, c.Ok = jcs.Get(pk.Index).AsTuple()
return
}
-func (me *mapPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
- me.m.Store(pk, b)
+func (me *mapPieceCompletion) Set(pk metainfo.PieceKey, complete bool) error {
+ v, ok := me.m.Load(pk.InfoHash)
+ if !ok {
+ v, _ = me.m.LoadOrStore(pk.InfoHash, &memoryTorrentJustComplete{})
+ }
+ t := v.(*memoryTorrentJustComplete)
+ t.Set(pk.Index, complete)
return nil
}
+
+func (me *mapPieceCompletion) GetRange(ih infohash.T, begin, end int) iter.Seq[Completion] {
+ return func(yield func(Completion) bool) {
+ v, ok := me.m.Load(ih)
+ if !ok {
+ return
+ }
+ t := v.(*memoryTorrentJustComplete)
+ for jc := range t.GetRange(begin, end) {
+ if !yield(Completion{
+ Err: nil,
+ Ok: jc.Ok,
+ Complete: jc.Value,
+ }) {
+ return
+ }
+ }
+ }
+}
package storage
import (
+ "cmp"
+ "iter"
"os"
"github.com/anacrolix/log"
"github.com/anacrolix/torrent/metainfo"
+ "github.com/anacrolix/torrent/types/infohash"
)
type PieceCompletionGetSetter interface {
Close() error
}
+// Optional interface with optimized Get for ranges. Use GetPieceCompletionRange wrapper to abstract
+// over it not being implemented.
+type PieceCompletionGetRanger interface {
+ GetRange(_ infohash.T, begin, end int) iter.Seq[Completion]
+}
+
+// Get piece completion as an iterator. Should be faster for long sequences of Gets. Uses optional
+// interface PieceCompletionGetRanger if implemented.
+func GetPieceCompletionRange(pc PieceCompletion, ih infohash.T, begin, end int) iter.Seq[Completion] {
+ if a, ok := pc.(PieceCompletionGetRanger); ok {
+ return a.GetRange(ih, begin, end)
+ }
+ return func(yield func(Completion) bool) {
+ for i := begin; i < end; i++ {
+ c, err := pc.Get(metainfo.PieceKey{
+ InfoHash: ih,
+ Index: i,
+ })
+ c.Err = cmp.Or(c.Err, err)
+ if !yield(c) {
+ return
+ }
+ }
+ }
+}
+
func pieceCompletionForDir(dir string) (ret PieceCompletion) {
// This should be happening before sqlite attempts to open a database in the intended directory.
os.MkdirAll(dir, 0o700)