From f71b63748af7737866058f8d4d5cc10df92f828f Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 25 Apr 2025 16:36:53 +1000 Subject: [PATCH] Start implementing part file support for file storage --- file.go | 10 +---- metainfo/file-tree.go | 1 + metainfo/fileinfo.go | 14 +++++++ metainfo/info.go | 2 + segments/index.go | 23 ++++++++++ segments/segments.go | 6 +-- storage/file-piece.go | 84 ++++++++++++++++++++++++++----------- storage/file.go | 76 ++++++++++++++++++++++++++++----- storage/piece-completion.go | 2 + torrent.go | 2 +- 10 files changed, 172 insertions(+), 48 deletions(-) diff --git a/file.go b/file.go index 2ff47442..26c38123 100644 --- a/file.go +++ b/file.go @@ -207,18 +207,12 @@ func (f *File) Priority() (prio PiecePriority) { // Returns the index of the first piece containing data for the file. func (f *File) BeginPieceIndex() int { - if f.t.usualPieceSize() == 0 { - return 0 - } - return pieceIndex(f.offset / int64(f.t.usualPieceSize())) + return f.fi.BeginPieceIndex(int64(f.t.usualPieceSize())) } // Returns the index of the piece after the last one containing data for the file. func (f *File) EndPieceIndex() int { - if f.t.usualPieceSize() == 0 { - return 0 - } - return pieceIndex((f.offset + f.length + int64(f.t.usualPieceSize()) - 1) / int64(f.t.usualPieceSize())) + return f.fi.EndPieceIndex(int64(f.t.usualPieceSize())) } func (f *File) numPieces() int { diff --git a/metainfo/file-tree.go b/metainfo/file-tree.go index e04867d0..08039e6d 100644 --- a/metainfo/file-tree.go +++ b/metainfo/file-tree.go @@ -127,6 +127,7 @@ func (ft *FileTree) upvertedFilesInner( PiecesRoot: ft.PiecesRootAsByteArray(), TorrentOffset: *offset, }) + // v2 files are piece aligned. This bumps up the offset to the next piece boundary. *offset += (ft.File.Length + pieceLength - 1) / pieceLength * pieceLength } diff --git a/metainfo/fileinfo.go b/metainfo/fileinfo.go index cb9c5be8..867a56d0 100644 --- a/metainfo/fileinfo.go +++ b/metainfo/fileinfo.go @@ -38,3 +38,17 @@ func (fi *FileInfo) BestPath() []string { } return fi.Path } + +func (fi *FileInfo) BeginPieceIndex(pieceLength int64) int { + if pieceLength == 0 { + return 0 + } + return int(fi.TorrentOffset / pieceLength) +} + +func (fi *FileInfo) EndPieceIndex(pieceLength int64) int { + if pieceLength == 0 { + return 0 + } + return int((fi.TorrentOffset + fi.Length + pieceLength - 1) / pieceLength) +} diff --git a/metainfo/info.go b/metainfo/info.go index e6c29530..6fc46dbb 100644 --- a/metainfo/info.go +++ b/metainfo/info.go @@ -227,3 +227,5 @@ func (info *Info) FileSegmentsIndex() segments.Index { } })) } + +// TODO: Add NumFiles helper? diff --git a/segments/index.go b/segments/index.go index 577ec90f..73dca69a 100644 --- a/segments/index.go +++ b/segments/index.go @@ -1,6 +1,7 @@ package segments import ( + "iter" "sort" g "github.com/anacrolix/generics" @@ -61,3 +62,25 @@ func (me Index) Locate(e Extent, output Callback) bool { return output(i+first, e) }) } + +func (me Index) LocateIter(e Extent) iter.Seq2[int, Extent] { + return func(yield func(int, Extent) bool) { + first := sort.Search(len(me.segments), func(i int) bool { + _e := me.segments[i] + return _e.End() > e.Start + }) + if first == len(me.segments) { + return + } + e.Start -= me.segments[first].Start + // The extent is before the first segment. + if e.Start < 0 { + e.Length += e.Start + e.Start = 0 + } + me.segments = me.segments[first:] + ScanConsecutive(me.iterSegments(), e, func(i int, e Extent) bool { + return yield(i+first, e) + }) + } +} diff --git a/segments/segments.go b/segments/segments.go index 35e7e4c8..83f1ea5c 100644 --- a/segments/segments.go +++ b/segments/segments.go @@ -32,11 +32,11 @@ func Scan(haystack LengthIter, needle Extent, callback Callback) bool { } // Returns true if callback returns false early, or all segments in the haystack for the needle are -// found. +// found. TODO: Does this handle discontiguous extents? func ScanConsecutive(haystack ConsecutiveExtentIter, needle Extent, callback Callback) bool { i := 0 - // Extents have been found in the haystack and we're waiting for the needle to end. This is kind - // of for backwards compatibility for some tests that expect to have zero-length extents. + // Extents have been found in the haystack, and we're waiting for the needle to end. This is + // kind of for backwards compatibility for some tests that expect to have zero-length extents. startedNeedle := false for needle.Length != 0 { l, ok := haystack() diff --git a/storage/file-piece.go b/storage/file-piece.go index 98fb806b..108911a8 100644 --- a/storage/file-piece.go +++ b/storage/file-piece.go @@ -1,8 +1,9 @@ package storage import ( + "fmt" "io" - "log" + "iter" "os" "github.com/anacrolix/torrent/metainfo" @@ -10,7 +11,7 @@ import ( ) type filePieceImpl struct { - *fileTorrentImpl + t *fileTorrentImpl p metainfo.Piece io.WriterAt io.ReaderAt @@ -19,49 +20,82 @@ type filePieceImpl struct { var _ PieceImpl = (*filePieceImpl)(nil) func (me *filePieceImpl) pieceKey() metainfo.PieceKey { - return metainfo.PieceKey{me.infoHash, me.p.Index()} + return metainfo.PieceKey{me.t.infoHash, me.p.Index()} +} + +func (fs *filePieceImpl) extent() segments.Extent { + return segments.Extent{ + Start: fs.p.Offset(), + Length: fs.p.Length(), + } +} + +func (fs *filePieceImpl) pieceFiles() iter.Seq2[int, file] { + return func(yield func(int, file) bool) { + for fileIndex := range fs.t.segmentLocater.LocateIter(fs.extent()) { + if !yield(fileIndex, fs.t.files[fileIndex]) { + return + } + } + } } func (fs *filePieceImpl) Completion() Completion { - c, err := fs.completion.Get(fs.pieceKey()) - if err != nil { - log.Printf("error getting piece completion: %s", err) - c.Ok = false + c := fs.t.getCompletion(fs.p.Index()) + if !c.Ok { return c } - verified := true if c.Complete { // If it's allegedly complete, check that its constituent files have the necessary length. - if !fs.segmentLocater.Locate(segments.Extent{ - Start: fs.p.Offset(), - Length: fs.p.Length(), - }, func(i int, extent segments.Extent) bool { - file := fs.files[i] - s, err := os.Stat(file.path) - if err != nil || s.Size() < extent.Start+extent.Length { - verified = false - return false - } - return true - }) { + if !fs.t.segmentLocater.Locate( + fs.extent(), + func(i int, extent segments.Extent) bool { + file := fs.t.files[i] + s, err := os.Stat(file.safeOsPath) + if err != nil || s.Size() < extent.Start+extent.Length { + verified = false + return false + } + return true + }) { panic("files do not cover piece extent") } } if !verified { - // The completion was wrong, fix it. + // The completion was wrong, fix it. TODO: Should we use MarkNotComplete? c.Complete = false - fs.completion.Set(fs.pieceKey(), false) + fs.t.completion.Set(fs.pieceKey(), false) } return c } -func (fs *filePieceImpl) MarkComplete() error { - return fs.completion.Set(fs.pieceKey(), true) +func (fs *filePieceImpl) MarkComplete() (err error) { + err = fs.t.completion.Set(fs.pieceKey(), true) + if err != nil { + return + } +nextFile: + for i, f := range fs.pieceFiles() { + for p := f.beginPieceIndex; p < f.endPieceIndex; p++ { + _ = i + //fmt.Printf("%v %#v %v\n", i, f, p) + cmpl := fs.t.getCompletion(p) + if !cmpl.Ok || !cmpl.Complete { + continue nextFile + } + } + err = fs.t.promotePartFile(f) + if err != nil { + err = fmt.Errorf("error promoting part file %q: %w", f.safeOsPath, err) + return + } + } + return } func (fs *filePieceImpl) MarkNotComplete() error { - return fs.completion.Set(fs.pieceKey(), false) + return fs.t.completion.Set(fs.pieceKey(), false) } diff --git a/storage/file.go b/storage/file.go index aa52034c..2cacb1ac 100644 --- a/storage/file.go +++ b/storage/file.go @@ -2,12 +2,15 @@ package storage import ( "context" + "errors" "fmt" "io" + "iter" "log/slog" "os" "path/filepath" + g "github.com/anacrolix/generics" "github.com/anacrolix/log" "github.com/anacrolix/missinggo/v2" @@ -31,6 +34,7 @@ type NewFileClientOpts struct { FilePathMaker FilePathMaker TorrentDirMaker TorrentDirFilePathMaker PieceCompletion PieceCompletion + UsePartFiles g.Option[bool] } // NewFileOpts creates a new ClientImplCloser that stores files using the OS native filesystem. @@ -57,6 +61,18 @@ func (me fileClientImpl) Close() error { return me.opts.PieceCompletion.Close() } +func enumIter[T any](i iter.Seq[T]) iter.Seq2[int, T] { + return func(yield func(int, T) bool) { + j := 0 + for t := range i { + if !yield(j, t) { + return + } + j++ + } + } +} + func (fs fileClientImpl) OpenTorrent( ctx context.Context, info *metainfo.Info, @@ -65,9 +81,8 @@ func (fs fileClientImpl) OpenTorrent( dir := fs.opts.TorrentDirMaker(fs.opts.ClientBaseDir, info, infoHash) logger := log.ContextLogger(ctx).Slogger() logger.DebugContext(ctx, "opened file torrent storage", slog.String("dir", dir)) - upvertedFiles := info.UpvertedFiles() - files := make([]file, 0, len(upvertedFiles)) - for i, fileInfo := range upvertedFiles { + var files []file + for i, fileInfo := range enumIter(info.UpvertedFilesIter()) { filePath := filepath.Join(dir, fs.opts.FilePathMaker(FilePathMakerOpts{ Info: info, File: &fileInfo, @@ -77,11 +92,13 @@ func (fs fileClientImpl) OpenTorrent( return } f := file{ - path: filePath, - length: fileInfo.Length, + safeOsPath: filePath, + length: fileInfo.Length, + beginPieceIndex: fileInfo.BeginPieceIndex(info.PieceLength), + endPieceIndex: fileInfo.EndPieceIndex(info.PieceLength), } if f.length == 0 { - err = CreateNativeZeroLengthFile(f.path) + err = CreateNativeZeroLengthFile(f.safeOsPath) if err != nil { err = fmt.Errorf("creating zero length file: %w", err) return @@ -90,10 +107,12 @@ func (fs fileClientImpl) OpenTorrent( files = append(files, f) } t := &fileTorrentImpl{ + info, files, info.FileSegmentsIndex(), infoHash, fs.opts.PieceCompletion, + fs.opts.UsePartFiles.UnwrapOr(true), } return TorrentImpl{ Piece: t.Piece, @@ -104,15 +123,50 @@ func (fs fileClientImpl) OpenTorrent( type file struct { // The safe, OS-local file path. - path string - length int64 + safeOsPath string + beginPieceIndex int + endPieceIndex int + length int64 } type fileTorrentImpl struct { + info *metainfo.Info files []file segmentLocater segments.Index infoHash metainfo.Hash completion PieceCompletion + partFiles bool +} + +func (fts *fileTorrentImpl) promotePartFile(f file) (err error) { + //fmt.Printf("promoting %q\n", f.safeOsPath) + if fts.partFiles { + err = os.Rename(f.safeOsPath+".part", f.safeOsPath) + if err != nil { + err = fmt.Errorf("renaming part file: %w", err) + return + } + } + info, err := os.Stat(f.safeOsPath) + if err != nil { + err = fmt.Errorf("statting file: %w", err) + return + } + // Clear writability for the file. + err = os.Chmod(f.safeOsPath, info.Mode().Perm()&^0o222) + if err != nil { + err = fmt.Errorf("setting file to read-only: %w", err) + return + } + return +} + +func (fts *fileTorrentImpl) getCompletion(piece int) Completion { + cmpl, err := fts.completion.Get(metainfo.PieceKey{ + fts.infoHash, piece, + }) + cmpl.Err = errors.Join(cmpl.Err, err) + return cmpl } func (fts *fileTorrentImpl) Piece(p metainfo.Piece) PieceImpl { @@ -147,7 +201,7 @@ func fsync(filePath string) (err error) { func (fts *fileTorrentImpl) Flush() error { for _, f := range fts.files { - if err := fsync(f.path); err != nil { + if err := fsync(f.safeOsPath); err != nil { return err } } @@ -174,7 +228,7 @@ type fileTorrentImplIO struct { // Returns EOF on short or missing file. func (fst fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) { - f, err := os.Open(file.path) + f, err := os.Open(file.safeOsPath) if os.IsNotExist(err) { // File missing is treated the same as a short file. err = io.EOF @@ -219,7 +273,7 @@ func (fst fileTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) { func (fst fileTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) { // log.Printf("write at %v: %v bytes", off, len(p)) fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool { - name := fst.fts.files[i].path + name := fst.fts.files[i].safeOsPath os.MkdirAll(filepath.Dir(name), 0o777) var f *os.File f, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0o666) diff --git a/storage/piece-completion.go b/storage/piece-completion.go index 1260aba0..bf6dd41b 100644 --- a/storage/piece-completion.go +++ b/storage/piece-completion.go @@ -9,6 +9,8 @@ import ( ) type PieceCompletionGetSetter interface { + // I think the extra error parameter is vestigial. Looks like you should put your error in + // Completion.Err. Get(metainfo.PieceKey) (Completion, error) Set(_ metainfo.PieceKey, complete bool) error } diff --git a/torrent.go b/torrent.go index da83d107..f6dff13a 100644 --- a/torrent.go +++ b/torrent.go @@ -2485,7 +2485,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { } err := p.Storage().MarkComplete() if err != nil { - t.logger.Levelf(log.Warning, "%T: error marking piece complete %d: %s", t.storage, piece, err) + t.logger.Levelf(log.Error, "%T: error marking piece complete %d: %s", t.storage, piece, err) } t.cl.lock() -- 2.51.0