]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Use msync flushing in mmap file io
authorMatt Joiner <anacrolix@gmail.com>
Thu, 14 Aug 2025 02:13:49 +0000 (12:13 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 14 Aug 2025 02:13:49 +0000 (12:13 +1000)
17 files changed:
mmap-span/mmap-span.go
piece.go
storage/file-client.go
storage/file-io-classic.go
storage/file-io-mmap.go
storage/file-io.go
storage/file-misc.go
storage/file-piece.go
storage/file-torrent.go
storage/interface.go
storage/mmap.go
storage/seek-data_windows.go [deleted file]
storage/sys_unix.go [moved from storage/seek-data_unix.go with 69% similarity]
storage/sys_windows.go [new file with mode: 0644]
storage/wrappers.go
time_test.go [new file with mode: 0644]
torrent.go

index 91cc6b269bc79322b62de5b83632b1ff8a2bbfae..1617d86477318af850cd35eb6a7126d5ba80d15b 100644 (file)
@@ -31,14 +31,11 @@ func New(mMaps []Mmap, index segments.Index) *MMapSpan {
        }
 }
 
-func (ms *MMapSpan) Flush() (errs []error) {
+func (ms *MMapSpan) Flush() (err error) {
        ms.mu.RLock()
        defer ms.mu.RUnlock()
        for _, mMap := range ms.mMaps {
-               err := mMap.Flush()
-               if err != nil {
-                       errs = append(errs, err)
-               }
+               err = errors.Join(err, mMap.Flush())
        }
        return
 }
index da548935ea79f726e6c46dd663b4b8cf7a9dd29e..1a039750f5095bb276966856686e6f2546c610de 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -86,10 +86,8 @@ func (p *Piece) Storage() storage.Piece {
        return p.t.storage.PieceWithHash(p.Info(), pieceHash)
 }
 
-func (p *Piece) Flush() {
-       if p.t.storage.Flush != nil {
-               _ = p.t.storage.Flush()
-       }
+func (p *Piece) Flush() error {
+       return p.Storage().Flush()
 }
 
 func (p *Piece) pendingChunkIndex(chunkIndex chunkIndexType) bool {
index 3bcbd634d02a888def6275a735ed2d5a533df3b8..dde123150ea4cb9796487e0de9a59ea362882e9e 100644 (file)
@@ -141,6 +141,5 @@ func (fs *fileClientImpl) OpenTorrent(
        return TorrentImpl{
                Piece: t.Piece,
                Close: t.Close,
-               Flush: t.Flush,
        }, nil
 }
index 9a36ae9f3316f34f7b5ff49961d7541407d94060..bb54e5a773a01c1d00a7e9d5eb8402278a933ce4 100644 (file)
@@ -6,19 +6,15 @@ import (
 
 type classicFileIo struct{}
 
-func (i classicFileIo) openForSharedRead(name string) (sharedFileIf, error) {
-       return sharedFiles.Open(name)
-}
-
-type classicFileReader struct {
-       *os.File
+func (me classicFileIo) flush(name string, offset, nbytes int64) error {
+       return fsync(name)
 }
 
-func (c classicFileReader) seekData(offset int64) (ret int64, err error) {
-       return seekData(c.File, offset)
+func (me classicFileIo) openForSharedRead(name string) (sharedFileIf, error) {
+       return sharedFiles.Open(name)
 }
 
-func (i classicFileIo) openForRead(name string) (fileReader, error) {
+func (me classicFileIo) openForRead(name string) (fileReader, error) {
        f, err := os.Open(name)
        return classicFileReader{f}, err
 }
@@ -26,3 +22,11 @@ func (i classicFileIo) openForRead(name string) (fileReader, error) {
 func (classicFileIo) openForWrite(p string, size int64) (f fileWriter, err error) {
        return openFileExtra(p, os.O_WRONLY)
 }
+
+type classicFileReader struct {
+       *os.File
+}
+
+func (c classicFileReader) seekData(offset int64) (ret int64, err error) {
+       return seekData(c.File, offset)
+}
index af070cb3c9c4c7f184b3e0c432af81f007f68e5f..ed181f3c489128321d90d5f25f88065e031e5a7c 100644 (file)
@@ -14,10 +14,25 @@ import (
 )
 
 type mmapFileIo struct {
-       mu    sync.Mutex
+       mu    sync.RWMutex
        paths map[string]*fileMmap
 }
 
+func (me *mmapFileIo) flush(name string, offset, nbytes int64) error {
+       // Since we are only flushing writes that we created, and we don't currently unmap files after
+       // we've opened them, then if the mmap doesn't exist yet then there's nothing to flush.
+       me.mu.RLock()
+       defer me.mu.RUnlock()
+       v, ok := me.paths[name]
+       if !ok {
+               return nil
+       }
+       if !v.writable {
+               return nil
+       }
+       return msync(v.m, int(offset), int(nbytes))
+}
+
 type fileMmap struct {
        m        mmap.MMap
        writable bool
index a8962e19786b3f9990e3c139e1e5e5e7fcbefc12..f7c4495eb6dda458f8d663a775e1bbec79ac574b 100644 (file)
@@ -21,4 +21,5 @@ type fileIo interface {
        openForSharedRead(name string) (sharedFileIf, error)
        openForRead(name string) (fileReader, error)
        openForWrite(name string, size int64) (fileWriter, error)
+       flush(name string, offset, nbytes int64) error
 }
index 16a80e29c2ab00861a8f693fe836b8d2c4980b8e..73396200bd0c8d5567d89ea30493be940a35e463 100644 (file)
@@ -1,9 +1,7 @@
 package storage
 
 import (
-       "errors"
        "io"
-       "io/fs"
        "os"
        "path/filepath"
        "sync"
@@ -31,7 +29,7 @@ func minFileLengthsForTorrentExtent(
 
 func fsync(filePath string) (err error) {
        f, err := os.OpenFile(filePath, os.O_WRONLY, filePerm)
-       if err != nil && !errors.Is(err, fs.ErrNotExist) {
+       if err != nil {
                return
        }
        defer f.Close()
index 080b3b643adbb5cdb4456f2f4ae05dc867792bc6..76ec477e25c30c3c9241dffbb2c166894fb89bf1 100644 (file)
@@ -29,8 +29,22 @@ var _ interface {
        PieceImpl
        //PieceReaderer
        io.WriterTo
+       Flusher
 } = (*filePieceImpl)(nil)
 
+func (me *filePieceImpl) Flush() (err error) {
+       for fileIndex, extent := range me.fileExtents() {
+               file := me.t.file(fileIndex)
+               name := me.t.pathForWrite(&file)
+               err1 := me.t.io.flush(name, extent.Start, extent.Length)
+               if err1 != nil {
+                       err = errors.Join(err, fmt.Errorf("flushing %q: %w", name, err1))
+                       return
+               }
+       }
+       return nil
+}
+
 func (me *filePieceImpl) logger() *slog.Logger {
        return me.t.client.opts.Logger
 }
@@ -46,9 +60,13 @@ func (me *filePieceImpl) extent() segments.Extent {
        }
 }
 
+func (me *filePieceImpl) fileExtents() iter.Seq2[int, segments.Extent] {
+       return me.t.segmentLocater.LocateIter(me.extent())
+}
+
 func (me *filePieceImpl) pieceFiles() iter.Seq[file] {
        return func(yield func(file) bool) {
-               for fileIndex := range me.t.segmentLocater.LocateIter(me.extent()) {
+               for fileIndex := range me.fileExtents() {
                        f := me.t.file(fileIndex)
                        if !yield(f) {
                                return
index 54237a70adc97a2c853426a84907b52fe84e9423..fac6ab7f62dc1ed406031fd58511a4b8860a8011 100644 (file)
@@ -115,17 +115,6 @@ func (fs *fileTorrentImpl) Close() error {
        return nil
 }
 
-func (fts *fileTorrentImpl) Flush() error {
-       for i := range fts.files {
-               f := fts.file(i)
-               fts.logger().Debug("flushing", "file.safeOsPath", f.safeOsPath)
-               if err := fsync(fts.pathForWrite(&f)); err != nil {
-                       return err
-               }
-       }
-       return nil
-}
-
 func (fts *fileTorrentImpl) file(index int) file {
        return file{
                Info:      fts.info,
index 9aadfc8a4988fc3e2c6d66111f71ca28a5207297..a5acc56f62ebf92e786b2075c72178da50ed8b55 100644 (file)
@@ -32,7 +32,6 @@ type TorrentImpl struct {
        // Preferred over PieceWithHash. Called with the piece hash if it's available.
        PieceWithHash func(p metainfo.Piece, pieceHash g.Option[[]byte]) PieceImpl
        Close         func() error
-       Flush         func() error
        // Storages that share the same space, will provide equal pointers. The function is called once
        // to determine the storage for torrents sharing the same function pointer, and mutated in
        // place.
@@ -64,6 +63,10 @@ type PieceImpl interface {
        Completion() Completion
 }
 
+type Flusher interface {
+       Flush() error
+}
+
 // Completion state of a piece.
 type Completion struct {
        Err error
index 9587927608739af7fee6d7533a680ded145357be..6b8478d8460fdc5e15928ddf94c6e8da6327a2c2 100644 (file)
@@ -46,7 +46,7 @@ func (s *mmapClientImpl) OpenTorrent(
                span:     span,
                pc:       s.pc,
        }
-       return TorrentImpl{Piece: t.Piece, Close: t.Close, Flush: t.Flush}, err
+       return TorrentImpl{Piece: t.Piece, Close: t.Close}, err
 }
 
 func (s *mmapClientImpl) Close() error {
@@ -61,9 +61,8 @@ type mmapTorrentStorage struct {
 
 func (ts *mmapTorrentStorage) Piece(p metainfo.Piece) PieceImpl {
        return mmapStoragePiece{
-               pc:       ts.pc,
+               t:        ts,
                p:        p,
-               ih:       ts.infoHash,
                ReaderAt: io.NewSectionReader(ts.span, p.Offset(), p.Length()),
                WriterAt: missinggo.NewSectionWriter(ts.span, p.Offset(), p.Length()),
        }
@@ -73,28 +72,27 @@ func (ts *mmapTorrentStorage) Close() error {
        return ts.span.Close()
 }
 
-func (ts *mmapTorrentStorage) Flush() error {
-       errs := ts.span.Flush()
-       if len(errs) > 0 {
-               return errs[0]
-       }
-       return nil
-}
-
 type mmapStoragePiece struct {
-       pc PieceCompletionGetSetter
-       p  metainfo.Piece
-       ih metainfo.Hash
+       t *mmapTorrentStorage
+       p metainfo.Piece
        io.ReaderAt
        io.WriterAt
 }
 
+var _ Flusher = mmapStoragePiece{}
+
+func (me mmapStoragePiece) Flush() error {
+       // TODO: Flush just the regions of the files we care about. At least this is no worse than it
+       // was previously.
+       return me.t.span.Flush()
+}
+
 func (me mmapStoragePiece) pieceKey() metainfo.PieceKey {
-       return metainfo.PieceKey{me.ih, me.p.Index()}
+       return metainfo.PieceKey{me.t.infoHash, me.p.Index()}
 }
 
 func (sp mmapStoragePiece) Completion() Completion {
-       c, err := sp.pc.Get(sp.pieceKey())
+       c, err := sp.t.pc.Get(sp.pieceKey())
        if err != nil {
                panic(err)
        }
@@ -102,11 +100,11 @@ func (sp mmapStoragePiece) Completion() Completion {
 }
 
 func (sp mmapStoragePiece) MarkComplete() error {
-       return sp.pc.Set(sp.pieceKey(), true)
+       return sp.t.pc.Set(sp.pieceKey(), true)
 }
 
 func (sp mmapStoragePiece) MarkNotComplete() error {
-       return sp.pc.Set(sp.pieceKey(), false)
+       return sp.t.pc.Set(sp.pieceKey(), false)
 }
 
 func mMapTorrent(md *metainfo.Info, location string) (mms *mmapSpan.MMapSpan, err error) {
diff --git a/storage/seek-data_windows.go b/storage/seek-data_windows.go
deleted file mode 100644 (file)
index ccfe202..0000000
+++ /dev/null
@@ -1,10 +0,0 @@
-package storage
-
-import (
-       "io"
-       "os"
-)
-
-func seekData(f *os.File, offset int64) (ret int64, err error) {
-       return f.Seek(offset, io.SeekStart)
-}
similarity index 69%
rename from storage/seek-data_unix.go
rename to storage/sys_unix.go
index 35e0dee023658e847955b11627caefbb096816ec..625f420f10ee3e8314bb7ec273c1b87386d87191 100644 (file)
@@ -6,6 +6,7 @@ import (
        "io"
        "os"
 
+       "github.com/edsrzf/mmap-go"
        "golang.org/x/sys/unix"
 )
 
@@ -17,3 +18,7 @@ func seekData(f *os.File, offset int64) (ret int64, err error) {
        }
        return
 }
+
+func msync(mm mmap.MMap, offset, nbytes int) error {
+       return unix.Msync(mm[offset:offset+nbytes], unix.MS_SYNC)
+}
diff --git a/storage/sys_windows.go b/storage/sys_windows.go
new file mode 100644 (file)
index 0000000..10ce40f
--- /dev/null
@@ -0,0 +1,17 @@
+package storage
+
+import (
+       "io"
+       "os"
+
+       "github.com/edsrzf/mmap-go"
+)
+
+func seekData(f *os.File, offset int64) (ret int64, err error) {
+       return f.Seek(offset, io.SeekStart)
+}
+
+func msync(mm mmap.MMap, offset, nbytes int) error {
+       // Fuck you Windows you suck. TODO: Use windows.FlushViewOfFile.
+       return mm.Flush()
+}
index 6d7ee64f3bfb7b10c73e556c3f4342367fb1dcea..22e3812d9790626d020b756153248aae6cc209dc 100644 (file)
@@ -144,6 +144,13 @@ func (p Piece) NewReader() (PieceReader, error) {
        }, nil
 }
 
+func (p Piece) Flush() error {
+       if fl, ok := p.PieceImpl.(Flusher); ok {
+               return fl.Flush()
+       }
+       return nil
+}
+
 type nopCloser struct{}
 
 func (nopCloser) Close() error {
diff --git a/time_test.go b/time_test.go
new file mode 100644 (file)
index 0000000..10cbafc
--- /dev/null
@@ -0,0 +1 @@
+package torrent
index 6ba872183fe020978338d18323fea94285cfbd76..e3516f20b3d3fc937e8fbaa5baaeb9226598a16a 100644 (file)
@@ -2558,7 +2558,12 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
                hasDirty := p.hasDirtyChunks()
                t.cl.unlock()
                if hasDirty {
-                       p.Flush() // You can be synchronous here!
+                       // This could return fs.ErrNotExist, and that would be unexpected since we haven't
+                       // marked it complete yet, and nobody should have moved it.
+                       err := p.Flush() // You can be synchronous here!
+                       if err != nil {
+                               t.slogger().Warn("error flushing piece storage", "piece", piece, "err", err)
+                       }
                }
                p.race++
                err := p.Storage().MarkComplete()