From 8c2bedb3c835a3bf25ca521d2b57e7b103e40aac Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 23 May 2025 14:53:54 +1000 Subject: [PATCH] Rearrange file storage code Needed because complex refactors and optional interfaces get tricky to reason about without good organization. --- storage/file-client.go | 123 ++++++++++++ storage/file-misc.go | 43 +++++ storage/file-torrent-io.go | 113 +++++++++++ storage/file-torrent.go | 115 +++++++++++ storage/file.go | 381 ------------------------------------- storage/iter.go | 17 ++ 6 files changed, 411 insertions(+), 381 deletions(-) create mode 100644 storage/file-client.go create mode 100644 storage/file-torrent-io.go create mode 100644 storage/file-torrent.go delete mode 100644 storage/file.go create mode 100644 storage/iter.go diff --git a/storage/file-client.go b/storage/file-client.go new file mode 100644 index 00000000..406684c4 --- /dev/null +++ b/storage/file-client.go @@ -0,0 +1,123 @@ +package storage + +import ( + "context" + "fmt" + "log/slog" + "path/filepath" + + g "github.com/anacrolix/generics" + "github.com/anacrolix/log" + + "github.com/anacrolix/torrent/metainfo" +) + +// File-based storage for torrents, that isn't yet bound to a particular torrent. +type fileClientImpl struct { + opts NewFileClientOpts +} + +// All Torrent data stored in this baseDir. The info names of each torrent are used as directories. +func NewFile(baseDir string) ClientImplCloser { + return NewFileWithCompletion(baseDir, pieceCompletionForDir(baseDir)) +} + +type NewFileClientOpts struct { + // The base directory for all downloads. + ClientBaseDir string + FilePathMaker FilePathMaker + TorrentDirMaker TorrentDirFilePathMaker + PieceCompletion PieceCompletion + UsePartFiles g.Option[bool] + Logger *slog.Logger +} + +// The specific part-files option or the default. +func (me NewFileClientOpts) partFiles() bool { + return me.UsePartFiles.UnwrapOr(true) +} + +// NewFileOpts creates a new ClientImplCloser that stores files using the OS native filesystem. +func NewFileOpts(opts NewFileClientOpts) ClientImplCloser { + if opts.TorrentDirMaker == nil { + opts.TorrentDirMaker = defaultPathMaker + } + if opts.FilePathMaker == nil { + opts.FilePathMaker = func(opts FilePathMakerOpts) string { + var parts []string + if opts.Info.BestName() != metainfo.NoName { + parts = append(parts, opts.Info.BestName()) + } + return filepath.Join(append(parts, opts.File.BestPath()...)...) + } + } + if opts.PieceCompletion == nil { + if opts.partFiles() { + opts.PieceCompletion = NewMapPieceCompletion() + } else { + opts.PieceCompletion = pieceCompletionForDir(opts.ClientBaseDir) + } + } + if opts.Logger == nil { + opts.Logger = log.Default.Slogger() + } + return &fileClientImpl{opts} +} + +func (me *fileClientImpl) Close() error { + return me.opts.PieceCompletion.Close() +} + +func (fs *fileClientImpl) OpenTorrent( + ctx context.Context, + info *metainfo.Info, + infoHash metainfo.Hash, +) (_ TorrentImpl, err error) { + dir := fs.opts.TorrentDirMaker(fs.opts.ClientBaseDir, info, infoHash) + logger := log.ContextLogger(ctx).Slogger() + logger.DebugContext(ctx, "opened file torrent storage", slog.String("dir", dir)) + var files []file + for i, fileInfo := range enumIter(info.UpvertedFilesIter()) { + filePath := filepath.Join(dir, fs.opts.FilePathMaker(FilePathMakerOpts{ + Info: info, + File: &fileInfo, + })) + if !isSubFilepath(dir, filePath) { + err = fmt.Errorf("file %v: path %q is not sub path of %q", i, filePath, dir) + return + } + f := file{ + safeOsPath: filePath, + length: fileInfo.Length, + beginPieceIndex: fileInfo.BeginPieceIndex(info.PieceLength), + endPieceIndex: fileInfo.EndPieceIndex(info.PieceLength), + } + if f.length == 0 { + err = CreateNativeZeroLengthFile(f.safeOsPath) + if err != nil { + err = fmt.Errorf("creating zero length file: %w", err) + return + } + } + files = append(files, f) + } + t := &fileTorrentImpl{ + info, + files, + info.FileSegmentsIndex(), + infoHash, + fs, + } + if t.partFiles() { + err = t.setCompletionFromPartFiles() + if err != nil { + err = fmt.Errorf("setting completion from part files: %w", err) + return + } + } + return TorrentImpl{ + Piece: t.Piece, + Close: t.Close, + Flush: t.Flush, + }, nil +} diff --git a/storage/file-misc.go b/storage/file-misc.go index 67514420..c298e516 100644 --- a/storage/file-misc.go +++ b/storage/file-misc.go @@ -1,6 +1,12 @@ package storage import ( + "errors" + "io" + "io/fs" + "os" + "path/filepath" + "github.com/anacrolix/torrent/segments" ) @@ -18,3 +24,40 @@ func minFileLengthsForTorrentExtent( return each(fileIndex, segmentBounds.Start+segmentBounds.Length) }) } + +func fsync(filePath string) (err error) { + f, err := os.OpenFile(filePath, os.O_WRONLY, filePerm) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + return + } + defer f.Close() + if err = f.Sync(); err != nil { + return err + } + return f.Close() +} + +// A helper to create zero-length files which won't appear for file-orientated storage since no +// writes will ever occur to them (no torrent data is associated with a zero-length file). The +// caller should make sure the file name provided is safe/sanitized. +func CreateNativeZeroLengthFile(name string) error { + os.MkdirAll(filepath.Dir(name), dirPerm) + var f io.Closer + f, err := os.Create(name) + if err != nil { + return err + } + return f.Close() +} + +type file struct { + // The safe, OS-local file path. + safeOsPath string + beginPieceIndex int + endPieceIndex int + length int64 +} + +func (f file) partFilePath() string { + return f.safeOsPath + ".part" +} diff --git a/storage/file-torrent-io.go b/storage/file-torrent-io.go new file mode 100644 index 00000000..ef6e949a --- /dev/null +++ b/storage/file-torrent-io.go @@ -0,0 +1,113 @@ +package storage + +import ( + "errors" + "io" + "io/fs" + "os" + "path/filepath" + + "github.com/anacrolix/torrent/segments" +) + +// Exposes file-based storage of a torrent, as one big ReadWriterAt. +type fileTorrentImplIO struct { + fts *fileTorrentImpl +} + +// Returns EOF on short or missing file. +func (fst fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) { + var f *os.File + if fst.fts.partFiles() { + f, err = os.Open(file.partFilePath()) + } + if err == nil && f == nil || errors.Is(err, fs.ErrNotExist) { + f, err = os.Open(file.safeOsPath) + } + if errors.Is(err, fs.ErrNotExist) { + // File missing is treated the same as a short file. Should we propagate this through the + // interface now that fs.ErrNotExist is a thing? + err = io.EOF + return + } + if err != nil { + return + } + defer f.Close() + // Limit the read to within the expected bounds of this file. + if int64(len(b)) > file.length-off { + b = b[:file.length-off] + } + for off < file.length && len(b) != 0 { + n1, err1 := f.ReadAt(b, off) + b = b[n1:] + n += n1 + off += int64(n1) + if n1 == 0 { + err = err1 + break + } + } + return +} + +// Only returns EOF at the end of the torrent. Premature EOF is ErrUnexpectedEOF. +func (fst fileTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) { + fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(b))}, func(i int, e segments.Extent) bool { + n1, err1 := fst.readFileAt(fst.fts.files[i], b[:e.Length], e.Start) + n += n1 + b = b[n1:] + err = err1 + return err == nil // && int64(n1) == e.Length + }) + if len(b) != 0 && err == nil { + err = io.EOF + } + return +} + +func (fst fileTorrentImplIO) openForWrite(file file) (f *os.File, err error) { + p := fst.fts.pathForWrite(file) + f, err = os.OpenFile(p, os.O_WRONLY|os.O_CREATE, filePerm) + if err == nil { + return + } + if errors.Is(err, fs.ErrNotExist) { + err = os.MkdirAll(filepath.Dir(p), dirPerm) + if err != nil { + return + } + } else if errors.Is(err, fs.ErrPermission) { + err = os.Chmod(p, filePerm) + if err != nil { + return + } + } + f, err = os.OpenFile(p, os.O_WRONLY|os.O_CREATE, filePerm) + return +} + +func (fst fileTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) { + // log.Printf("write at %v: %v bytes", off, len(p)) + fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool { + var f *os.File + f, err = fst.openForWrite(fst.fts.files[i]) + if err != nil { + return false + } + var n1 int + n1, err = f.WriteAt(p[:e.Length], e.Start) + // log.Printf("%v %v wrote %v: %v", i, e, n1, err) + closeErr := f.Close() + n += n1 + p = p[n1:] + if err == nil { + err = closeErr + } + if err == nil && int64(n1) != e.Length { + err = io.ErrShortWrite + } + return err == nil + }) + return +} diff --git a/storage/file-torrent.go b/storage/file-torrent.go new file mode 100644 index 00000000..b08c2dca --- /dev/null +++ b/storage/file-torrent.go @@ -0,0 +1,115 @@ +package storage + +import ( + "errors" + "fmt" + "io" + "io/fs" + "log/slog" + "os" + + "github.com/anacrolix/missinggo/v2" + + "github.com/anacrolix/torrent/metainfo" + "github.com/anacrolix/torrent/segments" +) + +type fileTorrentImpl struct { + info *metainfo.Info + files []file + segmentLocater segments.Index + infoHash metainfo.Hash + // Save memory by pointing to the other data. + client *fileClientImpl +} + +func (fts *fileTorrentImpl) logger() *slog.Logger { + return fts.client.opts.Logger +} + +func (fts *fileTorrentImpl) pieceCompletion() PieceCompletion { + return fts.client.opts.PieceCompletion +} + +func (fts *fileTorrentImpl) pieceCompletionKey(p int) metainfo.PieceKey { + return metainfo.PieceKey{ + InfoHash: fts.infoHash, + Index: p, + } +} + +func (fts *fileTorrentImpl) setPieceCompletion(p int, complete bool) error { + return fts.pieceCompletion().Set(fts.pieceCompletionKey(p), complete) +} + +// Set piece completions based on whether all files in each piece are not .part files. +func (fts *fileTorrentImpl) setCompletionFromPartFiles() error { + notComplete := make([]bool, fts.info.NumPieces()) + for _, f := range fts.files { + fi, err := os.Stat(f.safeOsPath) + if err == nil { + if fi.Size() == f.length { + continue + } + fts.logger().Warn("file has unexpected size", "file", f.safeOsPath, "size", fi.Size(), "expected", f.length) + } else if !errors.Is(err, fs.ErrNotExist) { + fts.logger().Warn("error checking file size", "err", err) + } + for i := f.beginPieceIndex; i < f.endPieceIndex; i++ { + notComplete[i] = true + } + } + for i, nc := range notComplete { + if nc { + // Use whatever the piece completion has, or trigger a hash. + continue + } + err := fts.setPieceCompletion(i, true) + if err != nil { + return fmt.Errorf("setting piece %v completion: %w", i, err) + } + } + return nil +} + +func (fts *fileTorrentImpl) partFiles() bool { + return fts.client.opts.partFiles() +} + +func (fts *fileTorrentImpl) pathForWrite(f file) string { + if fts.partFiles() { + return f.partFilePath() + } + return f.safeOsPath +} + +func (fts *fileTorrentImpl) getCompletion(piece int) Completion { + cmpl, err := fts.pieceCompletion().Get(metainfo.PieceKey{fts.infoHash, piece}) + cmpl.Err = errors.Join(cmpl.Err, err) + return cmpl +} + +func (fts *fileTorrentImpl) Piece(p metainfo.Piece) PieceImpl { + // Create a view onto the file-based torrent storage. + _io := fileTorrentImplIO{fts} + // Return the appropriate segments of this. + return &filePieceImpl{ + fts, + p, + missinggo.NewSectionWriter(_io, p.Offset(), p.Length()), + io.NewSectionReader(_io, p.Offset(), p.Length()), + } +} + +func (fs *fileTorrentImpl) Close() error { + return nil +} + +func (fts *fileTorrentImpl) Flush() error { + for _, f := range fts.files { + if err := fsync(fts.pathForWrite(f)); err != nil { + return err + } + } + return nil +} diff --git a/storage/file.go b/storage/file.go deleted file mode 100644 index f823e831..00000000 --- a/storage/file.go +++ /dev/null @@ -1,381 +0,0 @@ -package storage - -import ( - "context" - "errors" - "fmt" - "io" - "io/fs" - "iter" - "log/slog" - "os" - "path/filepath" - - g "github.com/anacrolix/generics" - "github.com/anacrolix/log" - "github.com/anacrolix/missinggo/v2" - - "github.com/anacrolix/torrent/metainfo" - "github.com/anacrolix/torrent/segments" -) - -// File-based storage for torrents, that isn't yet bound to a particular torrent. -type fileClientImpl struct { - opts NewFileClientOpts -} - -// All Torrent data stored in this baseDir. The info names of each torrent are used as directories. -func NewFile(baseDir string) ClientImplCloser { - return NewFileWithCompletion(baseDir, pieceCompletionForDir(baseDir)) -} - -type NewFileClientOpts struct { - // The base directory for all downloads. - ClientBaseDir string - FilePathMaker FilePathMaker - TorrentDirMaker TorrentDirFilePathMaker - PieceCompletion PieceCompletion - UsePartFiles g.Option[bool] - Logger *slog.Logger -} - -// The specific part-files option or the default. -func (me NewFileClientOpts) partFiles() bool { - return me.UsePartFiles.UnwrapOr(true) -} - -// NewFileOpts creates a new ClientImplCloser that stores files using the OS native filesystem. -func NewFileOpts(opts NewFileClientOpts) ClientImplCloser { - if opts.TorrentDirMaker == nil { - opts.TorrentDirMaker = defaultPathMaker - } - if opts.FilePathMaker == nil { - opts.FilePathMaker = func(opts FilePathMakerOpts) string { - var parts []string - if opts.Info.BestName() != metainfo.NoName { - parts = append(parts, opts.Info.BestName()) - } - return filepath.Join(append(parts, opts.File.BestPath()...)...) - } - } - if opts.PieceCompletion == nil { - if opts.partFiles() { - opts.PieceCompletion = NewMapPieceCompletion() - } else { - opts.PieceCompletion = pieceCompletionForDir(opts.ClientBaseDir) - } - } - if opts.Logger == nil { - opts.Logger = log.Default.Slogger() - } - return &fileClientImpl{opts} -} - -func (me *fileClientImpl) Close() error { - return me.opts.PieceCompletion.Close() -} - -func enumIter[T any](i iter.Seq[T]) iter.Seq2[int, T] { - return func(yield func(int, T) bool) { - j := 0 - for t := range i { - if !yield(j, t) { - return - } - j++ - } - } -} - -func (fs *fileClientImpl) OpenTorrent( - ctx context.Context, - info *metainfo.Info, - infoHash metainfo.Hash, -) (_ TorrentImpl, err error) { - dir := fs.opts.TorrentDirMaker(fs.opts.ClientBaseDir, info, infoHash) - logger := log.ContextLogger(ctx).Slogger() - logger.DebugContext(ctx, "opened file torrent storage", slog.String("dir", dir)) - var files []file - for i, fileInfo := range enumIter(info.UpvertedFilesIter()) { - filePath := filepath.Join(dir, fs.opts.FilePathMaker(FilePathMakerOpts{ - Info: info, - File: &fileInfo, - })) - if !isSubFilepath(dir, filePath) { - err = fmt.Errorf("file %v: path %q is not sub path of %q", i, filePath, dir) - return - } - f := file{ - safeOsPath: filePath, - length: fileInfo.Length, - beginPieceIndex: fileInfo.BeginPieceIndex(info.PieceLength), - endPieceIndex: fileInfo.EndPieceIndex(info.PieceLength), - } - if f.length == 0 { - err = CreateNativeZeroLengthFile(f.safeOsPath) - if err != nil { - err = fmt.Errorf("creating zero length file: %w", err) - return - } - } - files = append(files, f) - } - t := &fileTorrentImpl{ - info, - files, - info.FileSegmentsIndex(), - infoHash, - fs, - } - if t.partFiles() { - err = t.setCompletionFromPartFiles() - if err != nil { - err = fmt.Errorf("setting completion from part files: %w", err) - return - } - } - return TorrentImpl{ - Piece: t.Piece, - Close: t.Close, - Flush: t.Flush, - }, nil -} - -type file struct { - // The safe, OS-local file path. - safeOsPath string - beginPieceIndex int - endPieceIndex int - length int64 -} - -func (f file) partFilePath() string { - return f.safeOsPath + ".part" -} - -type fileTorrentImpl struct { - info *metainfo.Info - files []file - segmentLocater segments.Index - infoHash metainfo.Hash - // Save memory by pointing to the other data. - client *fileClientImpl -} - -func (fts *fileTorrentImpl) logger() *slog.Logger { - return fts.client.opts.Logger -} - -func (fts *fileTorrentImpl) pieceCompletion() PieceCompletion { - return fts.client.opts.PieceCompletion -} - -func (fts *fileTorrentImpl) pieceCompletionKey(p int) metainfo.PieceKey { - return metainfo.PieceKey{ - InfoHash: fts.infoHash, - Index: p, - } -} - -func (fts *fileTorrentImpl) setPieceCompletion(p int, complete bool) error { - return fts.pieceCompletion().Set(fts.pieceCompletionKey(p), complete) -} - -// Set piece completions based on whether all files in each piece are not .part files. -func (fts *fileTorrentImpl) setCompletionFromPartFiles() error { - notComplete := make([]bool, fts.info.NumPieces()) - for _, f := range fts.files { - fi, err := os.Stat(f.safeOsPath) - if err == nil { - if fi.Size() == f.length { - continue - } - fts.logger().Warn("file has unexpected size", "file", f.safeOsPath, "size", fi.Size(), "expected", f.length) - } else if !errors.Is(err, fs.ErrNotExist) { - fts.logger().Warn("error checking file size", "err", err) - } - for i := f.beginPieceIndex; i < f.endPieceIndex; i++ { - notComplete[i] = true - } - } - for i, nc := range notComplete { - if nc { - // Use whatever the piece completion has, or trigger a hash. - continue - } - err := fts.setPieceCompletion(i, true) - if err != nil { - return fmt.Errorf("setting piece %v completion: %w", i, err) - } - } - return nil -} - -func (fts *fileTorrentImpl) partFiles() bool { - return fts.client.opts.partFiles() -} - -func (fts *fileTorrentImpl) pathForWrite(f file) string { - if fts.partFiles() { - return f.partFilePath() - } - return f.safeOsPath -} - -func (fts *fileTorrentImpl) getCompletion(piece int) Completion { - cmpl, err := fts.pieceCompletion().Get(metainfo.PieceKey{fts.infoHash, piece}) - cmpl.Err = errors.Join(cmpl.Err, err) - return cmpl -} - -func (fts *fileTorrentImpl) Piece(p metainfo.Piece) PieceImpl { - // Create a view onto the file-based torrent storage. - _io := fileTorrentImplIO{fts} - // Return the appropriate segments of this. - return &filePieceImpl{ - fts, - p, - missinggo.NewSectionWriter(_io, p.Offset(), p.Length()), - io.NewSectionReader(_io, p.Offset(), p.Length()), - } -} - -func (fs *fileTorrentImpl) Close() error { - return nil -} - -func fsync(filePath string) (err error) { - f, err := os.OpenFile(filePath, os.O_WRONLY, filePerm) - if err != nil && !errors.Is(err, fs.ErrNotExist) { - return - } - defer f.Close() - if err = f.Sync(); err != nil { - return err - } - return f.Close() -} - -func (fts *fileTorrentImpl) Flush() error { - for _, f := range fts.files { - if err := fsync(fts.pathForWrite(f)); err != nil { - return err - } - } - return nil -} - -// A helper to create zero-length files which won't appear for file-orientated storage since no -// writes will ever occur to them (no torrent data is associated with a zero-length file). The -// caller should make sure the file name provided is safe/sanitized. -func CreateNativeZeroLengthFile(name string) error { - os.MkdirAll(filepath.Dir(name), dirPerm) - var f io.Closer - f, err := os.Create(name) - if err != nil { - return err - } - return f.Close() -} - -// Exposes file-based storage of a torrent, as one big ReadWriterAt. -type fileTorrentImplIO struct { - fts *fileTorrentImpl -} - -// Returns EOF on short or missing file. -func (fst fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) { - var f *os.File - if fst.fts.partFiles() { - f, err = os.Open(file.partFilePath()) - } - if err == nil && f == nil || errors.Is(err, fs.ErrNotExist) { - f, err = os.Open(file.safeOsPath) - } - if errors.Is(err, fs.ErrNotExist) { - // File missing is treated the same as a short file. Should we propagate this through the - // interface now that fs.ErrNotExist is a thing? - err = io.EOF - return - } - if err != nil { - return - } - defer f.Close() - // Limit the read to within the expected bounds of this file. - if int64(len(b)) > file.length-off { - b = b[:file.length-off] - } - for off < file.length && len(b) != 0 { - n1, err1 := f.ReadAt(b, off) - b = b[n1:] - n += n1 - off += int64(n1) - if n1 == 0 { - err = err1 - break - } - } - return -} - -// Only returns EOF at the end of the torrent. Premature EOF is ErrUnexpectedEOF. -func (fst fileTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) { - fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(b))}, func(i int, e segments.Extent) bool { - n1, err1 := fst.readFileAt(fst.fts.files[i], b[:e.Length], e.Start) - n += n1 - b = b[n1:] - err = err1 - return err == nil // && int64(n1) == e.Length - }) - if len(b) != 0 && err == nil { - err = io.EOF - } - return -} - -func (fst fileTorrentImplIO) openForWrite(file file) (f *os.File, err error) { - p := fst.fts.pathForWrite(file) - f, err = os.OpenFile(p, os.O_WRONLY|os.O_CREATE, filePerm) - if err == nil { - return - } - if errors.Is(err, fs.ErrNotExist) { - err = os.MkdirAll(filepath.Dir(p), dirPerm) - if err != nil { - return - } - } else if errors.Is(err, fs.ErrPermission) { - err = os.Chmod(p, filePerm) - if err != nil { - return - } - } - f, err = os.OpenFile(p, os.O_WRONLY|os.O_CREATE, filePerm) - return -} - -func (fst fileTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) { - // log.Printf("write at %v: %v bytes", off, len(p)) - fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool { - var f *os.File - f, err = fst.openForWrite(fst.fts.files[i]) - if err != nil { - return false - } - var n1 int - n1, err = f.WriteAt(p[:e.Length], e.Start) - // log.Printf("%v %v wrote %v: %v", i, e, n1, err) - closeErr := f.Close() - n += n1 - p = p[n1:] - if err == nil { - err = closeErr - } - if err == nil && int64(n1) != e.Length { - err = io.ErrShortWrite - } - return err == nil - }) - return -} diff --git a/storage/iter.go b/storage/iter.go new file mode 100644 index 00000000..9fe49c5e --- /dev/null +++ b/storage/iter.go @@ -0,0 +1,17 @@ +package storage + +import ( + "iter" +) + +func enumIter[T any](i iter.Seq[T]) iter.Seq2[int, T] { + return func(yield func(int, T) bool) { + j := 0 + for t := range i { + if !yield(j, t) { + return + } + j++ + } + } +} -- 2.51.0