package storage
+
+import (
+ "cmp"
+ "expvar"
+ "fmt"
+ "io"
+ "maps"
+ "net/http"
+ "os"
+ "slices"
+ "sync"
+ "sync/atomic"
+)
+
+var (
+ sharedFiles = sharedFilesType{
+ m: make(map[string]*sharedFile),
+ }
+)
+
+func init() {
+ http.HandleFunc("/debug/shared-files", func(w http.ResponseWriter, r *http.Request) {
+ sharedFiles.mu.Lock()
+ defer sharedFiles.mu.Unlock()
+ byRefs := slices.SortedFunc(maps.Keys(sharedFiles.m), func(a, b string) int {
+ return cmp.Or(
+ sharedFiles.m[b].refs-sharedFiles.m[a].refs,
+ cmp.Compare(a, b))
+ })
+ for _, key := range byRefs {
+ sf := sharedFiles.m[key]
+ fmt.Fprintf(w, "%v: refs=%v, name=%v\n", key, sf.refs, sf.f.Name())
+ }
+ })
+}
+
+type sharedFilesType struct {
+ mu sync.Mutex
+ m map[string]*sharedFile
+}
+
+// How many opens wouldn't have been needed with singleflight.
+var sharedFilesWastedOpens = expvar.NewInt("sharedFilesWastedOpens")
+
+func (me *sharedFilesType) Open(name string) (ret *sharedFileRef, err error) {
+ me.mu.Lock()
+ sf, ok := me.m[name]
+ if !ok {
+ me.mu.Unlock()
+ // Can singleflight here...
+ var f *os.File
+ f, err = os.Open(name)
+ if err != nil {
+ return
+ }
+ me.mu.Lock()
+ sf, ok = me.m[name]
+ if ok {
+ sharedFilesWastedOpens.Add(1)
+ f.Close()
+ } else {
+ sf = &sharedFile{pool: me, f: f}
+ me.m[name] = sf
+ }
+ }
+ ret = sf.newRef()
+ me.mu.Unlock()
+ return
+}
+
+type sharedFile struct {
+ pool *sharedFilesType
+ f *os.File
+ // Could do this with weakrefs... Wonder if it works well with OS resources like that.
+ refs int
+}
+
+func (me *sharedFile) newRef() *sharedFileRef {
+ me.refs++
+ return &sharedFileRef{
+ sf: me,
+ inherit: me.f,
+ }
+}
+
+type inherit interface {
+ io.ReaderAt
+}
+
+type sharedFileRef struct {
+ // Only methods that are safe for concurrent use.
+ inherit
+ sf *sharedFile
+ closed atomic.Bool
+}
+
+func (me *sharedFileRef) Close() (err error) {
+ if !me.closed.CompareAndSwap(false, true) {
+ return
+ }
+ me.inherit = nil
+ me.sf.pool.mu.Lock()
+ me.sf.refs--
+ me.sf.pool.mu.Unlock()
+ me.sf = nil
+ return
+}
io.ReaderAt
}
-var _ PieceImpl = (*filePieceImpl)(nil)
+var _ interface {
+ PieceImpl
+ //PieceReaderer
+} = (*filePieceImpl)(nil)
func (me *filePieceImpl) logger() *slog.Logger {
return me.t.client.opts.Logger
file := me.t.files[i]
s, err := os.Stat(file.partFilePath())
if errors.Is(err, fs.ErrNotExist) {
+ // Can we use shared files for this? Is it faster?
s, err = os.Stat(file.safeOsPath)
}
if err != nil {
func (me *filePieceImpl) promotePartFile(f file) (err error) {
if me.partFiles() {
- err = os.Rename(f.partFilePath(), f.safeOsPath)
- // If we get ENOENT, the file may already be in the final location.
- if err != nil && !errors.Is(err, fs.ErrNotExist) {
- err = fmt.Errorf("renaming part file: %w", err)
+ err = me.exclRenameIfExists(f.partFilePath(), f.safeOsPath)
+ if err != nil {
return
}
}
return fmt.Errorf("error creating destination file: %w", err)
}
f.Close()
- return os.Rename(from, to)
+ err = os.Rename(from, to)
+ if err == nil {
+ fmt.Printf("renamed %v -> %v\n", from, to)
+ }
+ return
}
func (me *filePieceImpl) onFileNotComplete(f file) (err error) {
func (me *filePieceImpl) partFiles() bool {
return me.t.partFiles()
}
+
+//
+//// TODO: Just implement StorageReader already.
+//func (me *filePieceImpl) NewReader() (PieceReader, error) {
+//
+//}
// Returns EOF on short or missing file.
func (fst fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) {
- var f *os.File
+ fst.fts.logger().Debug("readFileAt", "file.safeOsPath", file.safeOsPath)
+ var f interface {
+ io.ReaderAt
+ io.Closer
+ }
+ // Fine to open once under each name on a unix system. We could make the shared file keys more
+ // constrained but it shouldn't matter. TODO: Ensure at most one of the names exist.
if fst.fts.partFiles() {
- f, err = os.Open(file.partFilePath())
+ f, err = sharedFiles.Open(file.partFilePath())
}
if err == nil && f == nil || errors.Is(err, fs.ErrNotExist) {
- f, err = os.Open(file.safeOsPath)
+ f, err = sharedFiles.Open(file.safeOsPath)
}
if errors.Is(err, fs.ErrNotExist) {
// File missing is treated the same as a short file. Should we propagate this through the
}
func (fst fileTorrentImplIO) openForWrite(file file) (f *os.File, err error) {
+ // It might be possible to have a writable handle shared files cache if we need it.
+ fst.fts.logger().Debug("openForWrite", "file.safeOsPath", file.safeOsPath)
p := fst.fts.pathForWrite(file)
f, err = os.OpenFile(p, os.O_WRONLY|os.O_CREATE, filePerm)
if err == nil {
func (fts *fileTorrentImpl) Flush() error {
for _, f := range fts.files {
+ fts.logger().Debug("flushing", "file.safeOsPath", f.safeOsPath)
if err := fsync(fts.pathForWrite(f)); err != nil {
return err
}