]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Start implementing part file support for file storage
authorMatt Joiner <anacrolix@gmail.com>
Fri, 25 Apr 2025 06:36:53 +0000 (16:36 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 28 Apr 2025 01:38:07 +0000 (11:38 +1000)
file.go
metainfo/file-tree.go
metainfo/fileinfo.go
metainfo/info.go
segments/index.go
segments/segments.go
storage/file-piece.go
storage/file.go
storage/piece-completion.go
torrent.go

diff --git a/file.go b/file.go
index 2ff474426d62ac1b3bf8a72616ad7a8ee7cce04b..26c3812319d987b2db30eec94a6fa6d6f6eefba6 100644 (file)
--- a/file.go
+++ b/file.go
@@ -207,18 +207,12 @@ func (f *File) Priority() (prio PiecePriority) {
 
 // Returns the index of the first piece containing data for the file.
 func (f *File) BeginPieceIndex() int {
-       if f.t.usualPieceSize() == 0 {
-               return 0
-       }
-       return pieceIndex(f.offset / int64(f.t.usualPieceSize()))
+       return f.fi.BeginPieceIndex(int64(f.t.usualPieceSize()))
 }
 
 // Returns the index of the piece after the last one containing data for the file.
 func (f *File) EndPieceIndex() int {
-       if f.t.usualPieceSize() == 0 {
-               return 0
-       }
-       return pieceIndex((f.offset + f.length + int64(f.t.usualPieceSize()) - 1) / int64(f.t.usualPieceSize()))
+       return f.fi.EndPieceIndex(int64(f.t.usualPieceSize()))
 }
 
 func (f *File) numPieces() int {
index e04867d0e4dedefaa14ff26d8595fd204e6b5e27..08039e6dd1970170269fdaafab20c9503c8bbbdf 100644 (file)
@@ -127,6 +127,7 @@ func (ft *FileTree) upvertedFilesInner(
                                PiecesRoot:    ft.PiecesRootAsByteArray(),
                                TorrentOffset: *offset,
                        })
+                       // v2 files are piece aligned. This bumps up the offset to the next piece boundary.
                        *offset += (ft.File.Length + pieceLength - 1) / pieceLength * pieceLength
                }
 
index cb9c5be84900d86e0845962df36134df04ccf37d..867a56d04d3cd0e94d5d5b38fa957d41d25e5804 100644 (file)
@@ -38,3 +38,17 @@ func (fi *FileInfo) BestPath() []string {
        }
        return fi.Path
 }
+
+func (fi *FileInfo) BeginPieceIndex(pieceLength int64) int {
+       if pieceLength == 0 {
+               return 0
+       }
+       return int(fi.TorrentOffset / pieceLength)
+}
+
+func (fi *FileInfo) EndPieceIndex(pieceLength int64) int {
+       if pieceLength == 0 {
+               return 0
+       }
+       return int((fi.TorrentOffset + fi.Length + pieceLength - 1) / pieceLength)
+}
index e6c29530f13290f8170a8d44523d2ff4490b567c..6fc46dbb2b39543edd4638d90373fef2c7626aa4 100644 (file)
@@ -227,3 +227,5 @@ func (info *Info) FileSegmentsIndex() segments.Index {
                }
        }))
 }
+
+// TODO: Add NumFiles helper?
index 577ec90f5de54df817480a433b6096549dc7efca..73dca69a0029224a4b6e5b9f36a541a5ba61112e 100644 (file)
@@ -1,6 +1,7 @@
 package segments
 
 import (
+       "iter"
        "sort"
 
        g "github.com/anacrolix/generics"
@@ -61,3 +62,25 @@ func (me Index) Locate(e Extent, output Callback) bool {
                return output(i+first, e)
        })
 }
+
+func (me Index) LocateIter(e Extent) iter.Seq2[int, Extent] {
+       return func(yield func(int, Extent) bool) {
+               first := sort.Search(len(me.segments), func(i int) bool {
+                       _e := me.segments[i]
+                       return _e.End() > e.Start
+               })
+               if first == len(me.segments) {
+                       return
+               }
+               e.Start -= me.segments[first].Start
+               // The extent is before the first segment.
+               if e.Start < 0 {
+                       e.Length += e.Start
+                       e.Start = 0
+               }
+               me.segments = me.segments[first:]
+               ScanConsecutive(me.iterSegments(), e, func(i int, e Extent) bool {
+                       return yield(i+first, e)
+               })
+       }
+}
index 35e7e4c8e51d497bebf25e09285f375fd4be01de..83f1ea5c18b15d91531484f6dea2a82029938717 100644 (file)
@@ -32,11 +32,11 @@ func Scan(haystack LengthIter, needle Extent, callback Callback) bool {
 }
 
 // Returns true if callback returns false early, or all segments in the haystack for the needle are
-// found.
+// found. TODO: Does this handle discontiguous extents?
 func ScanConsecutive(haystack ConsecutiveExtentIter, needle Extent, callback Callback) bool {
        i := 0
-       // Extents have been found in the haystack and we're waiting for the needle to end. This is kind
-       // of for backwards compatibility for some tests that expect to have zero-length extents.
+       // Extents have been found in the haystack, and we're waiting for the needle to end. This is
+       // kind of for backwards compatibility for some tests that expect to have zero-length extents.
        startedNeedle := false
        for needle.Length != 0 {
                l, ok := haystack()
index 98fb806b8d3ae8ff5317a4accc401ae55075d240..108911a8c4f75f6b28d60f7236e0e202348685b3 100644 (file)
@@ -1,8 +1,9 @@
 package storage
 
 import (
+       "fmt"
        "io"
-       "log"
+       "iter"
        "os"
 
        "github.com/anacrolix/torrent/metainfo"
@@ -10,7 +11,7 @@ import (
 )
 
 type filePieceImpl struct {
-       *fileTorrentImpl
+       *fileTorrentImpl
        p metainfo.Piece
        io.WriterAt
        io.ReaderAt
@@ -19,49 +20,82 @@ type filePieceImpl struct {
 var _ PieceImpl = (*filePieceImpl)(nil)
 
 func (me *filePieceImpl) pieceKey() metainfo.PieceKey {
-       return metainfo.PieceKey{me.infoHash, me.p.Index()}
+       return metainfo.PieceKey{me.t.infoHash, me.p.Index()}
+}
+
+func (fs *filePieceImpl) extent() segments.Extent {
+       return segments.Extent{
+               Start:  fs.p.Offset(),
+               Length: fs.p.Length(),
+       }
+}
+
+func (fs *filePieceImpl) pieceFiles() iter.Seq2[int, file] {
+       return func(yield func(int, file) bool) {
+               for fileIndex := range fs.t.segmentLocater.LocateIter(fs.extent()) {
+                       if !yield(fileIndex, fs.t.files[fileIndex]) {
+                               return
+                       }
+               }
+       }
 }
 
 func (fs *filePieceImpl) Completion() Completion {
-       c, err := fs.completion.Get(fs.pieceKey())
-       if err != nil {
-               log.Printf("error getting piece completion: %s", err)
-               c.Ok = false
+       c := fs.t.getCompletion(fs.p.Index())
+       if !c.Ok {
                return c
        }
-
        verified := true
        if c.Complete {
                // If it's allegedly complete, check that its constituent files have the necessary length.
-               if !fs.segmentLocater.Locate(segments.Extent{
-                       Start:  fs.p.Offset(),
-                       Length: fs.p.Length(),
-               }, func(i int, extent segments.Extent) bool {
-                       file := fs.files[i]
-                       s, err := os.Stat(file.path)
-                       if err != nil || s.Size() < extent.Start+extent.Length {
-                               verified = false
-                               return false
-                       }
-                       return true
-               }) {
+               if !fs.t.segmentLocater.Locate(
+                       fs.extent(),
+                       func(i int, extent segments.Extent) bool {
+                               file := fs.t.files[i]
+                               s, err := os.Stat(file.safeOsPath)
+                               if err != nil || s.Size() < extent.Start+extent.Length {
+                                       verified = false
+                                       return false
+                               }
+                               return true
+                       }) {
                        panic("files do not cover piece extent")
                }
        }
 
        if !verified {
-               // The completion was wrong, fix it.
+               // The completion was wrong, fix it. TODO: Should we use MarkNotComplete?
                c.Complete = false
-               fs.completion.Set(fs.pieceKey(), false)
+               fs.t.completion.Set(fs.pieceKey(), false)
        }
 
        return c
 }
 
-func (fs *filePieceImpl) MarkComplete() error {
-       return fs.completion.Set(fs.pieceKey(), true)
+func (fs *filePieceImpl) MarkComplete() (err error) {
+       err = fs.t.completion.Set(fs.pieceKey(), true)
+       if err != nil {
+               return
+       }
+nextFile:
+       for i, f := range fs.pieceFiles() {
+               for p := f.beginPieceIndex; p < f.endPieceIndex; p++ {
+                       _ = i
+                       //fmt.Printf("%v %#v %v\n", i, f, p)
+                       cmpl := fs.t.getCompletion(p)
+                       if !cmpl.Ok || !cmpl.Complete {
+                               continue nextFile
+                       }
+               }
+               err = fs.t.promotePartFile(f)
+               if err != nil {
+                       err = fmt.Errorf("error promoting part file %q: %w", f.safeOsPath, err)
+                       return
+               }
+       }
+       return
 }
 
 func (fs *filePieceImpl) MarkNotComplete() error {
-       return fs.completion.Set(fs.pieceKey(), false)
+       return fs.t.completion.Set(fs.pieceKey(), false)
 }
index aa52034ca4a33a66c197951da8beb33119bff5d0..2cacb1ac752328dd5114bafb30ee5ec73a0558bb 100644 (file)
@@ -2,12 +2,15 @@ package storage
 
 import (
        "context"
+       "errors"
        "fmt"
        "io"
+       "iter"
        "log/slog"
        "os"
        "path/filepath"
 
+       g "github.com/anacrolix/generics"
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo/v2"
 
@@ -31,6 +34,7 @@ type NewFileClientOpts struct {
        FilePathMaker   FilePathMaker
        TorrentDirMaker TorrentDirFilePathMaker
        PieceCompletion PieceCompletion
+       UsePartFiles    g.Option[bool]
 }
 
 // NewFileOpts creates a new ClientImplCloser that stores files using the OS native filesystem.
@@ -57,6 +61,18 @@ func (me fileClientImpl) Close() error {
        return me.opts.PieceCompletion.Close()
 }
 
+func enumIter[T any](i iter.Seq[T]) iter.Seq2[int, T] {
+       return func(yield func(int, T) bool) {
+               j := 0
+               for t := range i {
+                       if !yield(j, t) {
+                               return
+                       }
+                       j++
+               }
+       }
+}
+
 func (fs fileClientImpl) OpenTorrent(
        ctx context.Context,
        info *metainfo.Info,
@@ -65,9 +81,8 @@ func (fs fileClientImpl) OpenTorrent(
        dir := fs.opts.TorrentDirMaker(fs.opts.ClientBaseDir, info, infoHash)
        logger := log.ContextLogger(ctx).Slogger()
        logger.DebugContext(ctx, "opened file torrent storage", slog.String("dir", dir))
-       upvertedFiles := info.UpvertedFiles()
-       files := make([]file, 0, len(upvertedFiles))
-       for i, fileInfo := range upvertedFiles {
+       var files []file
+       for i, fileInfo := range enumIter(info.UpvertedFilesIter()) {
                filePath := filepath.Join(dir, fs.opts.FilePathMaker(FilePathMakerOpts{
                        Info: info,
                        File: &fileInfo,
@@ -77,11 +92,13 @@ func (fs fileClientImpl) OpenTorrent(
                        return
                }
                f := file{
-                       path:   filePath,
-                       length: fileInfo.Length,
+                       safeOsPath:      filePath,
+                       length:          fileInfo.Length,
+                       beginPieceIndex: fileInfo.BeginPieceIndex(info.PieceLength),
+                       endPieceIndex:   fileInfo.EndPieceIndex(info.PieceLength),
                }
                if f.length == 0 {
-                       err = CreateNativeZeroLengthFile(f.path)
+                       err = CreateNativeZeroLengthFile(f.safeOsPath)
                        if err != nil {
                                err = fmt.Errorf("creating zero length file: %w", err)
                                return
@@ -90,10 +107,12 @@ func (fs fileClientImpl) OpenTorrent(
                files = append(files, f)
        }
        t := &fileTorrentImpl{
+               info,
                files,
                info.FileSegmentsIndex(),
                infoHash,
                fs.opts.PieceCompletion,
+               fs.opts.UsePartFiles.UnwrapOr(true),
        }
        return TorrentImpl{
                Piece: t.Piece,
@@ -104,15 +123,50 @@ func (fs fileClientImpl) OpenTorrent(
 
 type file struct {
        // The safe, OS-local file path.
-       path   string
-       length int64
+       safeOsPath      string
+       beginPieceIndex int
+       endPieceIndex   int
+       length          int64
 }
 
 type fileTorrentImpl struct {
+       info           *metainfo.Info
        files          []file
        segmentLocater segments.Index
        infoHash       metainfo.Hash
        completion     PieceCompletion
+       partFiles      bool
+}
+
+func (fts *fileTorrentImpl) promotePartFile(f file) (err error) {
+       //fmt.Printf("promoting %q\n", f.safeOsPath)
+       if fts.partFiles {
+               err = os.Rename(f.safeOsPath+".part", f.safeOsPath)
+               if err != nil {
+                       err = fmt.Errorf("renaming part file: %w", err)
+                       return
+               }
+       }
+       info, err := os.Stat(f.safeOsPath)
+       if err != nil {
+               err = fmt.Errorf("statting file: %w", err)
+               return
+       }
+       // Clear writability for the file.
+       err = os.Chmod(f.safeOsPath, info.Mode().Perm()&^0o222)
+       if err != nil {
+               err = fmt.Errorf("setting file to read-only: %w", err)
+               return
+       }
+       return
+}
+
+func (fts *fileTorrentImpl) getCompletion(piece int) Completion {
+       cmpl, err := fts.completion.Get(metainfo.PieceKey{
+               fts.infoHash, piece,
+       })
+       cmpl.Err = errors.Join(cmpl.Err, err)
+       return cmpl
 }
 
 func (fts *fileTorrentImpl) Piece(p metainfo.Piece) PieceImpl {
@@ -147,7 +201,7 @@ func fsync(filePath string) (err error) {
 
 func (fts *fileTorrentImpl) Flush() error {
        for _, f := range fts.files {
-               if err := fsync(f.path); err != nil {
+               if err := fsync(f.safeOsPath); err != nil {
                        return err
                }
        }
@@ -174,7 +228,7 @@ type fileTorrentImplIO struct {
 
 // Returns EOF on short or missing file.
 func (fst fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) {
-       f, err := os.Open(file.path)
+       f, err := os.Open(file.safeOsPath)
        if os.IsNotExist(err) {
                // File missing is treated the same as a short file.
                err = io.EOF
@@ -219,7 +273,7 @@ func (fst fileTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) {
 func (fst fileTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) {
        // log.Printf("write at %v: %v bytes", off, len(p))
        fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool {
-               name := fst.fts.files[i].path
+               name := fst.fts.files[i].safeOsPath
                os.MkdirAll(filepath.Dir(name), 0o777)
                var f *os.File
                f, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0o666)
index 1260aba0b5c8ac9fcde886c42871a401b1c5a2e3..bf6dd41b1a6cf4f57361f0cf6e77a58e063819ad 100644 (file)
@@ -9,6 +9,8 @@ import (
 )
 
 type PieceCompletionGetSetter interface {
+       // I think the extra error parameter is vestigial. Looks like you should put your error in
+       // Completion.Err.
        Get(metainfo.PieceKey) (Completion, error)
        Set(_ metainfo.PieceKey, complete bool) error
 }
index da83d1077351f44e70de4684522ee463444c8a75..f6dff13ac698e634507e0c28ef4d2dfc6656bf3b 100644 (file)
@@ -2485,7 +2485,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
                }
                err := p.Storage().MarkComplete()
                if err != nil {
-                       t.logger.Levelf(log.Warning, "%T: error marking piece complete %d: %s", t.storage, piece, err)
+                       t.logger.Levelf(log.Error, "%T: error marking piece complete %d: %s", t.storage, piece, err)
                }
                t.cl.lock()