]> Sergey Matveev's repositories - btrtrc.git/blob - storage/file.go
Linear IO and cached FDs
[btrtrc.git] / storage / file.go
1 package storage
2
3 import (
4         "fmt"
5         "io"
6         "os"
7         "path/filepath"
8         "sync"
9         "time"
10
11         "github.com/anacrolix/missinggo/v2"
12
13         "github.com/anacrolix/torrent/common"
14         "github.com/anacrolix/torrent/metainfo"
15         "github.com/anacrolix/torrent/segments"
16 )
17
18 var (
19         fdRCache        = map[string]*os.File{}
20         fdRCacheM       sync.Mutex
21         fdWCache        = map[string]*os.File{}
22         fdWCacheM       sync.Mutex
23         fdMkdirAllCache = map[string]struct{}{}
24         fdCacheCleanerM sync.Once
25 )
26
27 func fdCacheCleaner() {
28         for range time.Tick(10 * time.Second) {
29                 fdRCacheM.Lock()
30                 for _, v := range fdRCache {
31                         v.Close()
32                 }
33                 fdRCache = make(map[string]*os.File)
34                 fdRCacheM.Unlock()
35
36                 fdWCacheM.Lock()
37                 for _, v := range fdWCache {
38                         v.Close()
39                 }
40                 fdWCache = make(map[string]*os.File)
41                 fdWCacheM.Unlock()
42         }
43 }
44
45 // File-based storage for torrents, that isn't yet bound to a particular torrent.
46 type fileClientImpl struct {
47         opts NewFileClientOpts
48 }
49
50 // All Torrent data stored in this baseDir. The info names of each torrent are used as directories.
51 func NewFile(baseDir string) ClientImplCloser {
52         return NewFileWithCompletion(baseDir, pieceCompletionForDir(baseDir))
53 }
54
55 type NewFileClientOpts struct {
56         // The base directory for all downloads.
57         ClientBaseDir   string
58         FilePathMaker   FilePathMaker
59         TorrentDirMaker TorrentDirFilePathMaker
60         PieceCompletion PieceCompletion
61 }
62
63 // NewFileOpts creates a new ClientImplCloser that stores files using the OS native filesystem.
64 func NewFileOpts(opts NewFileClientOpts) ClientImplCloser {
65         if opts.TorrentDirMaker == nil {
66                 opts.TorrentDirMaker = defaultPathMaker
67         }
68         if opts.FilePathMaker == nil {
69                 opts.FilePathMaker = func(opts FilePathMakerOpts) string {
70                         var parts []string
71                         if opts.Info.Name != metainfo.NoName {
72                                 parts = append(parts, opts.Info.Name)
73                         }
74                         return filepath.Join(append(parts, opts.File.Path...)...)
75                 }
76         }
77         if opts.PieceCompletion == nil {
78                 opts.PieceCompletion = pieceCompletionForDir(opts.ClientBaseDir)
79         }
80         fdCacheCleanerM.Do(func() { go fdCacheCleaner() })
81         return fileClientImpl{opts}
82 }
83
84 func (me fileClientImpl) Close() error {
85         return me.opts.PieceCompletion.Close()
86 }
87
88 func (fs fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (_ TorrentImpl, err error) {
89         dir := fs.opts.TorrentDirMaker(fs.opts.ClientBaseDir, info, infoHash)
90         upvertedFiles := info.UpvertedFiles()
91         files := make([]file, 0, len(upvertedFiles))
92         for i, fileInfo := range upvertedFiles {
93                 filePath := filepath.Join(dir, fs.opts.FilePathMaker(FilePathMakerOpts{
94                         Info: info,
95                         File: &fileInfo,
96                 }))
97                 if !isSubFilepath(dir, filePath) {
98                         err = fmt.Errorf("file %v: path %q is not sub path of %q", i, filePath, dir)
99                         return
100                 }
101                 f := file{
102                         path:   filePath,
103                         length: fileInfo.Length,
104                 }
105                 if f.length == 0 {
106                         err = CreateNativeZeroLengthFile(f.path)
107                         if err != nil {
108                                 err = fmt.Errorf("creating zero length file: %w", err)
109                                 return
110                         }
111                 }
112                 files = append(files, f)
113         }
114         t := &fileTorrentImpl{
115                 files,
116                 segments.NewIndex(common.LengthIterFromUpvertedFiles(upvertedFiles)),
117                 infoHash,
118                 fs.opts.PieceCompletion,
119         }
120         return TorrentImpl{
121                 Piece: t.Piece,
122                 Close: t.Close,
123         }, nil
124 }
125
126 type file struct {
127         // The safe, OS-local file path.
128         path   string
129         length int64
130 }
131
132 type fileTorrentImpl struct {
133         files          []file
134         segmentLocater segments.Index
135         infoHash       metainfo.Hash
136         completion     PieceCompletion
137 }
138
139 func (fts *fileTorrentImpl) Piece(p metainfo.Piece) PieceImpl {
140         // Create a view onto the file-based torrent storage.
141         _io := fileTorrentImplIO{fts}
142         // Return the appropriate segments of this.
143         return &filePieceImpl{
144                 fts,
145                 p,
146                 missinggo.NewSectionWriter(_io, p.Offset(), p.Length()),
147                 io.NewSectionReader(_io, p.Offset(), p.Length()),
148         }
149 }
150
151 func (fs *fileTorrentImpl) Close() error {
152         return nil
153 }
154
155 // A helper to create zero-length files which won't appear for file-orientated storage since no
156 // writes will ever occur to them (no torrent data is associated with a zero-length file). The
157 // caller should make sure the file name provided is safe/sanitized.
158 func CreateNativeZeroLengthFile(name string) error {
159         os.MkdirAll(filepath.Dir(name), 0o777)
160         var f io.Closer
161         f, err := os.Create(name)
162         if err != nil {
163                 return err
164         }
165         return f.Close()
166 }
167
168 // Exposes file-based storage of a torrent, as one big ReadWriterAt.
169 type fileTorrentImplIO struct {
170         fts *fileTorrentImpl
171 }
172
173 // Returns EOF on short or missing file.
174 func (fst *fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) {
175         fdRCacheM.Lock()
176         defer fdRCacheM.Unlock()
177         f := fdRCache[file.path]
178         if f == nil {
179                 f, err = os.Open(file.path)
180                 if os.IsNotExist(err) {
181                         // File missing is treated the same as a short file.
182                         err = io.EOF
183                         return
184                 }
185                 if err != nil {
186                         return
187                 }
188                 fdRCache[file.path] = f
189         }
190         // Limit the read to within the expected bounds of this file.
191         if int64(len(b)) > file.length-off {
192                 b = b[:file.length-off]
193         }
194         return f.ReadAt(b, off)
195 }
196
197 // Only returns EOF at the end of the torrent. Premature EOF is ErrUnexpectedEOF.
198 func (fst fileTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) {
199         fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(b))}, func(i int, e segments.Extent) bool {
200                 n1, err1 := fst.readFileAt(fst.fts.files[i], b[:e.Length], e.Start)
201                 n += n1
202                 b = b[n1:]
203                 err = err1
204                 return err == nil // && int64(n1) == e.Length
205         })
206         if len(b) != 0 && err == nil {
207                 err = io.EOF
208         }
209         return
210 }
211
212 func (fst fileTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) {
213         fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool {
214                 name := fst.fts.files[i].path
215                 _, ok := fdMkdirAllCache[filepath.Dir(name)]
216                 if !ok {
217                         os.MkdirAll(filepath.Dir(name), 0o777)
218                         fdMkdirAllCache[filepath.Dir(name)] = struct{}{}
219                 }
220                 fdWCacheM.Lock()
221                 defer fdWCacheM.Unlock()
222                 f := fdWCache[name]
223                 if f == nil {
224                         f, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0o666)
225                         if err != nil {
226                                 return false
227                         }
228                         fdWCache[name] = f
229                 }
230                 var n1 int
231                 n1, err = f.WriteAt(p[:e.Length], e.Start)
232                 // log.Printf("%v %v wrote %v: %v", i, e, n1, err)
233                 n += n1
234                 p = p[n1:]
235                 if err == nil && int64(n1) != e.Length {
236                         err = io.ErrShortWrite
237                 }
238                 return err == nil
239         })
240         return
241 }