From df616a5120753cfcf7b94f9ed3cde2bee6cc5fc2 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 13 Aug 2025 00:27:26 +1000 Subject: [PATCH] Add mmap alternative IO system for file storage --- storage/file-client.go | 26 ++++++++++++++++++++++- storage/file-io.go | 47 +---------------------------------------- storage/file-piece.go | 31 ++++++++++++++++++++++++++- storage/file-torrent.go | 4 ++-- zero-reader.go | 1 + 5 files changed, 59 insertions(+), 50 deletions(-) diff --git a/storage/file-client.go b/storage/file-client.go index ee3b5efc..3bcbd634 100644 --- a/storage/file-client.go +++ b/storage/file-client.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "os" "path/filepath" g "github.com/anacrolix/generics" @@ -70,6 +71,29 @@ func (me *fileClientImpl) Close() error { return me.opts.PieceCompletion.Close() } +var defaultFileIo func() fileIo = func() fileIo { + return classicFileIo{} +} + +func init() { + s, ok := os.LookupEnv("TORRENT_STORAGE_DEFAULT_FILE_IO") + if !ok { + return + } + switch s { + case "mmap": + defaultFileIo = func() fileIo { + return &mmapFileIo{} + } + case "classic": + defaultFileIo = func() fileIo { + return classicFileIo{} + } + default: + panic(s) + } +} + func (fs *fileClientImpl) OpenTorrent( ctx context.Context, info *metainfo.Info, @@ -104,7 +128,7 @@ func (fs *fileClientImpl) OpenTorrent( metainfoFileInfos, info.FileSegmentsIndex(), infoHash, - classicFileIo{}, + defaultFileIo(), fs, } if t.partFiles() { diff --git a/storage/file-io.go b/storage/file-io.go index 5dd6e3c7..a8962e19 100644 --- a/storage/file-io.go +++ b/storage/file-io.go @@ -1,11 +1,7 @@ package storage import ( - "errors" "io" - "io/fs" - "os" - "path/filepath" ) type fileWriter interface { @@ -24,46 +20,5 @@ type fileReader interface { type fileIo interface { openForSharedRead(name string) (sharedFileIf, error) openForRead(name string) (fileReader, error) - openForWrite(name string) (fileWriter, error) -} - -type classicFileIo struct{} - -func (i classicFileIo) openForSharedRead(name string) (sharedFileIf, error) { - return sharedFiles.Open(name) -} - -type classicFileReader struct { - *os.File -} - -func (c classicFileReader) seekData(offset int64) (ret int64, err error) { - return seekData(c.File, offset) -} - -func (i classicFileIo) openForRead(name string) (fileReader, error) { - f, err := os.Open(name) - return classicFileReader{f}, err -} - -func (classicFileIo) openForWrite(p string) (f fileWriter, err error) { - f, err = os.OpenFile(p, os.O_WRONLY|os.O_CREATE, filePerm) - if err == nil { - return - } - if errors.Is(err, fs.ErrNotExist) { - err = os.MkdirAll(filepath.Dir(p), dirPerm) - if err != nil { - return - } - } else if errors.Is(err, fs.ErrPermission) { - err = os.Chmod(p, filePerm) - if err != nil { - return - } - } else { - return - } - f, err = os.OpenFile(p, os.O_WRONLY|os.O_CREATE, filePerm) - return + openForWrite(name string, size int64) (fileWriter, error) } diff --git a/storage/file-piece.go b/storage/file-piece.go index ee2da24e..53f4af86 100644 --- a/storage/file-piece.go +++ b/storage/file-piece.go @@ -299,6 +299,24 @@ var ( packageExpvarMap = expvar.NewMap("torrentStorage") ) +type limitWriter struct { + rem int64 + w io.Writer +} + +func (me *limitWriter) Write(p []byte) (n int, err error) { + n, err = me.w.Write(p[:min(int64(len(p)), me.rem)]) + me.rem -= int64(n) + if err != nil { + return + } + p = p[n:] + if len(p) > 0 { + err = io.ErrShortWrite + } + return +} + func (me *filePieceImpl) writeFileTo(w io.Writer, fileIndex int, extent segments.Extent) (written int64, err error) { if extent.Length == 0 { return @@ -337,7 +355,18 @@ func (me *filePieceImpl) writeFileTo(w io.Writer, fileIndex int, extent segments extentRemaining -= n1 } var n1 int64 - n1, err = io.CopyN(w, f, extentRemaining) + if true { + n1, err = f.WriteTo(&limitWriter{ + rem: extentRemaining, + w: w, + }) + // limitWriter will block f from writing too much. + if n1 == extentRemaining { + err = nil + } + } else { + n1, err = io.CopyN(w, f, extentRemaining) + } packageExpvarMap.Add("bytesReadNotSkipped", n1) written += n1 return diff --git a/storage/file-torrent.go b/storage/file-torrent.go index 0c753ed2..54237a70 100644 --- a/storage/file-torrent.go +++ b/storage/file-torrent.go @@ -164,8 +164,8 @@ func (me *fileTorrentImpl) openFile(file file) (f fileReader, err error) { return } -func (fst fileTorrentImpl) openForWrite(file file) (_ fileWriter, err error) { +func (fst *fileTorrentImpl) openForWrite(file file) (_ fileWriter, err error) { // It might be possible to have a writable handle shared files cache if we need it. fst.logger().Debug("openForWrite", "file.safeOsPath", file.safeOsPath) - return fst.io.openForWrite(fst.pathForWrite(&file)) + return fst.io.openForWrite(fst.pathForWrite(&file), file.FileInfo.Length) } diff --git a/zero-reader.go b/zero-reader.go index 1d0a899b..92fc8059 100644 --- a/zero-reader.go +++ b/zero-reader.go @@ -1,5 +1,6 @@ package torrent +// TODO: This should implement extra methods to make io.CopyN more efficient. var zeroReader zeroReaderType type zeroReaderType struct{} -- 2.51.0