package storage
import (
+ "fmt"
"io"
"os"
"path/filepath"
+ "sync"
+ "time"
- "github.com/anacrolix/missinggo"
+ "github.com/anacrolix/missinggo/v2"
+ "github.com/anacrolix/torrent/common"
"github.com/anacrolix/torrent/metainfo"
+ "github.com/anacrolix/torrent/segments"
)
-type fileStorage struct {
- baseDir string
- completed map[[20]byte]bool
+const fdCacheAliveTime = 10
+
+type fdCacheEntry struct {
+ last int64
+ fd *os.File
+ sync.Mutex
}
-func NewFile(baseDir string) I {
- return &fileStorage{
- baseDir: baseDir,
+var (
+ fdRCache = map[string]*fdCacheEntry{}
+ fdRCacheM sync.Mutex
+ fdWCache = map[string]*fdCacheEntry{}
+ fdWCacheM sync.Mutex
+ fdMkdirAllCache = map[string]struct{}{}
+ fdCacheCleanerM sync.Once
+)
+
+func fdCacheCleaner() {
+ cleaner := func(c map[string]*fdCacheEntry, m *sync.Mutex) {
+ now := time.Now().Unix()
+ m.Lock()
+ for k, v := range c {
+ if now-v.last > fdCacheAliveTime {
+ go func() {
+ v.Lock()
+ v.fd.Close()
+ v.Unlock()
+ }()
+ }
+ delete(c, k)
+ }
+ m.Unlock()
+ }
+ for range time.Tick(fdCacheAliveTime * time.Second) {
+ cleaner(fdRCache, &fdRCacheM)
+ cleaner(fdWCache, &fdWCacheM)
}
}
-func (me *fileStorage) OpenTorrent(info *metainfo.InfoEx) (Torrent, error) {
- return fileTorrentStorage{me}, nil
+// File-based storage for torrents, that isn't yet bound to a particular torrent.
+type fileClientImpl struct {
+ opts NewFileClientOpts
+}
+
+// All Torrent data stored in this baseDir. The info names of each torrent are used as directories.
+func NewFile(baseDir string) ClientImplCloser {
+ return NewFileWithCompletion(baseDir, pieceCompletionForDir(baseDir))
}
-type fileTorrentStorage struct {
- *fileStorage
+type NewFileClientOpts struct {
+ // The base directory for all downloads.
+ ClientBaseDir string
+ FilePathMaker FilePathMaker
+ TorrentDirMaker TorrentDirFilePathMaker
+ PieceCompletion PieceCompletion
}
-func (me *fileStorage) Piece(p metainfo.Piece) Piece {
- _io := &fileStorageTorrent{
- p.Info,
- me.baseDir,
+// NewFileOpts creates a new ClientImplCloser that stores files using the OS native filesystem.
+func NewFileOpts(opts NewFileClientOpts) ClientImplCloser {
+ if opts.TorrentDirMaker == nil {
+ opts.TorrentDirMaker = defaultPathMaker
}
- return &fileStoragePiece{
- me,
- p,
- missinggo.NewSectionWriter(_io, p.Offset(), p.Length()),
- io.NewSectionReader(_io, p.Offset(), p.Length()),
+ if opts.FilePathMaker == nil {
+ opts.FilePathMaker = func(opts FilePathMakerOpts) string {
+ var parts []string
+ if opts.Info.Name != metainfo.NoName {
+ parts = append(parts, opts.Info.Name)
+ }
+ return filepath.Join(append(parts, opts.File.Path...)...)
+ }
+ }
+ if opts.PieceCompletion == nil {
+ opts.PieceCompletion = pieceCompletionForDir(opts.ClientBaseDir)
}
+ fdCacheCleanerM.Do(func() { go fdCacheCleaner() })
+ return fileClientImpl{opts}
}
-func (me *fileStorage) Close() error {
- return nil
+func (me fileClientImpl) Close() error {
+ return me.opts.PieceCompletion.Close()
+}
+
+func (fs fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (_ TorrentImpl, err error) {
+ dir := fs.opts.TorrentDirMaker(fs.opts.ClientBaseDir, info, infoHash)
+ upvertedFiles := info.UpvertedFiles()
+ files := make([]file, 0, len(upvertedFiles))
+ for i, fileInfo := range upvertedFiles {
+ filePath := filepath.Join(dir, fs.opts.FilePathMaker(FilePathMakerOpts{
+ Info: info,
+ File: &fileInfo,
+ }))
+ if !isSubFilepath(dir, filePath) {
+ err = fmt.Errorf("file %v: path %q is not sub path of %q", i, filePath, dir)
+ return
+ }
+ f := file{
+ path: filePath,
+ length: fileInfo.Length,
+ }
+ if f.length == 0 {
+ err = CreateNativeZeroLengthFile(PathShortener(f.path))
+ if err != nil {
+ err = fmt.Errorf("creating zero length file: %w", err)
+ return
+ }
+ }
+ files = append(files, f)
+ }
+ t := &fileTorrentImpl{
+ files,
+ segments.NewIndex(common.LengthIterFromUpvertedFiles(upvertedFiles)),
+ infoHash,
+ fs.opts.PieceCompletion,
+ }
+ return TorrentImpl{
+ Piece: t.Piece,
+ Close: t.Close,
+ }, nil
}
-type fileStoragePiece struct {
- *fileStorage
- p metainfo.Piece
- io.WriterAt
- io.ReaderAt
+type file struct {
+ // The safe, OS-local file path.
+ path string
+ length int64
}
-func (me *fileStoragePiece) GetIsComplete() bool {
- return me.completed[me.p.Hash()]
+type fileTorrentImpl struct {
+ files []file
+ segmentLocater segments.Index
+ infoHash metainfo.Hash
+ completion PieceCompletion
}
-func (me *fileStoragePiece) MarkComplete() error {
- if me.completed == nil {
- me.completed = make(map[[20]byte]bool)
+func (fts *fileTorrentImpl) Piece(p metainfo.Piece) PieceImpl {
+ // Create a view onto the file-based torrent storage.
+ _io := fileTorrentImplIO{fts}
+ // Return the appropriate segments of this.
+ return &filePieceImpl{
+ fts,
+ p,
+ missinggo.NewSectionWriter(_io, p.Offset(), p.Length()),
+ io.NewSectionReader(_io, p.Offset(), p.Length()),
}
- me.completed[me.p.Hash()] = true
+}
+
+func (fs *fileTorrentImpl) Close() error {
return nil
}
-type fileStorageTorrent struct {
- info *metainfo.InfoEx
- baseDir string
+// A helper to create zero-length files which won't appear for file-orientated storage since no
+// writes will ever occur to them (no torrent data is associated with a zero-length file). The
+// caller should make sure the file name provided is safe/sanitized.
+func CreateNativeZeroLengthFile(name string) error {
+ os.MkdirAll(filepath.Dir(name), 0o777)
+ var f io.Closer
+ f, err := os.Create(name)
+ if err != nil {
+ return err
+ }
+ return f.Close()
+}
+
+// Exposes file-based storage of a torrent, as one big ReadWriterAt.
+type fileTorrentImplIO struct {
+ fts *fileTorrentImpl
}
// Returns EOF on short or missing file.
-func (me *fileStorageTorrent) readFileAt(fi metainfo.FileInfo, b []byte, off int64) (n int, err error) {
- f, err := os.Open(me.fileInfoName(fi))
- if os.IsNotExist(err) {
- // File missing is treated the same as a short file.
- err = io.EOF
- return
- }
- if err != nil {
- return
+func (fst *fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) {
+ fdRCacheM.Lock()
+ pth := PathShortener(file.path)
+ centry := fdRCache[pth]
+ if centry == nil {
+ var fd *os.File
+ fd, err = os.Open(pth)
+ if os.IsNotExist(err) {
+ // File missing is treated the same as a short file.
+ err = io.EOF
+ }
+ if err != nil {
+ fdRCacheM.Unlock()
+ return
+ }
+ centry = &fdCacheEntry{fd: fd}
+ fdRCache[pth] = centry
}
- defer f.Close()
+ fdRCacheM.Unlock()
// Limit the read to within the expected bounds of this file.
- if int64(len(b)) > fi.Length-off {
- b = b[:fi.Length-off]
- }
- for off < fi.Length && len(b) != 0 {
- n1, err1 := f.ReadAt(b, off)
- b = b[n1:]
- n += n1
- off += int64(n1)
- if n1 == 0 {
- err = err1
- break
- }
+ if int64(len(b)) > file.length-off {
+ b = b[:file.length-off]
}
+ centry.Lock()
+ centry.last = time.Now().Unix()
+ n, err = centry.fd.ReadAt(b, off)
+ centry.Unlock()
return
}
// Only returns EOF at the end of the torrent. Premature EOF is ErrUnexpectedEOF.
-func (me *fileStorageTorrent) ReadAt(b []byte, off int64) (n int, err error) {
- for _, fi := range me.info.UpvertedFiles() {
- for off < fi.Length {
- n1, err1 := me.readFileAt(fi, b, off)
- n += n1
- off += int64(n1)
- b = b[n1:]
- if len(b) == 0 {
- // Got what we need.
- return
- }
- if n1 != 0 {
- // Made progress.
- continue
- }
- err = err1
- if err == io.EOF {
- // Lies.
- err = io.ErrUnexpectedEOF
- }
- return
- }
- off -= fi.Length
+func (fst fileTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) {
+ fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(b))}, func(i int, e segments.Extent) bool {
+ n1, err1 := fst.readFileAt(fst.fts.files[i], b[:e.Length], e.Start)
+ n += n1
+ b = b[n1:]
+ err = err1
+ return err == nil // && int64(n1) == e.Length
+ })
+ if len(b) != 0 && err == nil {
+ err = io.EOF
}
- err = io.EOF
return
}
-func (me *fileStorageTorrent) WriteAt(p []byte, off int64) (n int, err error) {
- for _, fi := range me.info.UpvertedFiles() {
- if off >= fi.Length {
- off -= fi.Length
- continue
- }
- n1 := len(p)
- if int64(n1) > fi.Length-off {
- n1 = int(fi.Length - off)
- }
- name := me.fileInfoName(fi)
- os.MkdirAll(filepath.Dir(name), 0770)
- var f *os.File
- f, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0660)
- if err != nil {
- return
+func (fst fileTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) {
+ fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool {
+ name := PathShortener(fst.fts.files[i].path)
+ _, ok := fdMkdirAllCache[filepath.Dir(name)]
+ if !ok {
+ os.MkdirAll(filepath.Dir(name), 0o777)
+ fdMkdirAllCache[filepath.Dir(name)] = struct{}{}
}
- n1, err = f.WriteAt(p[:n1], off)
- f.Close()
- if err != nil {
- return
+ fdWCacheM.Lock()
+ centry := fdWCache[name]
+ if centry == nil {
+ var fd *os.File
+ fd, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0o666)
+ if err != nil {
+ fdWCacheM.Unlock()
+ return false
+ }
+ centry = &fdCacheEntry{fd: fd}
+ fdWCache[name] = centry
}
+ fdWCacheM.Unlock()
+ var n1 int
+ centry.Lock()
+ centry.last = time.Now().Unix()
+ n1, err = centry.fd.WriteAt(p[:e.Length], e.Start)
+ centry.Unlock()
n += n1
- off = 0
p = p[n1:]
- if len(p) == 0 {
- break
+ if err == nil && int64(n1) != e.Length {
+ err = io.ErrShortWrite
}
- }
+ return err == nil
+ })
return
}
-
-func (me *fileStorageTorrent) fileInfoName(fi metainfo.FileInfo) string {
- return filepath.Join(append([]string{me.baseDir, me.info.Name}, fi.Path...)...)
-}