package storage
import (
+ "context"
"fmt"
"io"
+ "log/slog"
"os"
"path/filepath"
+ "sync"
+ "time"
+ "github.com/anacrolix/log"
"github.com/anacrolix/missinggo/v2"
- "github.com/anacrolix/torrent/common"
- "github.com/anacrolix/torrent/segments"
"github.com/anacrolix/torrent/metainfo"
+ "github.com/anacrolix/torrent/segments"
)
+const fdCacheAliveTime = 10
+
+type fdCacheEntry struct {
+ last int64
+ fd *os.File
+ sync.Mutex
+}
+
+var (
+ fdRCache = map[string]*fdCacheEntry{}
+ fdRCacheM sync.Mutex
+ fdWCache = map[string]*fdCacheEntry{}
+ fdWCacheM sync.Mutex
+ fdMkdirAllCache = map[string]struct{}{}
+ fdCacheCleanerM sync.Once
+)
+
+func fdCacheCleaner() {
+ cleaner := func(c map[string]*fdCacheEntry, m *sync.Mutex) {
+ now := time.Now().Unix()
+ m.Lock()
+ for k, v := range c {
+ if now-v.last > fdCacheAliveTime {
+ go func() {
+ v.Lock()
+ v.fd.Close()
+ v.Unlock()
+ }()
+ }
+ delete(c, k)
+ }
+ m.Unlock()
+ }
+ for range time.Tick(fdCacheAliveTime * time.Second) {
+ cleaner(fdRCache, &fdRCacheM)
+ cleaner(fdWCache, &fdWCacheM)
+ }
+}
+
// File-based storage for torrents, that isn't yet bound to a particular torrent.
type fileClientImpl struct {
opts NewFileClientOpts
if opts.FilePathMaker == nil {
opts.FilePathMaker = func(opts FilePathMakerOpts) string {
var parts []string
- if opts.Info.Name != metainfo.NoName {
- parts = append(parts, opts.Info.Name)
+ if opts.Info.BestName() != metainfo.NoName {
+ parts = append(parts, opts.Info.BestName())
}
- return filepath.Join(append(parts, opts.File.Path...)...)
+ return filepath.Join(append(parts, opts.File.BestPath()...)...)
}
}
if opts.PieceCompletion == nil {
opts.PieceCompletion = pieceCompletionForDir(opts.ClientBaseDir)
}
+ fdCacheCleanerM.Do(func() { go fdCacheCleaner() })
return fileClientImpl{opts}
}
return me.opts.PieceCompletion.Close()
}
-func (fs fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (_ TorrentImpl, err error) {
+func (fs fileClientImpl) OpenTorrent(
+ ctx context.Context,
+ info *metainfo.Info,
+ infoHash metainfo.Hash,
+) (_ TorrentImpl, err error) {
dir := fs.opts.TorrentDirMaker(fs.opts.ClientBaseDir, info, infoHash)
+ logger := log.ContextLogger(ctx).Slogger()
+ logger.DebugContext(ctx, "opened file torrent storage", slog.String("dir", dir))
upvertedFiles := info.UpvertedFiles()
files := make([]file, 0, len(upvertedFiles))
for i, fileInfo := range upvertedFiles {
length: fileInfo.Length,
}
if f.length == 0 {
- err = CreateNativeZeroLengthFile(f.path)
+ err = CreateNativeZeroLengthFile(PathShortener(f.path))
if err != nil {
err = fmt.Errorf("creating zero length file: %w", err)
return
}
t := &fileTorrentImpl{
files,
- segments.NewIndex(common.LengthIterFromUpvertedFiles(upvertedFiles)),
+ info.FileSegmentsIndex(),
infoHash,
fs.opts.PieceCompletion,
}
return TorrentImpl{
Piece: t.Piece,
Close: t.Close,
+ Flush: t.Flush,
}, nil
}
return nil
}
+func fsync(filePath string) (err error) {
+ _ = os.MkdirAll(filepath.Dir(filePath), 0o777)
+ var f *os.File
+ f, err = os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0o666)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ if err = f.Sync(); err != nil {
+ return err
+ }
+ return f.Close()
+}
+
+func (fts *fileTorrentImpl) Flush() error {
+ for _, f := range fts.files {
+ if err := fsync(f.path); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
// A helper to create zero-length files which won't appear for file-orientated storage since no
// writes will ever occur to them (no torrent data is associated with a zero-length file). The
// caller should make sure the file name provided is safe/sanitized.
func CreateNativeZeroLengthFile(name string) error {
- os.MkdirAll(filepath.Dir(name), 0777)
+ os.MkdirAll(filepath.Dir(name), 0o777)
var f io.Closer
f, err := os.Create(name)
if err != nil {
}
// Returns EOF on short or missing file.
-func (fst *fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) {
- f, err := os.Open(file.path)
- if os.IsNotExist(err) {
- // File missing is treated the same as a short file.
- err = io.EOF
- return
- }
- if err != nil {
- return
+func (fst fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) {
+ fdRCacheM.Lock()
+ pth := PathShortener(file.path)
+ centry := fdRCache[pth]
+ if centry == nil {
+ var fd *os.File
+ fd, err = os.Open(pth)
+ if os.IsNotExist(err) {
+ // File missing is treated the same as a short file.
+ err = io.EOF
+ }
+ if err != nil {
+ fdRCacheM.Unlock()
+ return
+ }
+ centry = &fdCacheEntry{fd: fd}
+ fdRCache[pth] = centry
}
- defer f.Close()
+ fdRCacheM.Unlock()
// Limit the read to within the expected bounds of this file.
if int64(len(b)) > file.length-off {
b = b[:file.length-off]
}
- for off < file.length && len(b) != 0 {
- n1, err1 := f.ReadAt(b, off)
- b = b[n1:]
- n += n1
- off += int64(n1)
- if n1 == 0 {
- err = err1
- break
- }
- }
+ centry.Lock()
+ centry.last = time.Now().Unix()
+ n, err = centry.fd.ReadAt(b, off)
+ centry.Unlock()
return
}
}
func (fst fileTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) {
- //log.Printf("write at %v: %v bytes", off, len(p))
fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool {
- name := fst.fts.files[i].path
- os.MkdirAll(filepath.Dir(name), 0777)
- var f *os.File
- f, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0666)
- if err != nil {
- return false
+ name := PathShortener(fst.fts.files[i].path)
+ _, ok := fdMkdirAllCache[filepath.Dir(name)]
+ if !ok {
+ os.MkdirAll(filepath.Dir(name), 0o777)
+ fdMkdirAllCache[filepath.Dir(name)] = struct{}{}
}
+ fdWCacheM.Lock()
+ centry := fdWCache[name]
+ if centry == nil {
+ var fd *os.File
+ fd, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0o666)
+ if err != nil {
+ fdWCacheM.Unlock()
+ return false
+ }
+ centry = &fdCacheEntry{fd: fd}
+ fdWCache[name] = centry
+ }
+ fdWCacheM.Unlock()
var n1 int
- n1, err = f.WriteAt(p[:e.Length], e.Start)
- //log.Printf("%v %v wrote %v: %v", i, e, n1, err)
- closeErr := f.Close()
+ centry.Lock()
+ centry.last = time.Now().Unix()
+ n1, err = centry.fd.WriteAt(p[:e.Length], e.Start)
+ centry.Unlock()
n += n1
p = p[n1:]
- if err == nil {
- err = closeErr
- }
if err == nil && int64(n1) != e.Length {
err = io.ErrShortWrite
}