]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Handle downgrading part files
authorMatt Joiner <anacrolix@gmail.com>
Tue, 29 Apr 2025 06:21:21 +0000 (16:21 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 29 Apr 2025 06:21:21 +0000 (16:21 +1000)
metainfo/piece.go
storage/file-piece.go
storage/file.go
storage/perms.go

index 87950a5187ad1d0bca1c9e0f0f4cdc7da3b86d23..2adfa3be4f100ad5ad06e2719b8fac387ba706be 100644 (file)
@@ -1,6 +1,7 @@
 package metainfo
 
 import (
+       "fmt"
        "iter"
 
        g "github.com/anacrolix/generics"
@@ -11,6 +12,10 @@ type Piece struct {
        i    pieceIndex
 }
 
+func (p Piece) String() string {
+       return fmt.Sprintf("metainfo.Piece(Info.Name=%q, i=%v)", p.Info.Name, p.i)
+}
+
 type pieceIndex = int
 
 func (p Piece) Length() int64 {
index 108911a8c4f75f6b28d60f7236e0e202348685b3..985cca924b50536eda940cd9f5686dd1c5bac4bd 100644 (file)
@@ -1,15 +1,19 @@
 package storage
 
 import (
+       "errors"
        "fmt"
        "io"
+       "io/fs"
        "iter"
+       "log/slog"
        "os"
 
        "github.com/anacrolix/torrent/metainfo"
        "github.com/anacrolix/torrent/segments"
 )
 
+// Piece within File storage.
 type filePieceImpl struct {
        t *fileTorrentImpl
        p metainfo.Piece
@@ -19,75 +23,103 @@ type filePieceImpl struct {
 
 var _ PieceImpl = (*filePieceImpl)(nil)
 
+func (me *filePieceImpl) logger() *slog.Logger {
+       return me.t.client.opts.Logger
+}
+
 func (me *filePieceImpl) pieceKey() metainfo.PieceKey {
        return metainfo.PieceKey{me.t.infoHash, me.p.Index()}
 }
 
-func (fs *filePieceImpl) extent() segments.Extent {
+func (me *filePieceImpl) extent() segments.Extent {
        return segments.Extent{
-               Start:  fs.p.Offset(),
-               Length: fs.p.Length(),
+               Start:  me.p.Offset(),
+               Length: me.p.Length(),
        }
 }
 
-func (fs *filePieceImpl) pieceFiles() iter.Seq2[int, file] {
+func (me *filePieceImpl) pieceFiles() iter.Seq2[int, file] {
        return func(yield func(int, file) bool) {
-               for fileIndex := range fs.t.segmentLocater.LocateIter(fs.extent()) {
-                       if !yield(fileIndex, fs.t.files[fileIndex]) {
+               for fileIndex := range me.t.segmentLocater.LocateIter(me.extent()) {
+                       if !yield(fileIndex, me.t.files[fileIndex]) {
                                return
                        }
                }
        }
 }
 
-func (fs *filePieceImpl) Completion() Completion {
-       c := fs.t.getCompletion(fs.p.Index())
-       if !c.Ok {
+func (me *filePieceImpl) pieceCompletion() PieceCompletion {
+       return me.t.client.opts.PieceCompletion
+}
+
+func (me *filePieceImpl) Completion() Completion {
+       c := me.t.getCompletion(me.p.Index())
+       if !c.Ok || c.Err != nil {
                return c
        }
        verified := true
        if c.Complete {
+               noFiles := true
                // If it's allegedly complete, check that its constituent files have the necessary length.
-               if !fs.t.segmentLocater.Locate(
-                       fs.extent(),
-                       func(i int, extent segments.Extent) bool {
-                               file := fs.t.files[i]
-                               s, err := os.Stat(file.safeOsPath)
-                               if err != nil || s.Size() < extent.Start+extent.Length {
-                                       verified = false
-                                       return false
-                               }
-                               return true
-                       }) {
+               for i, extent := range me.t.segmentLocater.LocateIter(me.extent()) {
+                       noFiles = false
+                       file := me.t.files[i]
+                       s, err := os.Stat(file.safeOsPath)
+                       if errors.Is(err, fs.ErrNotExist) {
+                               s, err = os.Stat(file.partFilePath())
+                       }
+                       if err != nil {
+                               me.logger().Warn(
+                                       "error checking file for piece marked as complete",
+                                       "piece", me.p,
+                                       "file", file.safeOsPath,
+                                       "err", err)
+                       } else if s.Size() < extent.Start+extent.Length {
+                               me.logger().Error(
+                                       "file too small for piece marked as complete",
+                                       "piece", me.p,
+                                       "file", file.safeOsPath,
+                                       "size", s.Size(),
+                                       "extent", extent)
+                       } else {
+                               continue
+                       }
+                       verified = false
+                       break
+               }
+               // This probably belongs in a wrapper helper of some kind. I will retain the logic for now.
+               if noFiles {
                        panic("files do not cover piece extent")
                }
        }
 
        if !verified {
-               // The completion was wrong, fix it. TODO: Should we use MarkNotComplete?
-               c.Complete = false
-               fs.t.completion.Set(fs.pieceKey(), false)
+               // The completion was wrong, fix it.
+               err := me.MarkNotComplete()
+               if err != nil {
+                       c.Err = fmt.Errorf("error marking piece not complete: %w", err)
+               }
        }
 
        return c
 }
 
-func (fs *filePieceImpl) MarkComplete() (err error) {
-       err = fs.t.completion.Set(fs.pieceKey(), true)
+func (me *filePieceImpl) MarkComplete() (err error) {
+       err = me.pieceCompletion().Set(me.pieceKey(), true)
        if err != nil {
                return
        }
 nextFile:
-       for i, f := range fs.pieceFiles() {
+       for i, f := range me.pieceFiles() {
                for p := f.beginPieceIndex; p < f.endPieceIndex; p++ {
                        _ = i
                        //fmt.Printf("%v %#v %v\n", i, f, p)
-                       cmpl := fs.t.getCompletion(p)
+                       cmpl := me.t.getCompletion(p)
                        if !cmpl.Ok || !cmpl.Complete {
                                continue nextFile
                        }
                }
-               err = fs.t.promotePartFile(f)
+               err = me.promotePartFile(f)
                if err != nil {
                        err = fmt.Errorf("error promoting part file %q: %w", f.safeOsPath, err)
                        return
@@ -96,6 +128,73 @@ nextFile:
        return
 }
 
-func (fs *filePieceImpl) MarkNotComplete() error {
-       return fs.t.completion.Set(fs.pieceKey(), false)
+func (me *filePieceImpl) MarkNotComplete() (err error) {
+       err = me.pieceCompletion().Set(me.pieceKey(), false)
+       if err != nil {
+               return
+       }
+       for i, f := range me.pieceFiles() {
+               _ = i
+               err = me.onFileNotComplete(f)
+               if err != nil {
+                       err = fmt.Errorf("preparing incomplete file %q: %w", f.safeOsPath, err)
+                       return
+               }
+       }
+       return
+
+}
+
+func (me *filePieceImpl) promotePartFile(f file) (err error) {
+       if me.partFiles() {
+               err = os.Rename(f.partFilePath(), f.safeOsPath)
+               // If we get ENOENT, the file may already be in the final location.
+               if err != nil && !errors.Is(err, fs.ErrNotExist) {
+                       err = fmt.Errorf("renaming part file: %w", err)
+                       return
+               }
+       }
+       info, err := os.Stat(f.safeOsPath)
+       if err != nil {
+               err = fmt.Errorf("statting file: %w", err)
+               return
+       }
+       // Clear writability for the file.
+       err = os.Chmod(f.safeOsPath, info.Mode().Perm()&^0o222)
+       if err != nil {
+               err = fmt.Errorf("setting file to read-only: %w", err)
+               return
+       }
+       return
+}
+
+func (me *filePieceImpl) onFileNotComplete(f file) (err error) {
+       if me.partFiles() {
+               err = os.Rename(f.safeOsPath, f.partFilePath())
+               // If we get ENOENT, the file may already be in the final location.
+               if err != nil && !errors.Is(err, fs.ErrNotExist) {
+                       err = fmt.Errorf("renaming incomplete file: %w", err)
+                       return
+               }
+       }
+       info, err := os.Stat(me.pathForWrite(f))
+       if err != nil {
+               err = fmt.Errorf("statting file: %w", err)
+               return
+       }
+       // Ensure the file is writable
+       err = os.Chmod(f.safeOsPath, info.Mode().Perm()|(filePerm&0o222))
+       if err != nil {
+               err = fmt.Errorf("setting file to read-only: %w", err)
+               return
+       }
+       return
+}
+
+func (me *filePieceImpl) pathForWrite(f file) string {
+       return me.t.pathForWrite(f)
+}
+
+func (me *filePieceImpl) partFiles() bool {
+       return me.t.partFiles()
 }
index d4df5511c0ad0a398696b633d66c58f621552cc1..42ef20c55df8c8a0e3ac0502df02f5a1f60c5c3f 100644 (file)
@@ -5,6 +5,7 @@ import (
        "errors"
        "fmt"
        "io"
+       "io/fs"
        "iter"
        "log/slog"
        "os"
@@ -35,6 +36,7 @@ type NewFileClientOpts struct {
        TorrentDirMaker TorrentDirFilePathMaker
        PieceCompletion PieceCompletion
        UsePartFiles    g.Option[bool]
+       Logger          *slog.Logger
 }
 
 // NewFileOpts creates a new ClientImplCloser that stores files using the OS native filesystem.
@@ -54,10 +56,13 @@ func NewFileOpts(opts NewFileClientOpts) ClientImplCloser {
        if opts.PieceCompletion == nil {
                opts.PieceCompletion = pieceCompletionForDir(opts.ClientBaseDir)
        }
-       return fileClientImpl{opts}
+       if opts.Logger == nil {
+               opts.Logger = log.Default.Slogger()
+       }
+       return &fileClientImpl{opts}
 }
 
-func (me fileClientImpl) Close() error {
+func (me *fileClientImpl) Close() error {
        return me.opts.PieceCompletion.Close()
 }
 
@@ -73,7 +78,7 @@ func enumIter[T any](i iter.Seq[T]) iter.Seq2[int, T] {
        }
 }
 
-func (fs fileClientImpl) OpenTorrent(
+func (fs *fileClientImpl) OpenTorrent(
        ctx context.Context,
        info *metainfo.Info,
        infoHash metainfo.Hash,
@@ -111,8 +116,7 @@ func (fs fileClientImpl) OpenTorrent(
                files,
                info.FileSegmentsIndex(),
                infoHash,
-               fs.opts.PieceCompletion,
-               fs.opts.UsePartFiles.UnwrapOr(true),
+               fs,
        }
        return TorrentImpl{
                Piece: t.Piece,
@@ -129,40 +133,32 @@ type file struct {
        length          int64
 }
 
+func (f file) partFilePath() string {
+       return f.safeOsPath + ".part"
+}
+
 type fileTorrentImpl struct {
        info           *metainfo.Info
        files          []file
        segmentLocater segments.Index
        infoHash       metainfo.Hash
-       completion     PieceCompletion
-       partFiles      bool
+       // Save memory by pointing to the other data.
+       client *fileClientImpl
 }
 
-func (fts *fileTorrentImpl) promotePartFile(f file) (err error) {
-       //fmt.Printf("promoting %q\n", f.safeOsPath)
-       if fts.partFiles {
-               err = os.Rename(f.safeOsPath+".part", f.safeOsPath)
-               if err != nil {
-                       err = fmt.Errorf("renaming part file: %w", err)
-                       return
-               }
-       }
-       info, err := os.Stat(f.safeOsPath)
-       if err != nil {
-               err = fmt.Errorf("statting file: %w", err)
-               return
-       }
-       // Clear writability for the file.
-       err = os.Chmod(f.safeOsPath, info.Mode().Perm()&^0o222)
-       if err != nil {
-               err = fmt.Errorf("setting file to read-only: %w", err)
-               return
+func (fts *fileTorrentImpl) partFiles() bool {
+       return fts.client.opts.UsePartFiles.UnwrapOr(true)
+}
+
+func (fts *fileTorrentImpl) pathForWrite(f file) string {
+       if fts.partFiles() {
+               return f.partFilePath()
        }
-       return
+       return f.safeOsPath
 }
 
 func (fts *fileTorrentImpl) getCompletion(piece int) Completion {
-       cmpl, err := fts.completion.Get(metainfo.PieceKey{
+       cmpl, err := fts.client.opts.PieceCompletion.Get(metainfo.PieceKey{
                fts.infoHash, piece,
        })
        cmpl.Err = errors.Join(cmpl.Err, err)
@@ -186,11 +182,9 @@ func (fs *fileTorrentImpl) Close() error {
 }
 
 func fsync(filePath string) (err error) {
-       _ = os.MkdirAll(filepath.Dir(filePath), dirPerm)
-       var f *os.File
-       f, err = os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, filePerm)
-       if err != nil {
-               return err
+       f, err := os.OpenFile(filePath, os.O_WRONLY, filePerm)
+       if err != nil && !errors.Is(err, fs.ErrNotExist) {
+               return
        }
        defer f.Close()
        if err = f.Sync(); err != nil {
@@ -201,7 +195,7 @@ func fsync(filePath string) (err error) {
 
 func (fts *fileTorrentImpl) Flush() error {
        for _, f := range fts.files {
-               if err := fsync(f.safeOsPath); err != nil {
+               if err := fsync(fts.pathForWrite(f)); err != nil {
                        return err
                }
        }
@@ -229,8 +223,12 @@ type fileTorrentImplIO struct {
 // Returns EOF on short or missing file.
 func (fst fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) {
        f, err := os.Open(file.safeOsPath)
-       if os.IsNotExist(err) {
-               // File missing is treated the same as a short file.
+       if fst.fts.partFiles() && errors.Is(err, fs.ErrNotExist) {
+               f, err = os.Open(file.partFilePath())
+       }
+       if errors.Is(err, fs.ErrNotExist) {
+               // File missing is treated the same as a short file. Should we propagate this through the
+               // interface now that fs.ErrNotExist is a thing?
                err = io.EOF
                return
        }
@@ -273,7 +271,7 @@ func (fst fileTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) {
 func (fst fileTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) {
        // log.Printf("write at %v: %v bytes", off, len(p))
        fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool {
-               name := fst.fts.files[i].safeOsPath
+               name := fst.fts.pathForWrite(fst.fts.files[i])
                os.MkdirAll(filepath.Dir(name), dirPerm)
                var f *os.File
                f, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, filePerm)
index f7c23cea828190caebc889d3b24f711396d68582..b38349d11a7858205ae7da5dcb8ac4e5b446f1fc 100644 (file)
@@ -6,6 +6,6 @@ import (
 
 // Default file permissions for writable OS files.
 const (
-       filePerm os.FileMode = 0o666
-       dirPerm  os.FileMode = 0o777
+       filePerm os.FileMode = 0o644
+       dirPerm  os.FileMode = 0o755
 )