]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add file handle caching
authorMatt Joiner <anacrolix@gmail.com>
Mon, 26 May 2025 06:51:32 +0000 (16:51 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 26 May 2025 06:51:32 +0000 (16:51 +1000)
storage/file-handle-cache.go
storage/file-piece.go
storage/file-torrent-io.go
storage/file-torrent.go

index 82be0547ed04672ecc607b84eae202ce663dfdfb..97c1506524c91009a74eb405702d5981a84fae25 100644 (file)
@@ -1 +1,108 @@
 package storage
+
+import (
+       "cmp"
+       "expvar"
+       "fmt"
+       "io"
+       "maps"
+       "net/http"
+       "os"
+       "slices"
+       "sync"
+       "sync/atomic"
+)
+
+var (
+       sharedFiles = sharedFilesType{
+               m: make(map[string]*sharedFile),
+       }
+)
+
+func init() {
+       http.HandleFunc("/debug/shared-files", func(w http.ResponseWriter, r *http.Request) {
+               sharedFiles.mu.Lock()
+               defer sharedFiles.mu.Unlock()
+               byRefs := slices.SortedFunc(maps.Keys(sharedFiles.m), func(a, b string) int {
+                       return cmp.Or(
+                               sharedFiles.m[b].refs-sharedFiles.m[a].refs,
+                               cmp.Compare(a, b))
+               })
+               for _, key := range byRefs {
+                       sf := sharedFiles.m[key]
+                       fmt.Fprintf(w, "%v: refs=%v, name=%v\n", key, sf.refs, sf.f.Name())
+               }
+       })
+}
+
+type sharedFilesType struct {
+       mu sync.Mutex
+       m  map[string]*sharedFile
+}
+
+// How many opens wouldn't have been needed with singleflight.
+var sharedFilesWastedOpens = expvar.NewInt("sharedFilesWastedOpens")
+
+func (me *sharedFilesType) Open(name string) (ret *sharedFileRef, err error) {
+       me.mu.Lock()
+       sf, ok := me.m[name]
+       if !ok {
+               me.mu.Unlock()
+               // Can singleflight here...
+               var f *os.File
+               f, err = os.Open(name)
+               if err != nil {
+                       return
+               }
+               me.mu.Lock()
+               sf, ok = me.m[name]
+               if ok {
+                       sharedFilesWastedOpens.Add(1)
+                       f.Close()
+               } else {
+                       sf = &sharedFile{pool: me, f: f}
+                       me.m[name] = sf
+               }
+       }
+       ret = sf.newRef()
+       me.mu.Unlock()
+       return
+}
+
+type sharedFile struct {
+       pool *sharedFilesType
+       f    *os.File
+       // Could do this with weakrefs... Wonder if it works well with OS resources like that.
+       refs int
+}
+
+func (me *sharedFile) newRef() *sharedFileRef {
+       me.refs++
+       return &sharedFileRef{
+               sf:      me,
+               inherit: me.f,
+       }
+}
+
+type inherit interface {
+       io.ReaderAt
+}
+
+type sharedFileRef struct {
+       // Only methods that are safe for concurrent use.
+       inherit
+       sf     *sharedFile
+       closed atomic.Bool
+}
+
+func (me *sharedFileRef) Close() (err error) {
+       if !me.closed.CompareAndSwap(false, true) {
+               return
+       }
+       me.inherit = nil
+       me.sf.pool.mu.Lock()
+       me.sf.refs--
+       me.sf.pool.mu.Unlock()
+       me.sf = nil
+       return
+}
index 24e28f7f082a18afea9547c81d1b9901ec3aa783..d3e7b5ca672722b2f3c30d7dbac58fa685ecc6be 100644 (file)
@@ -21,7 +21,10 @@ type filePieceImpl struct {
        io.ReaderAt
 }
 
-var _ PieceImpl = (*filePieceImpl)(nil)
+var _ interface {
+       PieceImpl
+       //PieceReaderer
+} = (*filePieceImpl)(nil)
 
 func (me *filePieceImpl) logger() *slog.Logger {
        return me.t.client.opts.Logger
@@ -66,6 +69,7 @@ func (me *filePieceImpl) Completion() Completion {
                        file := me.t.files[i]
                        s, err := os.Stat(file.partFilePath())
                        if errors.Is(err, fs.ErrNotExist) {
+                               // Can we use shared files for this? Is it faster?
                                s, err = os.Stat(file.safeOsPath)
                        }
                        if err != nil {
@@ -151,10 +155,8 @@ func (me *filePieceImpl) MarkNotComplete() (err error) {
 
 func (me *filePieceImpl) promotePartFile(f file) (err error) {
        if me.partFiles() {
-               err = os.Rename(f.partFilePath(), f.safeOsPath)
-               // If we get ENOENT, the file may already be in the final location.
-               if err != nil && !errors.Is(err, fs.ErrNotExist) {
-                       err = fmt.Errorf("renaming part file: %w", err)
+               err = me.exclRenameIfExists(f.partFilePath(), f.safeOsPath)
+               if err != nil {
                        return
                }
        }
@@ -184,7 +186,11 @@ func (me *filePieceImpl) exclRenameIfExists(from, to string) (err error) {
                return fmt.Errorf("error creating destination file: %w", err)
        }
        f.Close()
-       return os.Rename(from, to)
+       err = os.Rename(from, to)
+       if err == nil {
+               fmt.Printf("renamed %v -> %v\n", from, to)
+       }
+       return
 }
 
 func (me *filePieceImpl) onFileNotComplete(f file) (err error) {
@@ -216,3 +222,9 @@ func (me *filePieceImpl) pathForWrite(f file) string {
 func (me *filePieceImpl) partFiles() bool {
        return me.t.partFiles()
 }
+
+//
+//// TODO: Just implement StorageReader already.
+//func (me *filePieceImpl) NewReader() (PieceReader, error) {
+//
+//}
index ef6e949af38c32c3b8c55e5dbc5096dce333350e..96e1acd09e445d6957f2e29fed7c9c7230872561 100644 (file)
@@ -17,12 +17,18 @@ type fileTorrentImplIO struct {
 
 // Returns EOF on short or missing file.
 func (fst fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) {
-       var f *os.File
+       fst.fts.logger().Debug("readFileAt", "file.safeOsPath", file.safeOsPath)
+       var f interface {
+               io.ReaderAt
+               io.Closer
+       }
+       // Fine to open once under each name on a unix system. We could make the shared file keys more
+       // constrained but it shouldn't matter. TODO: Ensure at most one of the names exist.
        if fst.fts.partFiles() {
-               f, err = os.Open(file.partFilePath())
+               f, err = sharedFiles.Open(file.partFilePath())
        }
        if err == nil && f == nil || errors.Is(err, fs.ErrNotExist) {
-               f, err = os.Open(file.safeOsPath)
+               f, err = sharedFiles.Open(file.safeOsPath)
        }
        if errors.Is(err, fs.ErrNotExist) {
                // File missing is treated the same as a short file. Should we propagate this through the
@@ -67,6 +73,8 @@ func (fst fileTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) {
 }
 
 func (fst fileTorrentImplIO) openForWrite(file file) (f *os.File, err error) {
+       // It might be possible to have a writable handle shared files cache if we need it.
+       fst.fts.logger().Debug("openForWrite", "file.safeOsPath", file.safeOsPath)
        p := fst.fts.pathForWrite(file)
        f, err = os.OpenFile(p, os.O_WRONLY|os.O_CREATE, filePerm)
        if err == nil {
index b08c2dca975bab8e598e0a5bba47917392dbef6f..09caa255323e4447962f31233e1a7c39886b9610 100644 (file)
@@ -107,6 +107,7 @@ func (fs *fileTorrentImpl) Close() error {
 
 func (fts *fileTorrentImpl) Flush() error {
        for _, f := range fts.files {
+               fts.logger().Debug("flushing", "file.safeOsPath", f.safeOsPath)
                if err := fsync(fts.pathForWrite(f)); err != nil {
                        return err
                }