From: Matt Joiner Date: Mon, 14 Jul 2025 05:34:15 +0000 (+1000) Subject: Set pieces incomplete when files go missing or are truncated X-Git-Tag: v1.59.0~25 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=719ff67e68d680f7ad948adc17dde7816618f8e3;p=btrtrc.git Set pieces incomplete when files go missing or are truncated --- diff --git a/metainfo/piece-key.go b/metainfo/piece-key.go index 6ddf065c..f4bedca5 100644 --- a/metainfo/piece-key.go +++ b/metainfo/piece-key.go @@ -3,5 +3,5 @@ package metainfo // Uniquely identifies a piece. type PieceKey struct { InfoHash Hash - Index pieceIndex + Index PieceIndex } diff --git a/metainfo/piece.go b/metainfo/piece.go index 2adfa3be..4c79ddc3 100644 --- a/metainfo/piece.go +++ b/metainfo/piece.go @@ -9,14 +9,14 @@ import ( type Piece struct { Info *Info // Can we embed the fields here instead, or is it something to do with saving memory? - i pieceIndex + i PieceIndex } func (p Piece) String() string { return fmt.Sprintf("metainfo.Piece(Info.Name=%q, i=%v)", p.Info.Name, p.i) } -type pieceIndex = int +type PieceIndex = int func (p Piece) Length() int64 { if p.Info.HasV2() { diff --git a/storage/file-client.go b/storage/file-client.go index a912fda2..f4ffb037 100644 --- a/storage/file-client.go +++ b/storage/file-client.go @@ -78,8 +78,9 @@ func (fs *fileClientImpl) OpenTorrent( dir := fs.opts.TorrentDirMaker(fs.opts.ClientBaseDir, info, infoHash) logger := log.ContextLogger(ctx).Slogger() logger.DebugContext(ctx, "opened file torrent storage", slog.String("dir", dir)) - var files []file - for i, fileInfo := range enumIter(info.UpvertedFilesIter()) { + metainfoFileInfos := info.UpvertedFiles() + files := make([]fileExtra, len(metainfoFileInfos)) + for i, fileInfo := range metainfoFileInfos { filePath := filepath.Join(dir, fs.opts.FilePathMaker(FilePathMakerOpts{ Info: info, File: &fileInfo, @@ -88,24 +89,19 @@ func (fs *fileClientImpl) OpenTorrent( err = fmt.Errorf("file %v: path %q is not sub path of %q", i, 
filePath, dir) return } - f := file{ - safeOsPath: filePath, - length: fileInfo.Length, - beginPieceIndex: fileInfo.BeginPieceIndex(info.PieceLength), - endPieceIndex: fileInfo.EndPieceIndex(info.PieceLength), - } - if f.length == 0 { - err = CreateNativeZeroLengthFile(f.safeOsPath) + files[i].safeOsPath = filePath + if metainfoFileInfos[i].Length == 0 { + err = CreateNativeZeroLengthFile(filePath) if err != nil { err = fmt.Errorf("creating zero length file: %w", err) return } } - files = append(files, f) } t := &fileTorrentImpl{ info, files, + metainfoFileInfos, info.FileSegmentsIndex(), infoHash, fs, diff --git a/storage/file-misc.go b/storage/file-misc.go index 07e98e92..27e0c126 100644 --- a/storage/file-misc.go +++ b/storage/file-misc.go @@ -8,6 +8,7 @@ import ( "path/filepath" "sync" + "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/segments" ) @@ -51,18 +52,41 @@ func CreateNativeZeroLengthFile(name string) error { return f.Close() } +// Combines data from different locations required to handle files in file storage. type file struct { + // Required for piece length. + *metainfo.Info + // Enumerated when info is provided. + *metainfo.FileInfo + *fileExtra +} + +func (f *file) beginPieceIndex() int { + return f.FileInfo.BeginPieceIndex(f.Info.PieceLength) +} + +func (f *file) endPieceIndex() int { + return f.FileInfo.EndPieceIndex(f.Info.PieceLength) +} + +func (f *file) length() int64 { + return f.FileInfo.Length +} + +func (f *file) torrentOffset() int64 { + return f.FileInfo.TorrentOffset +} + +// Extra state in the file storage for each file. +type fileExtra struct { // This protects high level OS file state like partial file name, permission mod, renaming etc. mu sync.RWMutex // The safe, OS-local file path. - safeOsPath string - beginPieceIndex int - endPieceIndex int - length int64 + safeOsPath string // Utility value to help the race detector find issues for us. 
race byte } -func (f *file) partFilePath() string { +func (f *fileExtra) partFilePath() string { return f.safeOsPath + ".part" } diff --git a/storage/file-piece.go b/storage/file-piece.go index 07257a39..f6c9de96 100644 --- a/storage/file-piece.go +++ b/storage/file-piece.go @@ -16,7 +16,7 @@ import ( "github.com/anacrolix/torrent/segments" ) -// Piece within File storage. +// Piece within File storage. This is created on demand. type filePieceImpl struct { t *fileTorrentImpl p metainfo.Piece @@ -44,10 +44,11 @@ func (me *filePieceImpl) extent() segments.Extent { } } -func (me *filePieceImpl) pieceFiles() iter.Seq2[int, *file] { - return func(yield func(int, *file) bool) { +func (me *filePieceImpl) pieceFiles() iter.Seq[file] { + return func(yield func(file) bool) { for fileIndex := range me.t.segmentLocater.LocateIter(me.extent()) { - if !yield(fileIndex, &me.t.files[fileIndex]) { + f := me.t.file(fileIndex) + if !yield(f) { return } } @@ -58,61 +59,93 @@ func (me *filePieceImpl) pieceCompletion() PieceCompletion { return me.t.pieceCompletion() } -func (me *filePieceImpl) Completion() Completion { - c := me.t.getCompletion(me.p.Index()) +func (me *filePieceImpl) Completion() (c Completion) { + c = me.t.getCompletion(me.p.Index()) if !c.Ok || c.Err != nil { return c } - verified := true if c.Complete { + c = me.checkCompleteFileSizes() + } + return +} + +func (me *filePieceImpl) iterFileSegments() iter.Seq2[int, segments.Extent] { + return func(yield func(int, segments.Extent) bool) { noFiles := true - // If it's allegedly complete, check that its constituent files have the necessary length. for i, extent := range me.t.segmentLocater.LocateIter(me.extent()) { noFiles = false - file := &me.t.files[i] - file.mu.RLock() - s, err := os.Stat(file.safeOsPath) - if me.partFiles() && errors.Is(err, fs.ErrNotExist) { - // Can we use shared files for this? Is it faster? 
- s, err = os.Stat(file.partFilePath()) + if !yield(i, extent) { return } - file.mu.RUnlock() - if err != nil { + } + if noFiles { + panic("files do not cover piece extent") + } + } +} + +// If a piece is complete, check constituent files have the minimum required sizes. +func (me *filePieceImpl) checkCompleteFileSizes() (c Completion) { + c.Complete = true + c.Ok = true + for i, extent := range me.iterFileSegments() { + file := me.t.file(i) + file.mu.RLock() + s, err := os.Stat(file.safeOsPath) + if me.partFiles() && errors.Is(err, fs.ErrNotExist) { + // Can we use shared files for this? Is it faster? + s, err = os.Stat(file.partFilePath()) + } + file.mu.RUnlock() + if err != nil { + if errors.Is(err, fs.ErrNotExist) { me.logger().Warn( - "error checking file for piece marked as complete", - "piece", me.p, + "error checking file size for piece marked as complete", "file", file.safeOsPath, + "piece", me.p.Index(), "err", err) - } else if s.Size() < extent.End() { - me.logger().Error( - "file too small for piece marked as complete", - "piece", me.p, - "file", file.safeOsPath, - "size", s.Size(), - "extent", extent) - } else { - continue + c.Complete = false + me.markIncompletePieces(&file, 0) + return } - verified = false - break + c.Err = fmt.Errorf("checking file %v: %w", file.safeOsPath, err) + c.Complete = false + return } - // This probably belongs in a wrapper helper of some kind. I will retain the logic for now. - if noFiles { - panic("files do not cover piece extent") + if s.Size() < extent.End() { + me.logger().Warn( + "file too small for piece marked as complete", + "piece", me.p.Index(), + "file", file.safeOsPath, + "size", s.Size(), + "extent", extent) + me.markIncompletePieces(&file, s.Size()) + c.Complete = false + return } } + return +} - if !verified { - // The completion was wrong, fix it. TODO: Fix all other affected pieces too so we don't - // spam log messages, or record that the file is known to be bad until it comes good again. 
- err := me.MarkNotComplete() +func (me *filePieceImpl) markIncompletePieces(file *file, size int64) { + if size >= file.length() { + return + } + pieceLength := me.t.info.PieceLength + begin := metainfo.PieceIndex((file.torrentOffset() + size) / pieceLength) + end := metainfo.PieceIndex((file.torrentOffset() + file.length() + pieceLength - 1) / pieceLength) + for p := begin; p < end; p++ { + key := metainfo.PieceKey{ + InfoHash: me.t.infoHash, + Index: p, + } + err := me.pieceCompletion().Set(key, false) if err != nil { - c.Err = fmt.Errorf("error marking piece not complete: %w", err) + me.logger().Error("error marking piece not complete", "piece", p, "err", err) + return } - c.Complete = false } - - return c } func (me *filePieceImpl) MarkComplete() (err error) { @@ -120,7 +153,7 @@ func (me *filePieceImpl) MarkComplete() (err error) { if err != nil { return } - for _, f := range me.pieceFiles() { + for f := range me.pieceFiles() { res := me.allFilePiecesComplete(f) if res.Err != nil { err = res.Err @@ -138,15 +171,15 @@ func (me *filePieceImpl) MarkComplete() (err error) { return } -func (me *filePieceImpl) allFilePiecesComplete(f *file) (ret g.Result[bool]) { +func (me *filePieceImpl) allFilePiecesComplete(f file) (ret g.Result[bool]) { next, stop := iter.Pull(GetPieceCompletionRange( me.t.pieceCompletion(), me.t.infoHash, - f.beginPieceIndex, - f.endPieceIndex, + f.beginPieceIndex(), + f.endPieceIndex(), )) defer stop() - for p := f.beginPieceIndex; p < f.endPieceIndex; p++ { + for p := f.beginPieceIndex(); p < f.endPieceIndex(); p++ { cmpl, ok := next() panicif.False(ok) if cmpl.Err != nil { @@ -168,8 +201,7 @@ func (me *filePieceImpl) MarkNotComplete() (err error) { if err != nil { return } - for i, f := range me.pieceFiles() { - _ = i + for f := range me.pieceFiles() { err = me.onFileNotComplete(f) if err != nil { err = fmt.Errorf("preparing incomplete file %q: %w", f.safeOsPath, err) @@ -180,7 +212,7 @@ func (me *filePieceImpl) MarkNotComplete() (err 
error) { } -func (me *filePieceImpl) promotePartFile(f *file) (err error) { +func (me *filePieceImpl) promotePartFile(f file) (err error) { f.mu.Lock() defer f.mu.Unlock() f.race++ @@ -247,7 +279,7 @@ func (me *filePieceImpl) exclRenameIfExists(from, to string) error { return nil } -func (me *filePieceImpl) onFileNotComplete(f *file) (err error) { +func (me *filePieceImpl) onFileNotComplete(f file) (err error) { f.mu.Lock() defer f.mu.Unlock() f.race++ @@ -258,7 +290,7 @@ func (me *filePieceImpl) onFileNotComplete(f *file) (err error) { return } } - info, err := os.Stat(me.pathForWrite(f)) + info, err := os.Stat(me.pathForWrite(&f)) if errors.Is(err, fs.ErrNotExist) { return nil } @@ -267,7 +299,7 @@ func (me *filePieceImpl) onFileNotComplete(f *file) (err error) { return } // Ensure the file is writable - err = os.Chmod(me.pathForWrite(f), info.Mode().Perm()|(filePerm&0o222)) + err = os.Chmod(me.pathForWrite(&f), info.Mode().Perm()|(filePerm&0o222)) if err != nil { err = fmt.Errorf("setting file writable: %w", err) return diff --git a/storage/file-torrent-io.go b/storage/file-torrent-io.go index b7fdb70d..107f28e9 100644 --- a/storage/file-torrent-io.go +++ b/storage/file-torrent-io.go @@ -16,7 +16,7 @@ type fileTorrentImplIO struct { } // Returns EOF on short or missing file. -func (fst fileTorrentImplIO) readFileAt(file *file, b []byte, off int64) (n int, err error) { +func (fst fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) { fst.fts.logger().Debug("readFileAt", "file.safeOsPath", file.safeOsPath) var f sharedFileIf file.mu.RLock() @@ -40,10 +40,10 @@ func (fst fileTorrentImplIO) readFileAt(file *file, b []byte, off int64) (n int, } defer f.Close() // Limit the read to within the expected bounds of this file. 
- if int64(len(b)) > file.length-off { - b = b[:file.length-off] + if int64(len(b)) > file.length()-off { + b = b[:file.length()-off] } - for off < file.length && len(b) != 0 { + for off < file.length() && len(b) != 0 { n1, err1 := f.ReadAt(b, off) b = b[n1:] n += n1 @@ -59,7 +59,7 @@ func (fst fileTorrentImplIO) readFileAt(file *file, b []byte, off int64) (n int, // Only returns EOF at the end of the torrent. Premature EOF is ErrUnexpectedEOF. func (fst fileTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) { fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(b))}, func(i int, e segments.Extent) bool { - n1, err1 := fst.readFileAt(&fst.fts.files[i], b[:e.Length], e.Start) + n1, err1 := fst.readFileAt(fst.fts.file(i), b[:e.Length], e.Start) n += n1 b = b[n1:] err = err1 @@ -71,10 +71,10 @@ func (fst fileTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) { return } -func (fst fileTorrentImplIO) openForWrite(file *file) (f *os.File, err error) { +func (fst fileTorrentImplIO) openForWrite(file file) (f *os.File, err error) { // It might be possible to have a writable handle shared files cache if we need it. 
fst.fts.logger().Debug("openForWrite", "file.safeOsPath", file.safeOsPath) - p := fst.fts.pathForWrite(file) + p := fst.fts.pathForWrite(&file) f, err = os.OpenFile(p, os.O_WRONLY|os.O_CREATE, filePerm) if err == nil { return @@ -100,7 +100,7 @@ func (fst fileTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) { segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool { var f *os.File - f, err = fst.openForWrite(&fst.fts.files[i]) + f, err = fst.openForWrite(fst.fts.file(i)) if err != nil { return false } diff --git a/storage/file-torrent.go b/storage/file-torrent.go index ad5a6b2a..f59057d5 100644 --- a/storage/file-torrent.go +++ b/storage/file-torrent.go @@ -15,10 +15,11 @@ import ( ) type fileTorrentImpl struct { - info *metainfo.Info - files []file - segmentLocater segments.Index - infoHash metainfo.Hash + info *metainfo.Info + files []fileExtra + metainfoFileInfos []metainfo.FileInfo + segmentLocater segments.Index + infoHash metainfo.Hash // Save memory by pointing to the other data. client *fileClientImpl } @@ -45,18 +46,19 @@ func (fts *fileTorrentImpl) setPieceCompletion(p int, complete bool) error { // Set piece completions based on whether all files in each piece are not .part files. 
func (fts *fileTorrentImpl) setCompletionFromPartFiles() error { notComplete := make([]bool, fts.info.NumPieces()) - for _, f := range fts.files { + for fileIndex := range fts.files { + f := fts.file(fileIndex) fi, err := os.Stat(f.safeOsPath) if err == nil { - if fi.Size() == f.length { + if fi.Size() == f.length() { continue } fts.logger().Warn("file has unexpected size", "file", f.safeOsPath, "size", fi.Size(), "expected", f.length) } else if !errors.Is(err, fs.ErrNotExist) { fts.logger().Warn("error checking file size", "err", err) } - for i := f.beginPieceIndex; i < f.endPieceIndex; i++ { - notComplete[i] = true + for pieceIndex := f.beginPieceIndex(); pieceIndex < f.endPieceIndex(); pieceIndex++ { + notComplete[pieceIndex] = true } } for i, nc := range notComplete { @@ -107,11 +109,19 @@ func (fs *fileTorrentImpl) Close() error { func (fts *fileTorrentImpl) Flush() error { for i := range fts.files { - f := &fts.files[i] + f := fts.file(i) fts.logger().Debug("flushing", "file.safeOsPath", f.safeOsPath) - if err := fsync(fts.pathForWrite(f)); err != nil { + if err := fsync(fts.pathForWrite(&f)); err != nil { return err } } return nil } + +func (fts *fileTorrentImpl) file(index int) file { + return file{ + Info: fts.info, + FileInfo: &fts.metainfoFileInfos[index], + fileExtra: &fts.files[index], + } +}