From 33aca73bb941012760c36806f168924690393ff4 Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Mon, 28 Nov 2022 17:32:31 +0300 Subject: [PATCH] Per-file locks --- storage/file.go | 72 +++++++++++++++++++++++++++++-------------------- 1 file changed, 43 insertions(+), 29 deletions(-) diff --git a/storage/file.go b/storage/file.go index 674a506b..7a5bc33d 100644 --- a/storage/file.go +++ b/storage/file.go @@ -15,30 +15,35 @@ import ( "github.com/anacrolix/torrent/segments" ) +type fdCacheEntry struct { + fd *os.File + sync.Mutex +} + var ( - fdRCache = map[string]*os.File{} + fdRCache = map[string]*fdCacheEntry{} fdRCacheM sync.Mutex - fdWCache = map[string]*os.File{} + fdWCache = map[string]*fdCacheEntry{} fdWCacheM sync.Mutex fdMkdirAllCache = map[string]struct{}{} fdCacheCleanerM sync.Once ) func fdCacheCleaner() { - for range time.Tick(10 * time.Second) { - fdRCacheM.Lock() - for _, v := range fdRCache { - v.Close() - } - fdRCache = make(map[string]*os.File) - fdRCacheM.Unlock() - - fdWCacheM.Lock() - for _, v := range fdWCache { - v.Close() + cleaner := func(c *map[string]*fdCacheEntry, m *sync.Mutex) { + m.Lock() + prev := fdRCache + *c = make(map[string]*fdCacheEntry) + m.Unlock() + for _, v := range prev { + v.Lock() + v.fd.Close() + v.Unlock() } - fdWCache = make(map[string]*os.File) - fdWCacheM.Unlock() + } + for range time.Tick(10 * time.Second) { + cleaner(&fdRCache, &fdRCacheM) + cleaner(&fdWCache, &fdWCacheM) } } @@ -173,25 +178,30 @@ type fileTorrentImplIO struct { // Returns EOF on short or missing file. func (fst *fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) { fdRCacheM.Lock() - defer fdRCacheM.Unlock() - f := fdRCache[file.path] - if f == nil { - f, err = os.Open(file.path) + centry := fdRCache[file.path] + if centry == nil { + var fd *os.File + fd, err = os.Open(file.path) if os.IsNotExist(err) { // File missing is treated the same as a short file. err = io.EOF - return } if err != nil { + fdRCacheM.Unlock() return } - fdRCache[file.path] = f + centry = &fdCacheEntry{fd: fd} + fdRCache[file.path] = centry } + fdRCacheM.Unlock() // Limit the read to within the expected bounds of this file. if int64(len(b)) > file.length-off { b = b[:file.length-off] } - return f.ReadAt(b, off) + centry.Lock() + n, err = centry.fd.ReadAt(b, off) + centry.Unlock() + return } // Only returns EOF at the end of the torrent. Premature EOF is ErrUnexpectedEOF. @@ -218,18 +228,22 @@ func (fst fileTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) { fdMkdirAllCache[filepath.Dir(name)] = struct{}{} } fdWCacheM.Lock() - defer fdWCacheM.Unlock() - f := fdWCache[name] - if f == nil { - f, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0o666) + centry := fdWCache[name] + if centry == nil { + var fd *os.File + fd, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0o666) if err != nil { + fdWCacheM.Unlock() return false } - fdWCache[name] = f + centry = &fdCacheEntry{fd: fd} + fdWCache[name] = centry } + fdWCacheM.Unlock() var n1 int - n1, err = f.WriteAt(p[:e.Length], e.Start) - // log.Printf("%v %v wrote %v: %v", i, e, n1, err) + centry.Lock() + n1, err = centry.fd.WriteAt(p[:e.Length], e.Start) + centry.Unlock() n += n1 p = p[n1:] if err == nil && int64(n1) != e.Length { -- 2.44.0