From b4e1f9f06821f0b79e5587018d2761681ee9e0f1 Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Mon, 28 Nov 2022 16:57:00 +0300 Subject: [PATCH] Linear IO and cached FDs --- cmd/btrtrc/README | 3 ++ storage/file.go | 89 +++++++++++++++++++++++++++++++---------------- 2 files changed, 62 insertions(+), 30 deletions(-) diff --git a/cmd/btrtrc/README b/cmd/btrtrc/README index 89c20ad8..d485409d 100644 --- a/cmd/btrtrc/README +++ b/cmd/btrtrc/README @@ -6,6 +6,9 @@ cmd/torrent it has much less configuration options, mainly hardcoding the most of them. Also it lacks various features like ability to select the file to delete. But what advantages does it have? +* Optimized file-based storage: + * linearized I/O operations prevent the creation of a huge number of threads + * cached file descriptors save a lot of syscalls * Ability to specify both IPv4 and IPv6 addresses to announce * Ability to specify DHT bootstrap nodes * Dynamic addition and removing of the torrents diff --git a/storage/file.go b/storage/file.go index b8739647..674a506b 100644 --- a/storage/file.go +++ b/storage/file.go @@ -5,6 +5,8 @@ import ( "io" "os" "path/filepath" + "sync" + "time" "github.com/anacrolix/missinggo/v2" @@ -13,6 +15,33 @@ import ( "github.com/anacrolix/torrent/segments" ) +var ( + fdRCache = map[string]*os.File{} + fdRCacheM sync.Mutex + fdWCache = map[string]*os.File{} + fdWCacheM sync.Mutex + fdMkdirAllCache = map[string]struct{}{} + fdCacheCleanerM sync.Once +) + +func fdCacheCleaner() { + for range time.Tick(10 * time.Second) { + fdRCacheM.Lock() + for _, v := range fdRCache { + v.Close() + } + fdRCache = make(map[string]*os.File) + fdRCacheM.Unlock() + + fdWCacheM.Lock() + for _, v := range fdWCache { + v.Close() + } + fdWCache = make(map[string]*os.File) + fdWCacheM.Unlock() + } +} + // File-based storage for torrents, that isn't yet bound to a particular torrent. 
type fileClientImpl struct { opts NewFileClientOpts @@ -48,6 +77,7 @@ func NewFileOpts(opts NewFileClientOpts) ClientImplCloser { if opts.PieceCompletion == nil { opts.PieceCompletion = pieceCompletionForDir(opts.ClientBaseDir) } + fdCacheCleanerM.Do(func() { go fdCacheCleaner() }) return fileClientImpl{opts} } @@ -142,31 +172,26 @@ type fileTorrentImplIO struct { // Returns EOF on short or missing file. func (fst *fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) { - f, err := os.Open(file.path) - if os.IsNotExist(err) { - // File missing is treated the same as a short file. - err = io.EOF - return - } - if err != nil { - return + fdRCacheM.Lock() + defer fdRCacheM.Unlock() + f := fdRCache[file.path] + if f == nil { + f, err = os.Open(file.path) + if os.IsNotExist(err) { + // File missing is treated the same as a short file. + err = io.EOF + return + } + if err != nil { + return + } + fdRCache[file.path] = f } - defer f.Close() // Limit the read to within the expected bounds of this file. if int64(len(b)) > file.length-off { b = b[:file.length-off] } - for off < file.length && len(b) != 0 { - n1, err1 := f.ReadAt(b, off) - b = b[n1:] - n += n1 - off += int64(n1) - if n1 == 0 { - err = err1 - break - } - } - return + return f.ReadAt(b, off) } // Only returns EOF at the end of the torrent. Premature EOF is ErrUnexpectedEOF. 
@@ -185,24 +210,28 @@ func (fst fileTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) { } func (fst fileTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) { - // log.Printf("write at %v: %v bytes", off, len(p)) fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool { name := fst.fts.files[i].path - os.MkdirAll(filepath.Dir(name), 0o777) - var f *os.File - f, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0o666) - if err != nil { - return false + _, ok := fdMkdirAllCache[filepath.Dir(name)] + if !ok { + os.MkdirAll(filepath.Dir(name), 0o777) + fdMkdirAllCache[filepath.Dir(name)] = struct{}{} + } + fdWCacheM.Lock() + defer fdWCacheM.Unlock() + f := fdWCache[name] + if f == nil { + f, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0o666) + if err != nil { + return false + } + fdWCache[name] = f } var n1 int n1, err = f.WriteAt(p[:e.Length], e.Start) // log.Printf("%v %v wrote %v: %v", i, e, n1, err) - closeErr := f.Close() n += n1 p = p[n1:] - if err == nil { - err = closeErr - } if err == nil && int64(n1) != e.Length { err = io.ErrShortWrite } -- 2.48.1