storage: close shared file handles before renaming piece files

Adds rename(from, to string) error to the IO interface declared in
storage/file-io.go, and makes filePieceImpl.exclRenameIfExists call
me.t.io.rename instead of os.Rename directly:

* classicFileIo.rename calls sharedFiles.CloseAll on both paths, then
  os.Rename. CloseAll is a new method on sharedFilesInterface; the
  regularFsSharedFiles implementation does nothing and returns nil.
* mmapFileIo.rename takes the mmapFileIo mutex, forcibly closes any cached
  fileMmap for either path (panicking if that close fails) and removes it
  from the paths map, then calls os.Rename.

To make the forced close possible, fileMmap gains an RWMutex, and
fileMmap.close takes its write lock. With lockHandleOperations == false
(the value set here), newMmapFile takes a read lock for each
mmapSharedFileHandle and releases it on the handle's first Close. With
lockHandleOperations == true, writeToN and seekDataOrEof take the read lock
around each operation instead. mmapSharedFileHandle.close is now a
sync.OnceValue closure rather than a sync.Once. The reuse path for a cached
writable mmap no longer calls v.inc() before newMmapFile, which increments
the ref count itself. github.com/anacrolix/sync is bumped to a
pseudo-version and replaces the standard library "sync" import in
file-io-mmap.go and file-misc.go.

NOTE(review):
* With lockHandleOperations == true, writeToN returns early when
  me.pos >= int64(len(b)) without calling mu.RUnlock. A panic from
  panicif.Nil(b) or w.Write also skips the unlock. Releasing the lock with
  defer right after mu.RLock() would cover every path. seekDataOrEof has
  the same exposure if seekData panics.
* With lockHandleOperations == false, every open handle holds
  fileMmap.mu for reading. mmapFileIo.rename holds the mmapFileIo mutex
  while fileMmap.close waits for the write lock. Rename therefore blocks
  until all handles on that path are closed, and other users of the
  mmapFileIo mutex block with it. This is the "potential deadlock zone"
  already called out in fileMmap.close.
* classicFileIo.rename discards the CloseAll errors when os.Rename
  succeeds; presumably intentional so callers only see rename failures —
  confirm.
* In file-io-mmap.go, "github.com/anacrolix/sync" sits in the standard
  library import group. goimports convention would move it into the group
  with the other github.com imports, as file-misc.go does.

go.mod | 2 +- go.sum | 4 ++-- storage/file-handle-cache.go | 9 ++++++++- storage/file-io-classic.go | 11 +++++++++++ storage/file-io-mmap.go | 81 +++++++++++++++++++++++++++++++++++++++++++++++------ storage/file-io.go | 1 + storage/file-misc.go | 2 +- storage/file-piece.go | 2 +- diff --git a/go.mod b/go.mod index 48275a3b6580eaf49d3637f9038619ed78af0c75..f77855c390d158532edda633e20d9063a4c44455 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ github.com/anacrolix/missinggo/v2 v2.10.0 github.com/anacrolix/multiless v0.4.0 github.com/anacrolix/possum/go v0.4.1-0.20250821022006-9d91a37b5d3d github.com/anacrolix/squirrel v0.6.4 - github.com/anacrolix/sync v0.5.4 + github.com/anacrolix/sync v0.5.5-0.20251119100342-d78dd1f686f1 github.com/anacrolix/tagflag v1.3.0 github.com/anacrolix/upnp v0.1.4 github.com/anacrolix/utp v0.1.0 diff --git a/go.sum b/go.sum index e7f1ec0bbd18d63773ec2e73f02e216aac7e7ee9..e676a0b7df69e885777e7eea9ca027d300040ef9 100644 --- a/go.sum +++ b/go.sum @@ -123,8 +123,8 @@ github.com/anacrolix/stm v0.5.0 h1:9df1KBpttF0TzLgDq51Z+TEabZKMythqgx89f1FQJt8= github.com/anacrolix/stm v0.5.0/go.mod h1:MOwrSy+jCm8Y7HYfMAwPj7qWVu7XoVvjOiYwJmpeB/M= github.com/anacrolix/sync v0.0.0-20180808010631-44578de4e778/go.mod h1:s735Etp3joe/voe2sdaXLcqDdJSay1O0OPnM0ystjqk= github.com/anacrolix/sync v0.3.0/go.mod h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DCOj/I0g= -github.com/anacrolix/sync v0.5.4 h1:yXZLIjXh/G+Rh2mYGCAPmszmF/fvEPadDy7/pPChpKM= -github.com/anacrolix/sync v0.5.4/go.mod h1:21cUWerw9eiu/3T3kyoChu37AVO+YFue1/H15qqubS0= +github.com/anacrolix/sync v0.5.5-0.20251119100342-d78dd1f686f1 h1:oLCfNgEOR3/Z98mSwmwTM1pcqCDb/1zIjxCNn7dzVaE= +github.com/anacrolix/sync v0.5.5-0.20251119100342-d78dd1f686f1/go.mod h1:21cUWerw9eiu/3T3kyoChu37AVO+YFue1/H15qqubS0= github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= github.com/anacrolix/tagflag v1.0.0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= 
github.com/anacrolix/tagflag v1.1.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CMUMIxqHIG8= diff --git a/storage/file-handle-cache.go b/storage/file-handle-cache.go index dbc3354bd2a91f81b097107f28b8131a2ffb90e2..6295f87dc0f97496b1145478cf7ef0a1db32186d 100644 --- a/storage/file-handle-cache.go +++ b/storage/file-handle-cache.go @@ -21,7 +21,13 @@ ) type regularFsSharedFiles struct{} -func (r regularFsSharedFiles) Open(name string) (sharedFileIf, error) { +func (me regularFsSharedFiles) CloseAll(name string) error { + // We don't track regular file handles, we will assume the client is synchronizing things + // correctly so that an open handle won't be around. + return nil +} + +func (me regularFsSharedFiles) Open(name string) (sharedFileIf, error) { return os.Open(name) } @@ -32,6 +38,7 @@ } type sharedFilesInterface interface { Open(name string) (sharedFileIf, error) + CloseAll(name string) error } func init() { diff --git a/storage/file-io-classic.go b/storage/file-io-classic.go index 56c5e9e19b5f352aa6d44c1a2872add2c5a0ffdf..7da9889c6614e1fd4f32830736af311363f9c0f5 100644 --- a/storage/file-io-classic.go +++ b/storage/file-io-classic.go @@ -1,11 +1,22 @@ package storage import ( + "errors" "io" "os" ) type classicFileIo struct{} + +func (me classicFileIo) rename(from, to string) error { + a := sharedFiles.CloseAll(from) + b := sharedFiles.CloseAll(to) + c := os.Rename(from, to) + if c != nil { + return errors.Join(a, b, c) + } + return nil +} func (me classicFileIo) flush(name string, offset, nbytes int64) error { return fsync(name) diff --git a/storage/file-io-mmap.go b/storage/file-io-mmap.go index af4642c12020c38aec8b197c9f2caa5e3565d91f..8c9c27aeda3082003c1b8b08009d960ac3df7fac 100644 --- a/storage/file-io-mmap.go +++ b/storage/file-io-mmap.go @@ -5,16 +5,20 @@ import ( "errors" "fmt" + "github.com/anacrolix/sync" "io" "io/fs" "os" - "sync" "sync/atomic" g "github.com/anacrolix/generics" "github.com/anacrolix/missinggo/v2/panicif" "github.com/edsrzf/mmap-go" 
) + +// Lock uses of shared handles, instead of having a lifetime RLock. Because sync.RWMutex is not safe +// for recursive RLocks, you can't have both. +const lockHandleOperations = false func init() { s, ok := os.LookupEnv("TORRENT_STORAGE_DEFAULT_FILE_IO") @@ -39,10 +43,31 @@ } } type mmapFileIo struct { - mu sync.RWMutex + mu sync.RWMutex + // We could automatically expire fileMmaps by using weak.Pointers? Currently the store never + // relinquishes its extra ref so we never clean up anyway. paths map[string]*fileMmap } +func (me *mmapFileIo) rename(from, to string) (err error) { + me.mu.Lock() + defer me.mu.Unlock() + me.close(from) + me.close(to) + return os.Rename(from, to) +} + +func (me *mmapFileIo) close(name string) { + v, ok := me.paths[name] + if ok { + // We're forcibly closing the handle. Leave the store's ref intact so we're the only one + // that closes it, then delete it anyway. We must be holding the IO context lock to be doing + // this if we're not using operation locks. + panicif.Err(v.close()) + g.MustDelete(me.paths, name) + } +} + func (me *mmapFileIo) flush(name string, offset, nbytes int64) error { // Since we are only flushing writes that we created, and we don't currently unmap files after // we've opened them, then if the mmap doesn't exist yet then there's nothing to flush. @@ -59,7 +84,10 @@ // Darwin doesn't have sync for file-offsets?! return msync(v.m, int(offset), int(nbytes)) } +// Shared file access. type fileMmap struct { + // Read lock held for each handle. Write lock taken for destructive action like close. + mu sync.RWMutex m mmap.MMap f *os.File refs atomic.Int32 @@ -74,6 +102,14 @@ return nil } func (me *fileMmap) close() (err error) { + // I can't see any way to avoid this. We need to forcibly alter the actual state of the handle + // underneath other consumers to kick them off. Additionally, we need to exclude users of its raw + // file descriptor. 
This is a potential deadlock zone if handles have lifetimes that escape the + // file storage implementation (like with NewReader, which don't provide for it). + me.mu.Lock() + defer me.mu.Unlock() + // There's no double-close protection here. Not sure if that's an issue. Probably not since we + // don't evict the store's reference anywhere for now. return errors.Join(me.m.Unmap(), me.f.Close()) } @@ -122,9 +158,9 @@ defer me.mu.Unlock() v, ok := me.paths[name] if ok { if int64(len(v.m)) == size && v.writable { - v.inc() return newMmapFile(v), nil } else { + // Drop the cache ref. We aren't presuming to require it to be closed here, hmm... v.dec() g.MustDelete(me.paths, name) } @@ -160,8 +196,18 @@ return newMmapFile(me.addNewMmap(name, mm, true, f)), nil } func newMmapFile(f *fileMmap) *mmapSharedFileHandle { + if !lockHandleOperations { + // This can't fail because we have to be holding the IO context lock to be here. + panicif.False(f.mu.TryRLock()) + } ret := &mmapSharedFileHandle{ f: f, + close: sync.OnceValue[error](func() error { + if !lockHandleOperations { + f.mu.RUnlock() + } + return f.dec() + }), } ret.f.inc() return ret @@ -184,7 +230,7 @@ var _ fileIo = (*mmapFileIo)(nil) type mmapSharedFileHandle struct { f *fileMmap - close sync.Once + close func() error } func (me *mmapSharedFileHandle) WriteAt(p []byte, off int64) (n int, err error) { @@ -207,11 +253,8 @@ } return } -func (me *mmapSharedFileHandle) Close() (err error) { - me.close.Do(func() { - err = me.f.dec() - }) - return +func (me *mmapSharedFileHandle) Close() error { + return me.close() } type mmapFileHandle struct { @@ -231,13 +274,22 @@ return } func (me *mmapFileHandle) writeToN(w io.Writer, n int64) (written int64, err error) { + mu := &me.shared.f.mu + // If this panics we need a close error. + if lockHandleOperations { + mu.RLock() + } b := me.shared.f.m + panicif.Nil(b) // It's been closed and we need to signal that. 
if me.pos >= int64(len(b)) { return } b = b[me.pos:] b = b[:min(int64(len(b)), n)] i, err := w.Write(b) + if lockHandleOperations { + mu.RUnlock() + } written = int64(i) me.pos += written return @@ -264,7 +316,18 @@ func (me *mmapFileHandle) seekDataOrEof(offset int64) (ret int64, err error) { // This should be fine as it's an atomic operation, on a shared file handle, so nobody will be // relying non-atomic operations on the file. TODO: Does this require msync first so we don't // skip our own writes. + + // We do need to protect the file descriptor as that's not synchronized outside os.File. If + // it's already closed before we call this, that's fine, we'll get EBADF. Don't recursively + // RLock here if we're RLocking at the reference level. + mu := &me.shared.f.mu + if lockHandleOperations { + mu.RLock() + } ret, err = seekData(me.shared.f.f, offset) + if lockHandleOperations { + mu.RUnlock() + } if err == nil { me.pos = ret } else if err == io.EOF { diff --git a/storage/file-io.go b/storage/file-io.go index 437ccc9570e5c3ff4fefa3de2c990a6c2e1fa948..5cd33a29e97d3a751e9f2dd6610dea60026a382c 100644 --- a/storage/file-io.go +++ b/storage/file-io.go @@ -21,4 +21,5 @@ openForSharedRead(name string) (sharedFileIf, error) openForRead(name string) (fileReader, error) openForWrite(name string, size int64) (fileWriter, error) flush(name string, offset, nbytes int64) error + rename(from, to string) error } diff --git a/storage/file-misc.go b/storage/file-misc.go index 73396200bd0c8d5567d89ea30493be940a35e463..9d6ff33f0baa693b4ae5a0b092c7bc5f6b934839 100644 --- a/storage/file-misc.go +++ b/storage/file-misc.go @@ -4,8 +4,8 @@ import ( "io" "os" "path/filepath" - "sync" + "github.com/anacrolix/sync" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/segments" ) diff --git a/storage/file-piece.go b/storage/file-piece.go index 57801f0516a446fe020275b2e550a08dd3894e7c..19e4498d29c2a70428d3c123642865d3d6aaba2e 100644 --- a/storage/file-piece.go +++ 
b/storage/file-piece.go @@ -266,7 +266,7 @@ } // Rename from if exists, and if so, to must not exist. func (me *filePieceImpl) exclRenameIfExists(from, to string) (renamed bool, err error) { - err = os.Rename(from, to) + err = me.t.io.rename(from, to) if err != nil { if errors.Is(err, fs.ErrNotExist) { err = nil