]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Set pieces incomplete when files go missing or are truncated
authorMatt Joiner <anacrolix@gmail.com>
Mon, 14 Jul 2025 05:34:15 +0000 (15:34 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 14 Jul 2025 05:34:15 +0000 (15:34 +1000)
metainfo/piece-key.go
metainfo/piece.go
storage/file-client.go
storage/file-misc.go
storage/file-piece.go
storage/file-torrent-io.go
storage/file-torrent.go

index 6ddf065c413b3ef58c28fe051e6b8c587e303fd8..f4bedca58eebf4c018bdfe6fc32103deaf8de05a 100644 (file)
@@ -3,5 +3,5 @@ package metainfo
 // Uniquely identifies a piece.
 type PieceKey struct {
        InfoHash Hash
-       Index    pieceIndex
+       Index    PieceIndex
 }
index 2adfa3be4f100ad5ad06e2719b8fac387ba706be..4c79ddc3f6a29764356d78ab02c0ae541d60bb6e 100644 (file)
@@ -9,14 +9,14 @@ import (
 
 type Piece struct {
        Info *Info // Can we embed the fields here instead, or is it something to do with saving memory?
-       i    pieceIndex
+       i    PieceIndex
 }
 
 func (p Piece) String() string {
        return fmt.Sprintf("metainfo.Piece(Info.Name=%q, i=%v)", p.Info.Name, p.i)
 }
 
-type pieceIndex = int
+type PieceIndex = int
 
 func (p Piece) Length() int64 {
        if p.Info.HasV2() {
index a912fda2242d027c138ca07379033e4f0d361d9b..f4ffb0378b091ea01ee7f1aaaac459a560b1d2f9 100644 (file)
@@ -78,8 +78,9 @@ func (fs *fileClientImpl) OpenTorrent(
        dir := fs.opts.TorrentDirMaker(fs.opts.ClientBaseDir, info, infoHash)
        logger := log.ContextLogger(ctx).Slogger()
        logger.DebugContext(ctx, "opened file torrent storage", slog.String("dir", dir))
-       var files []file
-       for i, fileInfo := range enumIter(info.UpvertedFilesIter()) {
+       metainfoFileInfos := info.UpvertedFiles()
+       files := make([]fileExtra, len(metainfoFileInfos))
+       for i, fileInfo := range metainfoFileInfos {
                filePath := filepath.Join(dir, fs.opts.FilePathMaker(FilePathMakerOpts{
                        Info: info,
                        File: &fileInfo,
@@ -88,24 +89,19 @@ func (fs *fileClientImpl) OpenTorrent(
                        err = fmt.Errorf("file %v: path %q is not sub path of %q", i, filePath, dir)
                        return
                }
-               f := file{
-                       safeOsPath:      filePath,
-                       length:          fileInfo.Length,
-                       beginPieceIndex: fileInfo.BeginPieceIndex(info.PieceLength),
-                       endPieceIndex:   fileInfo.EndPieceIndex(info.PieceLength),
-               }
-               if f.length == 0 {
-                       err = CreateNativeZeroLengthFile(f.safeOsPath)
+               files[i].safeOsPath = filePath
+               if metainfoFileInfos[i].Length == 0 {
+                       err = CreateNativeZeroLengthFile(filePath)
                        if err != nil {
                                err = fmt.Errorf("creating zero length file: %w", err)
                                return
                        }
                }
-               files = append(files, f)
        }
        t := &fileTorrentImpl{
                info,
                files,
+               metainfoFileInfos,
                info.FileSegmentsIndex(),
                infoHash,
                fs,
index 07e98e92ea0ab60b648b455b35d1f99506df29f2..27e0c1266def7fdb4d965ed646532a3d06475dd8 100644 (file)
@@ -8,6 +8,7 @@ import (
        "path/filepath"
        "sync"
 
+       "github.com/anacrolix/torrent/metainfo"
        "github.com/anacrolix/torrent/segments"
 )
 
@@ -51,18 +52,41 @@ func CreateNativeZeroLengthFile(name string) error {
        return f.Close()
 }
 
+// Combines data from different locations required to handle files in file storage.
 type file struct {
+       // Required for piece length.
+       *metainfo.Info
+       // Enumerated when info is provided.
+       *metainfo.FileInfo
+       *fileExtra
+}
+
+func (f *file) beginPieceIndex() int {
+       return f.FileInfo.BeginPieceIndex(f.Info.PieceLength)
+}
+
+func (f *file) endPieceIndex() int {
+       return f.FileInfo.EndPieceIndex(f.Info.PieceLength)
+}
+
+func (f *file) length() int64 {
+       return f.FileInfo.Length
+}
+
+func (f *file) torrentOffset() int64 {
+       return f.FileInfo.TorrentOffset
+}
+
+// Extra state in the file storage for each file.
+type fileExtra struct {
        // This protects high level OS file state like partial file name, permission mod, renaming etc.
        mu sync.RWMutex
        // The safe, OS-local file path.
-       safeOsPath      string
-       beginPieceIndex int
-       endPieceIndex   int
-       length          int64
+       safeOsPath string
        // Utility value to help the race detector find issues for us.
        race byte
 }
 
-func (f *file) partFilePath() string {
+func (f *fileExtra) partFilePath() string {
        return f.safeOsPath + ".part"
 }
index 07257a394ea23d60d408c2a9478de22e90863f1e..f6c9de969efb506916227af8e0bdfa9f6fdae16f 100644 (file)
@@ -16,7 +16,7 @@ import (
        "github.com/anacrolix/torrent/segments"
 )
 
-// Piece within File storage.
+// Piece within File storage. This is created on demand.
 type filePieceImpl struct {
        t *fileTorrentImpl
        p metainfo.Piece
@@ -44,10 +44,11 @@ func (me *filePieceImpl) extent() segments.Extent {
        }
 }
 
-func (me *filePieceImpl) pieceFiles() iter.Seq2[int, *file] {
-       return func(yield func(int, *file) bool) {
+func (me *filePieceImpl) pieceFiles() iter.Seq[file] {
+       return func(yield func(file) bool) {
                for fileIndex := range me.t.segmentLocater.LocateIter(me.extent()) {
-                       if !yield(fileIndex, &me.t.files[fileIndex]) {
+                       f := me.t.file(fileIndex)
+                       if !yield(f) {
                                return
                        }
                }
@@ -58,61 +59,93 @@ func (me *filePieceImpl) pieceCompletion() PieceCompletion {
        return me.t.pieceCompletion()
 }
 
-func (me *filePieceImpl) Completion() Completion {
-       c := me.t.getCompletion(me.p.Index())
+func (me *filePieceImpl) Completion() (c Completion) {
+       c = me.t.getCompletion(me.p.Index())
        if !c.Ok || c.Err != nil {
                return c
        }
-       verified := true
        if c.Complete {
+               c = me.checkCompleteFileSizes()
+       }
+       return
+}
+
+func (me *filePieceImpl) iterFileSegments() iter.Seq2[int, segments.Extent] {
+       return func(yield func(int, segments.Extent) bool) {
                noFiles := true
-               // If it's allegedly complete, check that its constituent files have the necessary length.
                for i, extent := range me.t.segmentLocater.LocateIter(me.extent()) {
                        noFiles = false
-                       file := &me.t.files[i]
-                       file.mu.RLock()
-                       s, err := os.Stat(file.safeOsPath)
-                       if me.partFiles() && errors.Is(err, fs.ErrNotExist) {
-                               // Can we use shared files for this? Is it faster?
-                               s, err = os.Stat(file.partFilePath())
+                       if !yield(i, extent) {
+                               return
                        }
-                       file.mu.RUnlock()
-                       if err != nil {
+               }
+               if noFiles {
+                       panic("files do not cover piece extent")
+               }
+       }
+}
+
+// If a piece is complete, check consituent files have the minimum required sizes.
+func (me *filePieceImpl) checkCompleteFileSizes() (c Completion) {
+       c.Complete = true
+       c.Ok = true
+       for i, extent := range me.iterFileSegments() {
+               file := me.t.file(i)
+               file.mu.RLock()
+               s, err := os.Stat(file.safeOsPath)
+               if me.partFiles() && errors.Is(err, fs.ErrNotExist) {
+                       // Can we use shared files for this? Is it faster?
+                       s, err = os.Stat(file.partFilePath())
+               }
+               file.mu.RUnlock()
+               if err != nil {
+                       if errors.Is(err, fs.ErrNotExist) {
                                me.logger().Warn(
-                                       "error checking file for piece marked as complete",
-                                       "piece", me.p,
+                                       "error checking file size for piece marked as complete",
                                        "file", file.safeOsPath,
+                                       "piece", me.p.Index(),
                                        "err", err)
-                       } else if s.Size() < extent.End() {
-                               me.logger().Error(
-                                       "file too small for piece marked as complete",
-                                       "piece", me.p,
-                                       "file", file.safeOsPath,
-                                       "size", s.Size(),
-                                       "extent", extent)
-                       } else {
-                               continue
+                               c.Complete = false
+                               me.markIncompletePieces(&file, 0)
+                               return
                        }
-                       verified = false
-                       break
+                       c.Err = fmt.Errorf("checking file %v: %w", file.safeOsPath, err)
+                       c.Complete = false
+                       return
                }
-               // This probably belongs in a wrapper helper of some kind. I will retain the logic for now.
-               if noFiles {
-                       panic("files do not cover piece extent")
+               if s.Size() < extent.End() {
+                       me.logger().Warn(
+                               "file too small for piece marked as complete",
+                               "piece", me.p.Index(),
+                               "file", file.safeOsPath,
+                               "size", s.Size(),
+                               "extent", extent)
+                       me.markIncompletePieces(&file, s.Size())
+                       c.Complete = false
+                       return
                }
        }
+       return
+}
 
-       if !verified {
-               // The completion was wrong, fix it. TODO: Fix all other affected pieces too so we don't
-               // spam log messages, or record that the file is known to be bad until it comes good again.
-               err := me.MarkNotComplete()
+func (me *filePieceImpl) markIncompletePieces(file *file, size int64) {
+       if size >= file.length() {
+               return
+       }
+       pieceLength := me.t.info.PieceLength
+       begin := metainfo.PieceIndex((file.torrentOffset() + size) / pieceLength)
+       end := metainfo.PieceIndex((file.torrentOffset() + file.length() + pieceLength - 1) / pieceLength)
+       for p := begin; p < end; p++ {
+               key := metainfo.PieceKey{
+                       InfoHash: me.t.infoHash,
+                       Index:    p,
+               }
+               err := me.pieceCompletion().Set(key, false)
                if err != nil {
-                       c.Err = fmt.Errorf("error marking piece not complete: %w", err)
+                       me.logger().Error("error marking piece not complete", "piece", p, "err", err)
+                       return
                }
-               c.Complete = false
        }
-
-       return c
 }
 
 func (me *filePieceImpl) MarkComplete() (err error) {
@@ -120,7 +153,7 @@ func (me *filePieceImpl) MarkComplete() (err error) {
        if err != nil {
                return
        }
-       for _, f := range me.pieceFiles() {
+       for f := range me.pieceFiles() {
                res := me.allFilePiecesComplete(f)
                if res.Err != nil {
                        err = res.Err
@@ -138,15 +171,15 @@ func (me *filePieceImpl) MarkComplete() (err error) {
        return
 }
 
-func (me *filePieceImpl) allFilePiecesComplete(f *file) (ret g.Result[bool]) {
+func (me *filePieceImpl) allFilePiecesComplete(f file) (ret g.Result[bool]) {
        next, stop := iter.Pull(GetPieceCompletionRange(
                me.t.pieceCompletion(),
                me.t.infoHash,
-               f.beginPieceIndex,
-               f.endPieceIndex,
+               f.beginPieceIndex(),
+               f.endPieceIndex(),
        ))
        defer stop()
-       for p := f.beginPieceIndex; p < f.endPieceIndex; p++ {
+       for p := f.beginPieceIndex(); p < f.endPieceIndex(); p++ {
                cmpl, ok := next()
                panicif.False(ok)
                if cmpl.Err != nil {
@@ -168,8 +201,7 @@ func (me *filePieceImpl) MarkNotComplete() (err error) {
        if err != nil {
                return
        }
-       for i, f := range me.pieceFiles() {
-               _ = i
+       for f := range me.pieceFiles() {
                err = me.onFileNotComplete(f)
                if err != nil {
                        err = fmt.Errorf("preparing incomplete file %q: %w", f.safeOsPath, err)
@@ -180,7 +212,7 @@ func (me *filePieceImpl) MarkNotComplete() (err error) {
 
 }
 
-func (me *filePieceImpl) promotePartFile(f *file) (err error) {
+func (me *filePieceImpl) promotePartFile(f file) (err error) {
        f.mu.Lock()
        defer f.mu.Unlock()
        f.race++
@@ -247,7 +279,7 @@ func (me *filePieceImpl) exclRenameIfExists(from, to string) error {
        return nil
 }
 
-func (me *filePieceImpl) onFileNotComplete(f *file) (err error) {
+func (me *filePieceImpl) onFileNotComplete(f file) (err error) {
        f.mu.Lock()
        defer f.mu.Unlock()
        f.race++
@@ -258,7 +290,7 @@ func (me *filePieceImpl) onFileNotComplete(f *file) (err error) {
                        return
                }
        }
-       info, err := os.Stat(me.pathForWrite(f))
+       info, err := os.Stat(me.pathForWrite(&f))
        if errors.Is(err, fs.ErrNotExist) {
                return nil
        }
@@ -267,7 +299,7 @@ func (me *filePieceImpl) onFileNotComplete(f *file) (err error) {
                return
        }
        // Ensure the file is writable
-       err = os.Chmod(me.pathForWrite(f), info.Mode().Perm()|(filePerm&0o222))
+       err = os.Chmod(me.pathForWrite(&f), info.Mode().Perm()|(filePerm&0o222))
        if err != nil {
                err = fmt.Errorf("setting file writable: %w", err)
                return
index b7fdb70db10a148f42b9770b6a9e8ca86a02968f..107f28e9789468429994e02c712839255597b501 100644 (file)
@@ -16,7 +16,7 @@ type fileTorrentImplIO struct {
 }
 
 // Returns EOF on short or missing file.
-func (fst fileTorrentImplIO) readFileAt(file *file, b []byte, off int64) (n int, err error) {
+func (fst fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) {
        fst.fts.logger().Debug("readFileAt", "file.safeOsPath", file.safeOsPath)
        var f sharedFileIf
        file.mu.RLock()
@@ -40,10 +40,10 @@ func (fst fileTorrentImplIO) readFileAt(file *file, b []byte, off int64) (n int,
        }
        defer f.Close()
        // Limit the read to within the expected bounds of this file.
-       if int64(len(b)) > file.length-off {
-               b = b[:file.length-off]
+       if int64(len(b)) > file.length()-off {
+               b = b[:file.length()-off]
        }
-       for off < file.length && len(b) != 0 {
+       for off < file.length() && len(b) != 0 {
                n1, err1 := f.ReadAt(b, off)
                b = b[n1:]
                n += n1
@@ -59,7 +59,7 @@ func (fst fileTorrentImplIO) readFileAt(file *file, b []byte, off int64) (n int,
 // Only returns EOF at the end of the torrent. Premature EOF is ErrUnexpectedEOF.
 func (fst fileTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) {
        fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(b))}, func(i int, e segments.Extent) bool {
-               n1, err1 := fst.readFileAt(&fst.fts.files[i], b[:e.Length], e.Start)
+               n1, err1 := fst.readFileAt(fst.fts.file(i), b[:e.Length], e.Start)
                n += n1
                b = b[n1:]
                err = err1
@@ -71,10 +71,10 @@ func (fst fileTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) {
        return
 }
 
-func (fst fileTorrentImplIO) openForWrite(file *file) (f *os.File, err error) {
+func (fst fileTorrentImplIO) openForWrite(file file) (f *os.File, err error) {
        // It might be possible to have a writable handle shared files cache if we need it.
        fst.fts.logger().Debug("openForWrite", "file.safeOsPath", file.safeOsPath)
-       p := fst.fts.pathForWrite(file)
+       p := fst.fts.pathForWrite(&file)
        f, err = os.OpenFile(p, os.O_WRONLY|os.O_CREATE, filePerm)
        if err == nil {
                return
@@ -100,7 +100,7 @@ func (fst fileTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) {
                segments.Extent{off, int64(len(p))},
                func(i int, e segments.Extent) bool {
                        var f *os.File
-                       f, err = fst.openForWrite(&fst.fts.files[i])
+                       f, err = fst.openForWrite(fst.fts.file(i))
                        if err != nil {
                                return false
                        }
index ad5a6b2a9d5aca53f8af678867ae34aae6133810..f59057d5cd3feb08b2138685b8b99fc1a85eb858 100644 (file)
@@ -15,10 +15,11 @@ import (
 )
 
 type fileTorrentImpl struct {
-       info           *metainfo.Info
-       files          []file
-       segmentLocater segments.Index
-       infoHash       metainfo.Hash
+       info              *metainfo.Info
+       files             []fileExtra
+       metainfoFileInfos []metainfo.FileInfo
+       segmentLocater    segments.Index
+       infoHash          metainfo.Hash
        // Save memory by pointing to the other data.
        client *fileClientImpl
 }
@@ -45,18 +46,19 @@ func (fts *fileTorrentImpl) setPieceCompletion(p int, complete bool) error {
 // Set piece completions based on whether all files in each piece are not .part files.
 func (fts *fileTorrentImpl) setCompletionFromPartFiles() error {
        notComplete := make([]bool, fts.info.NumPieces())
-       for _, f := range fts.files {
+       for fileIndex := range fts.files {
+               f := fts.file(fileIndex)
                fi, err := os.Stat(f.safeOsPath)
                if err == nil {
-                       if fi.Size() == f.length {
+                       if fi.Size() == f.length() {
                                continue
                        }
                        fts.logger().Warn("file has unexpected size", "file", f.safeOsPath, "size", fi.Size(), "expected", f.length)
                } else if !errors.Is(err, fs.ErrNotExist) {
                        fts.logger().Warn("error checking file size", "err", err)
                }
-               for i := f.beginPieceIndex; i < f.endPieceIndex; i++ {
-                       notComplete[i] = true
+               for pieceIndex := f.beginPieceIndex(); pieceIndex < f.endPieceIndex(); pieceIndex++ {
+                       notComplete[pieceIndex] = true
                }
        }
        for i, nc := range notComplete {
@@ -107,11 +109,19 @@ func (fs *fileTorrentImpl) Close() error {
 
 func (fts *fileTorrentImpl) Flush() error {
        for i := range fts.files {
-               f := &fts.files[i]
+               f := fts.file(i)
                fts.logger().Debug("flushing", "file.safeOsPath", f.safeOsPath)
-               if err := fsync(fts.pathForWrite(f)); err != nil {
+               if err := fsync(fts.pathForWrite(&f)); err != nil {
                        return err
                }
        }
        return nil
 }
+
+func (fts *fileTorrentImpl) file(index int) file {
+       return file{
+               Info:      fts.info,
+               FileInfo:  &fts.metainfoFileInfos[index],
+               fileExtra: &fts.files[index],
+       }
+}