From 1a235d61c6d4808b14539556a2a2354e4974a47b Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 14 Aug 2025 12:13:49 +1000 Subject: [PATCH] Use msync flushing in mmap file io --- mmap-span/mmap-span.go | 7 ++--- piece.go | 6 ++-- storage/file-client.go | 1 - storage/file-io-classic.go | 22 ++++++++------ storage/file-io-mmap.go | 17 ++++++++++- storage/file-io.go | 1 + storage/file-misc.go | 4 +-- storage/file-piece.go | 20 ++++++++++++- storage/file-torrent.go | 11 ------- storage/interface.go | 5 +++- storage/mmap.go | 34 ++++++++++------------ storage/seek-data_windows.go | 10 ------- storage/{seek-data_unix.go => sys_unix.go} | 5 ++++ storage/sys_windows.go | 17 +++++++++++ storage/wrappers.go | 7 +++++ time_test.go | 1 + torrent.go | 7 ++++- 17 files changed, 110 insertions(+), 65 deletions(-) delete mode 100644 storage/seek-data_windows.go rename storage/{seek-data_unix.go => sys_unix.go} (69%) create mode 100644 storage/sys_windows.go create mode 100644 time_test.go diff --git a/mmap-span/mmap-span.go b/mmap-span/mmap-span.go index 91cc6b26..1617d864 100644 --- a/mmap-span/mmap-span.go +++ b/mmap-span/mmap-span.go @@ -31,14 +31,11 @@ func New(mMaps []Mmap, index segments.Index) *MMapSpan { } } -func (ms *MMapSpan) Flush() (errs []error) { +func (ms *MMapSpan) Flush() (err error) { ms.mu.RLock() defer ms.mu.RUnlock() for _, mMap := range ms.mMaps { - err := mMap.Flush() - if err != nil { - errs = append(errs, err) - } + err = errors.Join(err, mMap.Flush()) } return } diff --git a/piece.go b/piece.go index da548935..1a039750 100644 --- a/piece.go +++ b/piece.go @@ -86,10 +86,8 @@ func (p *Piece) Storage() storage.Piece { return p.t.storage.PieceWithHash(p.Info(), pieceHash) } -func (p *Piece) Flush() { - if p.t.storage.Flush != nil { - _ = p.t.storage.Flush() - } +func (p *Piece) Flush() error { + return p.Storage().Flush() } func (p *Piece) pendingChunkIndex(chunkIndex chunkIndexType) bool { diff --git a/storage/file-client.go b/storage/file-client.go 
index 3bcbd634..dde12315 100644 --- a/storage/file-client.go +++ b/storage/file-client.go @@ -141,6 +141,5 @@ func (fs *fileClientImpl) OpenTorrent( return TorrentImpl{ Piece: t.Piece, Close: t.Close, - Flush: t.Flush, }, nil } diff --git a/storage/file-io-classic.go b/storage/file-io-classic.go index 9a36ae9f..bb54e5a7 100644 --- a/storage/file-io-classic.go +++ b/storage/file-io-classic.go @@ -6,19 +6,15 @@ import ( type classicFileIo struct{} -func (i classicFileIo) openForSharedRead(name string) (sharedFileIf, error) { - return sharedFiles.Open(name) -} - -type classicFileReader struct { - *os.File +func (me classicFileIo) flush(name string, offset, nbytes int64) error { + return fsync(name) } -func (c classicFileReader) seekData(offset int64) (ret int64, err error) { - return seekData(c.File, offset) +func (me classicFileIo) openForSharedRead(name string) (sharedFileIf, error) { + return sharedFiles.Open(name) } -func (i classicFileIo) openForRead(name string) (fileReader, error) { +func (me classicFileIo) openForRead(name string) (fileReader, error) { f, err := os.Open(name) return classicFileReader{f}, err } @@ -26,3 +22,11 @@ func (i classicFileIo) openForRead(name string) (fileReader, error) { func (classicFileIo) openForWrite(p string, size int64) (f fileWriter, err error) { return openFileExtra(p, os.O_WRONLY) } + +type classicFileReader struct { + *os.File +} + +func (c classicFileReader) seekData(offset int64) (ret int64, err error) { + return seekData(c.File, offset) +} diff --git a/storage/file-io-mmap.go b/storage/file-io-mmap.go index af070cb3..ed181f3c 100644 --- a/storage/file-io-mmap.go +++ b/storage/file-io-mmap.go @@ -14,10 +14,25 @@ import ( ) type mmapFileIo struct { - mu sync.Mutex + mu sync.RWMutex paths map[string]*fileMmap } +func (me *mmapFileIo) flush(name string, offset, nbytes int64) error { + // Since we are only flushing writes that we created, and we don't currently unmap files after + // we've opened them, then if the mmap 
doesn't exist yet then there's nothing to flush. + me.mu.RLock() + defer me.mu.RUnlock() + v, ok := me.paths[name] + if !ok { + return nil + } + if !v.writable { + return nil + } + return msync(v.m, int(offset), int(nbytes)) +} + type fileMmap struct { m mmap.MMap writable bool diff --git a/storage/file-io.go b/storage/file-io.go index a8962e19..f7c4495e 100644 --- a/storage/file-io.go +++ b/storage/file-io.go @@ -21,4 +21,5 @@ type fileIo interface { openForSharedRead(name string) (sharedFileIf, error) openForRead(name string) (fileReader, error) openForWrite(name string, size int64) (fileWriter, error) + flush(name string, offset, nbytes int64) error } diff --git a/storage/file-misc.go b/storage/file-misc.go index 16a80e29..73396200 100644 --- a/storage/file-misc.go +++ b/storage/file-misc.go @@ -1,9 +1,7 @@ package storage import ( - "errors" "io" - "io/fs" "os" "path/filepath" "sync" @@ -31,7 +29,7 @@ func minFileLengthsForTorrentExtent( func fsync(filePath string) (err error) { f, err := os.OpenFile(filePath, os.O_WRONLY, filePerm) - if err != nil && !errors.Is(err, fs.ErrNotExist) { + if err != nil { return } defer f.Close() diff --git a/storage/file-piece.go b/storage/file-piece.go index 080b3b64..76ec477e 100644 --- a/storage/file-piece.go +++ b/storage/file-piece.go @@ -29,8 +29,22 @@ var _ interface { PieceImpl //PieceReaderer io.WriterTo + Flusher } = (*filePieceImpl)(nil) +func (me *filePieceImpl) Flush() (err error) { + for fileIndex, extent := range me.fileExtents() { + file := me.t.file(fileIndex) + name := me.t.pathForWrite(&file) + err1 := me.t.io.flush(name, extent.Start, extent.Length) + if err1 != nil { + err = errors.Join(err, fmt.Errorf("flushing %q: %w", name, err1)) + return + } + } + return nil +} + func (me *filePieceImpl) logger() *slog.Logger { return me.t.client.opts.Logger } @@ -46,9 +60,13 @@ func (me *filePieceImpl) extent() segments.Extent { } } +func (me *filePieceImpl) fileExtents() iter.Seq2[int, segments.Extent] { + return 
me.t.segmentLocater.LocateIter(me.extent()) +} + func (me *filePieceImpl) pieceFiles() iter.Seq[file] { return func(yield func(file) bool) { - for fileIndex := range me.t.segmentLocater.LocateIter(me.extent()) { + for fileIndex := range me.fileExtents() { f := me.t.file(fileIndex) if !yield(f) { return diff --git a/storage/file-torrent.go b/storage/file-torrent.go index 54237a70..fac6ab7f 100644 --- a/storage/file-torrent.go +++ b/storage/file-torrent.go @@ -115,17 +115,6 @@ func (fs *fileTorrentImpl) Close() error { return nil } -func (fts *fileTorrentImpl) Flush() error { - for i := range fts.files { - f := fts.file(i) - fts.logger().Debug("flushing", "file.safeOsPath", f.safeOsPath) - if err := fsync(fts.pathForWrite(&f)); err != nil { - return err - } - } - return nil -} - func (fts *fileTorrentImpl) file(index int) file { return file{ Info: fts.info, diff --git a/storage/interface.go b/storage/interface.go index 9aadfc8a..a5acc56f 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -32,7 +32,6 @@ type TorrentImpl struct { // Preferred over PieceWithHash. Called with the piece hash if it's available. PieceWithHash func(p metainfo.Piece, pieceHash g.Option[[]byte]) PieceImpl Close func() error - Flush func() error // Storages that share the same space, will provide equal pointers. The function is called once // to determine the storage for torrents sharing the same function pointer, and mutated in // place. @@ -64,6 +63,10 @@ type PieceImpl interface { Completion() Completion } +type Flusher interface { + Flush() error +} + // Completion state of a piece. 
type Completion struct { Err error diff --git a/storage/mmap.go b/storage/mmap.go index 95879276..6b8478d8 100644 --- a/storage/mmap.go +++ b/storage/mmap.go @@ -46,7 +46,7 @@ func (s *mmapClientImpl) OpenTorrent( span: span, pc: s.pc, } - return TorrentImpl{Piece: t.Piece, Close: t.Close, Flush: t.Flush}, err + return TorrentImpl{Piece: t.Piece, Close: t.Close}, err } func (s *mmapClientImpl) Close() error { @@ -61,9 +61,8 @@ type mmapTorrentStorage struct { func (ts *mmapTorrentStorage) Piece(p metainfo.Piece) PieceImpl { return mmapStoragePiece{ - pc: ts.pc, + t: ts, p: p, - ih: ts.infoHash, ReaderAt: io.NewSectionReader(ts.span, p.Offset(), p.Length()), WriterAt: missinggo.NewSectionWriter(ts.span, p.Offset(), p.Length()), } @@ -73,28 +72,27 @@ func (ts *mmapTorrentStorage) Close() error { return ts.span.Close() } -func (ts *mmapTorrentStorage) Flush() error { - errs := ts.span.Flush() - if len(errs) > 0 { - return errs[0] - } - return nil -} - type mmapStoragePiece struct { - pc PieceCompletionGetSetter - p metainfo.Piece - ih metainfo.Hash + t *mmapTorrentStorage + p metainfo.Piece io.ReaderAt io.WriterAt } +var _ Flusher = mmapStoragePiece{} + +func (me mmapStoragePiece) Flush() error { + // TODO: Flush just the regions of the files we care about. At least this is no worse than it + // was previously. 
+ return me.t.span.Flush() +} + func (me mmapStoragePiece) pieceKey() metainfo.PieceKey { - return metainfo.PieceKey{me.ih, me.p.Index()} + return metainfo.PieceKey{me.t.infoHash, me.p.Index()} } func (sp mmapStoragePiece) Completion() Completion { - c, err := sp.pc.Get(sp.pieceKey()) + c, err := sp.t.pc.Get(sp.pieceKey()) if err != nil { panic(err) } @@ -102,11 +100,11 @@ func (sp mmapStoragePiece) Completion() Completion { } func (sp mmapStoragePiece) MarkComplete() error { - return sp.pc.Set(sp.pieceKey(), true) + return sp.t.pc.Set(sp.pieceKey(), true) } func (sp mmapStoragePiece) MarkNotComplete() error { - return sp.pc.Set(sp.pieceKey(), false) + return sp.t.pc.Set(sp.pieceKey(), false) } func mMapTorrent(md *metainfo.Info, location string) (mms *mmapSpan.MMapSpan, err error) { diff --git a/storage/seek-data_windows.go b/storage/seek-data_windows.go deleted file mode 100644 index ccfe202f..00000000 --- a/storage/seek-data_windows.go +++ /dev/null @@ -1,10 +0,0 @@ -package storage - -import ( - "io" - "os" -) - -func seekData(f *os.File, offset int64) (ret int64, err error) { - return f.Seek(offset, io.SeekStart) -} diff --git a/storage/seek-data_unix.go b/storage/sys_unix.go similarity index 69% rename from storage/seek-data_unix.go rename to storage/sys_unix.go index 35e0dee0..625f420f 100644 --- a/storage/seek-data_unix.go +++ b/storage/sys_unix.go @@ -6,6 +6,7 @@ import ( "io" "os" + "github.com/edsrzf/mmap-go" "golang.org/x/sys/unix" ) @@ -17,3 +18,7 @@ func seekData(f *os.File, offset int64) (ret int64, err error) { } return } + +func msync(mm mmap.MMap, offset, nbytes int) error { + return unix.Msync(mm[offset:offset+nbytes], unix.MS_SYNC) +} diff --git a/storage/sys_windows.go b/storage/sys_windows.go new file mode 100644 index 00000000..10ce40f8 --- /dev/null +++ b/storage/sys_windows.go @@ -0,0 +1,17 @@ +package storage + +import ( + "io" + "os" + + "github.com/edsrzf/mmap-go" +) + +func seekData(f *os.File, offset int64) (ret int64, err error) { + 
return f.Seek(offset, io.SeekStart) +} + +func msync(mm mmap.MMap, offset, nbytes int) error { + // No ranged flush on Windows yet: offset and nbytes are ignored and the whole mapping is flushed. TODO: Use windows.FlushViewOfFile. + return mm.Flush() +} diff --git a/storage/wrappers.go b/storage/wrappers.go index 6d7ee64f..22e3812d 100644 --- a/storage/wrappers.go +++ b/storage/wrappers.go @@ -144,6 +144,13 @@ func (p Piece) NewReader() (PieceReader, error) { }, nil } +func (p Piece) Flush() error { + if fl, ok := p.PieceImpl.(Flusher); ok { + return fl.Flush() + } + return nil +} + type nopCloser struct{} func (nopCloser) Close() error { diff --git a/time_test.go b/time_test.go new file mode 100644 index 00000000..10cbafc7 --- /dev/null +++ b/time_test.go @@ -0,0 +1 @@ +package torrent diff --git a/torrent.go b/torrent.go index 6ba87218..e3516f20 100644 --- a/torrent.go +++ b/torrent.go @@ -2558,7 +2558,12 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { hasDirty := p.hasDirtyChunks() t.cl.unlock() if hasDirty { - p.Flush() // You can be synchronous here! + // This could return fs.ErrNotExist, and that would be unexpected since we haven't + // marked it complete yet, and nobody should have moved it. + err := p.Flush() // You can be synchronous here! + if err != nil { + t.slogger().Warn("error flushing piece storage", "piece", piece, "err", err) + } } p.race++ err := p.Storage().MarkComplete() -- 2.51.0