From 6c6be4b121f6271da556e0bffc5c8b162f26632b Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 29 Apr 2025 16:21:21 +1000 Subject: [PATCH] Handle downgrading part files --- metainfo/piece.go | 5 ++ storage/file-piece.go | 159 ++++++++++++++++++++++++++++++++++-------- storage/file.go | 72 ++++++++++--------- storage/perms.go | 4 +- 4 files changed, 171 insertions(+), 69 deletions(-) diff --git a/metainfo/piece.go b/metainfo/piece.go index 87950a51..2adfa3be 100644 --- a/metainfo/piece.go +++ b/metainfo/piece.go @@ -1,6 +1,7 @@ package metainfo import ( + "fmt" "iter" g "github.com/anacrolix/generics" @@ -11,6 +12,10 @@ type Piece struct { i pieceIndex } +func (p Piece) String() string { + return fmt.Sprintf("metainfo.Piece(Info.Name=%q, i=%v)", p.Info.Name, p.i) +} + type pieceIndex = int func (p Piece) Length() int64 { diff --git a/storage/file-piece.go b/storage/file-piece.go index 108911a8..985cca92 100644 --- a/storage/file-piece.go +++ b/storage/file-piece.go @@ -1,15 +1,19 @@ package storage import ( + "errors" "fmt" "io" + "io/fs" "iter" + "log/slog" "os" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/segments" ) +// Piece within File storage. type filePieceImpl struct { t *fileTorrentImpl p metainfo.Piece @@ -19,75 +23,103 @@ type filePieceImpl struct { var _ PieceImpl = (*filePieceImpl)(nil) +func (me *filePieceImpl) logger() *slog.Logger { + return me.t.client.opts.Logger +} + func (me *filePieceImpl) pieceKey() metainfo.PieceKey { return metainfo.PieceKey{me.t.infoHash, me.p.Index()} } -func (fs *filePieceImpl) extent() segments.Extent { +func (me *filePieceImpl) extent() segments.Extent { return segments.Extent{ - Start: fs.p.Offset(), - Length: fs.p.Length(), + Start: me.p.Offset(), + Length: me.p.Length(), } } -func (fs *filePieceImpl) pieceFiles() iter.Seq2[int, file] { +func (me *filePieceImpl) pieceFiles() iter.Seq2[int, file] { return func(yield func(int, file) bool) { - for fileIndex := range fs.t.segmentLocater.LocateIter(fs.extent()) { - if !yield(fileIndex, fs.t.files[fileIndex]) { + for fileIndex := range me.t.segmentLocater.LocateIter(me.extent()) { + if !yield(fileIndex, me.t.files[fileIndex]) { return } } } } -func (fs *filePieceImpl) Completion() Completion { - c := fs.t.getCompletion(fs.p.Index()) - if !c.Ok { +func (me *filePieceImpl) pieceCompletion() PieceCompletion { + return me.t.client.opts.PieceCompletion +} + +func (me *filePieceImpl) Completion() Completion { + c := me.t.getCompletion(me.p.Index()) + if !c.Ok || c.Err != nil { return c } verified := true if c.Complete { + noFiles := true // If it's allegedly complete, check that its constituent files have the necessary length. - if !fs.t.segmentLocater.Locate( - fs.extent(), - func(i int, extent segments.Extent) bool { - file := fs.t.files[i] - s, err := os.Stat(file.safeOsPath) - if err != nil || s.Size() < extent.Start+extent.Length { - verified = false - return false - } - return true - }) { + for i, extent := range me.t.segmentLocater.LocateIter(me.extent()) { + noFiles = false + file := me.t.files[i] + s, err := os.Stat(file.safeOsPath) + if errors.Is(err, fs.ErrNotExist) { + s, err = os.Stat(file.partFilePath()) + } + if err != nil { + me.logger().Warn( + "error checking file for piece marked as complete", + "piece", me.p, + "file", file.safeOsPath, + "err", err) + } else if s.Size() < extent.Start+extent.Length { + me.logger().Error( + "file too small for piece marked as complete", + "piece", me.p, + "file", file.safeOsPath, + "size", s.Size(), + "extent", extent) + } else { + continue + } + verified = false + break + } + // This probably belongs in a wrapper helper of some kind. I will retain the logic for now. + if noFiles { panic("files do not cover piece extent") } } if !verified { - // The completion was wrong, fix it. TODO: Should we use MarkNotComplete? - c.Complete = false - fs.t.completion.Set(fs.pieceKey(), false) + // The completion was wrong, fix it. + err := me.MarkNotComplete() + if err != nil { + c.Err = fmt.Errorf("error marking piece not complete: %w", err) + } } return c } -func (fs *filePieceImpl) MarkComplete() (err error) { - err = fs.t.completion.Set(fs.pieceKey(), true) +func (me *filePieceImpl) MarkComplete() (err error) { + err = me.pieceCompletion().Set(me.pieceKey(), true) if err != nil { return } nextFile: - for i, f := range fs.pieceFiles() { + for i, f := range me.pieceFiles() { for p := f.beginPieceIndex; p < f.endPieceIndex; p++ { _ = i //fmt.Printf("%v %#v %v\n", i, f, p) - cmpl := fs.t.getCompletion(p) + cmpl := me.t.getCompletion(p) if !cmpl.Ok || !cmpl.Complete { continue nextFile } } - err = fs.t.promotePartFile(f) + err = me.promotePartFile(f) if err != nil { err = fmt.Errorf("error promoting part file %q: %w", f.safeOsPath, err) return @@ -96,6 +128,73 @@ nextFile: return } -func (fs *filePieceImpl) MarkNotComplete() error { - return fs.t.completion.Set(fs.pieceKey(), false) +func (me *filePieceImpl) MarkNotComplete() (err error) { + err = me.pieceCompletion().Set(me.pieceKey(), false) + if err != nil { + return + } + for i, f := range me.pieceFiles() { + _ = i + err = me.onFileNotComplete(f) + if err != nil { + err = fmt.Errorf("preparing incomplete file %q: %w", f.safeOsPath, err) + return + } + } + return + +} + +func (me *filePieceImpl) promotePartFile(f file) (err error) { + if me.partFiles() { + err = os.Rename(f.partFilePath(), f.safeOsPath) + // If we get ENOENT, the file may already be in the final location. + if err != nil && !errors.Is(err, fs.ErrNotExist) { + err = fmt.Errorf("renaming part file: %w", err) + return + } + } + info, err := os.Stat(f.safeOsPath) + if err != nil { + err = fmt.Errorf("statting file: %w", err) + return + } + // Clear writability for the file. + err = os.Chmod(f.safeOsPath, info.Mode().Perm()&^0o222) + if err != nil { + err = fmt.Errorf("setting file to read-only: %w", err) + return + } + return +} + +func (me *filePieceImpl) onFileNotComplete(f file) (err error) { + if me.partFiles() { + err = os.Rename(f.safeOsPath, f.partFilePath()) + // If we get ENOENT, the file may already be in the final location. + if err != nil && !errors.Is(err, fs.ErrNotExist) { + err = fmt.Errorf("renaming incomplete file: %w", err) + return + } + } + info, err := os.Stat(me.pathForWrite(f)) + if err != nil { + err = fmt.Errorf("statting file: %w", err) + return + } + // Ensure the file is writable + err = os.Chmod(f.safeOsPath, info.Mode().Perm()|(filePerm&0o222)) + if err != nil { + err = fmt.Errorf("setting file to read-only: %w", err) + return + } + return +} + +func (me *filePieceImpl) pathForWrite(f file) string { + return me.t.pathForWrite(f) +} + +func (me *filePieceImpl) partFiles() bool { + return me.t.partFiles() } diff --git a/storage/file.go b/storage/file.go index d4df5511..42ef20c5 100644 --- a/storage/file.go +++ b/storage/file.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "io/fs" "iter" "log/slog" "os" @@ -35,6 +36,7 @@ type NewFileClientOpts struct { TorrentDirMaker TorrentDirFilePathMaker PieceCompletion PieceCompletion UsePartFiles g.Option[bool] + Logger *slog.Logger } // NewFileOpts creates a new ClientImplCloser that stores files using the OS native filesystem. @@ -54,10 +56,13 @@ func NewFileOpts(opts NewFileClientOpts) ClientImplCloser { if opts.PieceCompletion == nil { opts.PieceCompletion = pieceCompletionForDir(opts.ClientBaseDir) } - return fileClientImpl{opts} + if opts.Logger == nil { + opts.Logger = log.Default.Slogger() + } + return &fileClientImpl{opts} } -func (me fileClientImpl) Close() error { +func (me *fileClientImpl) Close() error { return me.opts.PieceCompletion.Close() } @@ -73,7 +78,7 @@ func enumIter[T any](i iter.Seq[T]) iter.Seq2[int, T] { } } -func (fs fileClientImpl) OpenTorrent( +func (fs *fileClientImpl) OpenTorrent( ctx context.Context, info *metainfo.Info, infoHash metainfo.Hash, @@ -111,8 +116,7 @@ func (fs fileClientImpl) OpenTorrent( files, info.FileSegmentsIndex(), infoHash, - fs.opts.PieceCompletion, - fs.opts.UsePartFiles.UnwrapOr(true), + fs, } return TorrentImpl{ Piece: t.Piece, @@ -129,40 +133,32 @@ type file struct { length int64 } +func (f file) partFilePath() string { + return f.safeOsPath + ".part" +} + type fileTorrentImpl struct { info *metainfo.Info files []file segmentLocater segments.Index infoHash metainfo.Hash - completion PieceCompletion - partFiles bool + // Save memory by pointing to the other data. + client *fileClientImpl } -func (fts *fileTorrentImpl) promotePartFile(f file) (err error) { - //fmt.Printf("promoting %q\n", f.safeOsPath) - if fts.partFiles { - err = os.Rename(f.safeOsPath+".part", f.safeOsPath) - if err != nil { - err = fmt.Errorf("renaming part file: %w", err) - return - } - } - info, err := os.Stat(f.safeOsPath) - if err != nil { - err = fmt.Errorf("statting file: %w", err) - return - } - // Clear writability for the file. - err = os.Chmod(f.safeOsPath, info.Mode().Perm()&^0o222) - if err != nil { - err = fmt.Errorf("setting file to read-only: %w", err) - return +func (fts *fileTorrentImpl) partFiles() bool { + return fts.client.opts.UsePartFiles.UnwrapOr(true) +} + +func (fts *fileTorrentImpl) pathForWrite(f file) string { + if fts.partFiles() { + return f.partFilePath() } - return + return f.safeOsPath } func (fts *fileTorrentImpl) getCompletion(piece int) Completion { - cmpl, err := fts.completion.Get(metainfo.PieceKey{ + cmpl, err := fts.client.opts.PieceCompletion.Get(metainfo.PieceKey{ fts.infoHash, piece, }) cmpl.Err = errors.Join(cmpl.Err, err) @@ -186,11 +182,9 @@ func (fs *fileTorrentImpl) Close() error { } func fsync(filePath string) (err error) { - _ = os.MkdirAll(filepath.Dir(filePath), dirPerm) - var f *os.File - f, err = os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, filePerm) - if err != nil { - return err + f, err := os.OpenFile(filePath, os.O_WRONLY, filePerm) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + return } defer f.Close() if err = f.Sync(); err != nil { @@ -201,7 +195,7 @@ func fsync(filePath string) (err error) { func (fts *fileTorrentImpl) Flush() error { for _, f := range fts.files { - if err := fsync(f.safeOsPath); err != nil { + if err := fsync(fts.pathForWrite(f)); err != nil { return err } } @@ -229,8 +223,12 @@ type fileTorrentImplIO struct { // Returns EOF on short or missing file. func (fst fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) { f, err := os.Open(file.safeOsPath) - if os.IsNotExist(err) { - // File missing is treated the same as a short file. + if fst.fts.partFiles() && errors.Is(err, fs.ErrNotExist) { + f, err = os.Open(file.partFilePath()) + } + if errors.Is(err, fs.ErrNotExist) { + // File missing is treated the same as a short file. Should we propagate this through the + // interface now that fs.ErrNotExist is a thing? err = io.EOF return } @@ -273,7 +271,7 @@ func (fst fileTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) { func (fst fileTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) { // log.Printf("write at %v: %v bytes", off, len(p)) fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool { - name := fst.fts.files[i].safeOsPath + name := fst.fts.pathForWrite(fst.fts.files[i]) os.MkdirAll(filepath.Dir(name), dirPerm) var f *os.File f, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, filePerm) diff --git a/storage/perms.go b/storage/perms.go index f7c23cea..b38349d1 100644 --- a/storage/perms.go +++ b/storage/perms.go @@ -6,6 +6,6 @@ import ( // Default file permissions for writable OS files. const ( - filePerm os.FileMode = 0o666 - dirPerm os.FileMode = 0o777 + filePerm os.FileMode = 0o644 + dirPerm os.FileMode = 0o755 ) -- 2.51.0