From: Matt Joiner Date: Mon, 26 May 2025 06:51:32 +0000 (+1000) Subject: Add file handle caching X-Git-Tag: v1.59.0~117 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=68cb87fd05f5fa62065b618200f36b7406f5b1af;p=btrtrc.git Add file handle caching --- diff --git a/storage/file-handle-cache.go b/storage/file-handle-cache.go index 82be0547..97c15065 100644 --- a/storage/file-handle-cache.go +++ b/storage/file-handle-cache.go @@ -1 +1,108 @@ package storage + +import ( + "cmp" + "expvar" + "fmt" + "io" + "maps" + "net/http" + "os" + "slices" + "sync" + "sync/atomic" +) + +var ( + sharedFiles = sharedFilesType{ + m: make(map[string]*sharedFile), + } +) + +func init() { + http.HandleFunc("/debug/shared-files", func(w http.ResponseWriter, r *http.Request) { + sharedFiles.mu.Lock() + defer sharedFiles.mu.Unlock() + byRefs := slices.SortedFunc(maps.Keys(sharedFiles.m), func(a, b string) int { + return cmp.Or( + sharedFiles.m[b].refs-sharedFiles.m[a].refs, + cmp.Compare(a, b)) + }) + for _, key := range byRefs { + sf := sharedFiles.m[key] + fmt.Fprintf(w, "%v: refs=%v, name=%v\n", key, sf.refs, sf.f.Name()) + } + }) +} + +type sharedFilesType struct { + mu sync.Mutex + m map[string]*sharedFile +} + +// How many opens wouldn't have been needed with singleflight. +var sharedFilesWastedOpens = expvar.NewInt("sharedFilesWastedOpens") + +func (me *sharedFilesType) Open(name string) (ret *sharedFileRef, err error) { + me.mu.Lock() + sf, ok := me.m[name] + if !ok { + me.mu.Unlock() + // Can singleflight here... + var f *os.File + f, err = os.Open(name) + if err != nil { + return + } + me.mu.Lock() + sf, ok = me.m[name] + if ok { + sharedFilesWastedOpens.Add(1) + f.Close() + } else { + sf = &sharedFile{pool: me, f: f} + me.m[name] = sf + } + } + ret = sf.newRef() + me.mu.Unlock() + return +} + +type sharedFile struct { + pool *sharedFilesType + f *os.File + // Could do this with weakrefs... Wonder if it works well with OS resources like that. + refs int +} + +func (me *sharedFile) newRef() *sharedFileRef { + me.refs++ + return &sharedFileRef{ + sf: me, + inherit: me.f, + } +} + +type inherit interface { + io.ReaderAt +} + +type sharedFileRef struct { + // Only methods that are safe for concurrent use. + inherit + sf *sharedFile + closed atomic.Bool +} + +func (me *sharedFileRef) Close() (err error) { + if !me.closed.CompareAndSwap(false, true) { + return + } + me.inherit = nil + me.sf.pool.mu.Lock() + me.sf.refs-- + me.sf.pool.mu.Unlock() + me.sf = nil + return +} diff --git a/storage/file-piece.go b/storage/file-piece.go index 24e28f7f..d3e7b5ca 100644 --- a/storage/file-piece.go +++ b/storage/file-piece.go @@ -21,7 +21,10 @@ type filePieceImpl struct { io.ReaderAt } -var _ PieceImpl = (*filePieceImpl)(nil) +var _ interface { + PieceImpl + //PieceReaderer +} = (*filePieceImpl)(nil) func (me *filePieceImpl) logger() *slog.Logger { return me.t.client.opts.Logger @@ -66,6 +69,7 @@ func (me *filePieceImpl) Completion() Completion { file := me.t.files[i] s, err := os.Stat(file.partFilePath()) if errors.Is(err, fs.ErrNotExist) { + // Can we use shared files for this? Is it faster? s, err = os.Stat(file.safeOsPath) } if err != nil { @@ -151,10 +155,8 @@ func (me *filePieceImpl) MarkNotComplete() (err error) { func (me *filePieceImpl) promotePartFile(f file) (err error) { if me.partFiles() { - err = os.Rename(f.partFilePath(), f.safeOsPath) - // If we get ENOENT, the file may already be in the final location. - if err != nil && !errors.Is(err, fs.ErrNotExist) { - err = fmt.Errorf("renaming part file: %w", err) + err = me.exclRenameIfExists(f.partFilePath(), f.safeOsPath) + if err != nil { return } } @@ -184,7 +186,11 @@ func (me *filePieceImpl) exclRenameIfExists(from, to string) (err error) { return fmt.Errorf("error creating destination file: %w", err) } f.Close() - return os.Rename(from, to) + err = os.Rename(from, to) + if err == nil { + fmt.Printf("renamed %v -> %v\n", from, to) + } + return } func (me *filePieceImpl) onFileNotComplete(f file) (err error) { @@ -216,3 +222,9 @@ func (me *filePieceImpl) pathForWrite(f file) string { func (me *filePieceImpl) partFiles() bool { return me.t.partFiles() } + +// +//// TODO: Just implement StorageReader already. +//func (me *filePieceImpl) NewReader() (PieceReader, error) { +// +//} diff --git a/storage/file-torrent-io.go b/storage/file-torrent-io.go index ef6e949a..96e1acd0 100644 --- a/storage/file-torrent-io.go +++ b/storage/file-torrent-io.go @@ -17,12 +17,18 @@ type fileTorrentImplIO struct { // Returns EOF on short or missing file. func (fst fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) { - var f *os.File + fst.fts.logger().Debug("readFileAt", "file.safeOsPath", file.safeOsPath) + var f interface { + io.ReaderAt + io.Closer + } + // Fine to open once under each name on a unix system. We could make the shared file keys more + // constrained but it shouldn't matter. TODO: Ensure at most one of the names exist. if fst.fts.partFiles() { - f, err = os.Open(file.partFilePath()) + f, err = sharedFiles.Open(file.partFilePath()) } if err == nil && f == nil || errors.Is(err, fs.ErrNotExist) { - f, err = os.Open(file.safeOsPath) + f, err = sharedFiles.Open(file.safeOsPath) } if errors.Is(err, fs.ErrNotExist) { // File missing is treated the same as a short file. Should we propagate this through the @@ -67,6 +73,8 @@ func (fst fileTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) { } func (fst fileTorrentImplIO) openForWrite(file file) (f *os.File, err error) { + // It might be possible to have a writable handle shared files cache if we need it. + fst.fts.logger().Debug("openForWrite", "file.safeOsPath", file.safeOsPath) p := fst.fts.pathForWrite(file) f, err = os.OpenFile(p, os.O_WRONLY|os.O_CREATE, filePerm) if err == nil { diff --git a/storage/file-torrent.go b/storage/file-torrent.go index b08c2dca..09caa255 100644 --- a/storage/file-torrent.go +++ b/storage/file-torrent.go @@ -107,6 +107,7 @@ func (fs *fileTorrentImpl) Close() error { func (fts *fileTorrentImpl) Flush() error { for _, f := range fts.files { + fts.logger().Debug("flushing", "file.safeOsPath", f.safeOsPath) if err := fsync(fts.pathForWrite(f)); err != nil { return err }