]> Sergey Matveev's repositories - btrtrc.git/blob - fs/torrentfs.go
06e05e008fd48599af4ea19da1bd096c19777f19
[btrtrc.git] / fs / torrentfs.go
1 package torrentfs
2
3 import (
4         "expvar"
5         "fmt"
6         "io"
7         "os"
8         "path"
9         "strings"
10         "sync"
11
12         "bazil.org/fuse"
13         fusefs "bazil.org/fuse/fs"
14         "golang.org/x/net/context"
15
16         "github.com/anacrolix/torrent"
17         "github.com/anacrolix/torrent/metainfo"
18 )
19
20 const (
21         defaultMode = 0555
22 )
23
24 var (
25         torrentfsReadRequests        = expvar.NewInt("torrentfsReadRequests")
26         torrentfsDelayedReadRequests = expvar.NewInt("torrentfsDelayedReadRequests")
27         interruptedReads             = expvar.NewInt("interruptedReads")
28 )
29
30 type TorrentFS struct {
31         Client       *torrent.Client
32         destroyed    chan struct{}
33         mu           sync.Mutex
34         blockedReads int
35         event        sync.Cond
36 }
37
38 var (
39         _ fusefs.FSDestroyer = &TorrentFS{}
40
41         _ fusefs.NodeForgetter      = rootNode{}
42         _ fusefs.HandleReadDirAller = rootNode{}
43         _ fusefs.HandleReadDirAller = dirNode{}
44 )
45
46 type rootNode struct {
47         fs *TorrentFS
48 }
49
50 type node struct {
51         path     string
52         metadata *metainfo.Info
53         FS       *TorrentFS
54         t        *torrent.Torrent
55 }
56
57 type fileNode struct {
58         node
59         size          uint64
60         TorrentOffset int64
61 }
62
63 func (fn fileNode) Attr(ctx context.Context, attr *fuse.Attr) error {
64         attr.Size = fn.size
65         attr.Mode = defaultMode
66         return nil
67 }
68
69 func (n *node) fsPath() string {
70         return "/" + n.metadata.Name + "/" + n.path
71 }
72
73 func blockingRead(ctx context.Context, fs *TorrentFS, t *torrent.Torrent, off int64, p []byte) (n int, err error) {
74         fs.mu.Lock()
75         fs.blockedReads++
76         fs.event.Broadcast()
77         fs.mu.Unlock()
78         var (
79                 _n   int
80                 _err error
81         )
82         readDone := make(chan struct{})
83         go func() {
84                 defer close(readDone)
85                 r := t.NewReader()
86                 defer r.Close()
87                 _, _err = r.Seek(off, os.SEEK_SET)
88                 if _err != nil {
89                         return
90                 }
91                 _n, _err = io.ReadFull(r, p)
92         }()
93         select {
94         case <-readDone:
95                 n = _n
96                 err = _err
97         case <-fs.destroyed:
98                 err = fuse.EIO
99         case <-ctx.Done():
100                 err = fuse.EINTR
101         }
102         fs.mu.Lock()
103         fs.blockedReads--
104         fs.event.Broadcast()
105         fs.mu.Unlock()
106         return
107 }
108
109 func readFull(ctx context.Context, fs *TorrentFS, t *torrent.Torrent, off int64, p []byte) (n int, err error) {
110         for len(p) != 0 {
111                 var nn int
112                 nn, err = blockingRead(ctx, fs, t, off, p)
113                 if err != nil {
114                         break
115                 }
116                 n += nn
117                 off += int64(nn)
118                 p = p[nn:]
119         }
120         return
121 }
122
123 func (fn fileNode) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
124         torrentfsReadRequests.Add(1)
125         if req.Dir {
126                 panic("read on directory")
127         }
128         size := req.Size
129         fileLeft := int64(fn.size) - req.Offset
130         if fileLeft < 0 {
131                 fileLeft = 0
132         }
133         if fileLeft < int64(size) {
134                 size = int(fileLeft)
135         }
136         resp.Data = resp.Data[:size]
137         if len(resp.Data) == 0 {
138                 return nil
139         }
140         torrentOff := fn.TorrentOffset + req.Offset
141         n, err := readFull(ctx, fn.FS, fn.t, torrentOff, resp.Data)
142         if err != nil {
143                 return err
144         }
145         if n != size {
146                 panic(fmt.Sprintf("%d < %d", n, size))
147         }
148         return nil
149 }
150
151 type dirNode struct {
152         node
153 }
154
155 var (
156         _ fusefs.HandleReadDirAller = dirNode{}
157         _ fusefs.HandleReader       = fileNode{}
158 )
159
160 func isSubPath(parent, child string) bool {
161         if len(parent) == 0 {
162                 return len(child) > 0
163         }
164         if !strings.HasPrefix(child, parent) {
165                 return false
166         }
167         s := child[len(parent):]
168         if len(s) == 0 {
169                 return false
170         }
171         return s[0] == '/'
172 }
173
174 func (dn dirNode) ReadDirAll(ctx context.Context) (des []fuse.Dirent, err error) {
175         names := map[string]bool{}
176         for _, fi := range dn.metadata.Files {
177                 if !isSubPath(dn.path, strings.Join(fi.Path, "/")) {
178                         continue
179                 }
180                 name := fi.Path[len(dn.path)]
181                 if names[name] {
182                         continue
183                 }
184                 names[name] = true
185                 de := fuse.Dirent{
186                         Name: name,
187                 }
188                 if len(fi.Path) == len(dn.path)+1 {
189                         de.Type = fuse.DT_File
190                 } else {
191                         de.Type = fuse.DT_Dir
192                 }
193                 des = append(des, de)
194         }
195         return
196 }
197
198 func (dn dirNode) Lookup(ctx context.Context, name string) (_node fusefs.Node, err error) {
199         var torrentOffset int64
200         for _, fi := range dn.metadata.Files {
201                 if !isSubPath(dn.path, strings.Join(fi.Path, "/")) {
202                         torrentOffset += fi.Length
203                         continue
204                 }
205                 if fi.Path[len(dn.path)] != name {
206                         torrentOffset += fi.Length
207                         continue
208                 }
209                 __node := dn.node
210                 __node.path = path.Join(__node.path, name)
211                 if len(fi.Path) == len(dn.path)+1 {
212                         _node = fileNode{
213                                 node:          __node,
214                                 size:          uint64(fi.Length),
215                                 TorrentOffset: torrentOffset,
216                         }
217                 } else {
218                         _node = dirNode{__node}
219                 }
220                 break
221         }
222         if _node == nil {
223                 err = fuse.ENOENT
224         }
225         return
226 }
227
228 func (dn dirNode) Attr(ctx context.Context, attr *fuse.Attr) error {
229         attr.Mode = os.ModeDir | defaultMode
230         return nil
231 }
232
233 func (rn rootNode) Lookup(ctx context.Context, name string) (_node fusefs.Node, err error) {
234         for _, t := range rn.fs.Client.Torrents() {
235                 info := t.Info()
236                 if t.Name() != name || info == nil {
237                         continue
238                 }
239                 __node := node{
240                         metadata: info,
241                         FS:       rn.fs,
242                         t:        t,
243                 }
244                 if !info.IsDir() {
245                         _node = fileNode{__node, uint64(info.Length), 0}
246                 } else {
247                         _node = dirNode{__node}
248                 }
249                 break
250         }
251         if _node == nil {
252                 err = fuse.ENOENT
253         }
254         return
255 }
256
257 func (rn rootNode) ReadDirAll(ctx context.Context) (dirents []fuse.Dirent, err error) {
258         for _, t := range rn.fs.Client.Torrents() {
259                 info := t.Info()
260                 if info == nil {
261                         continue
262                 }
263                 dirents = append(dirents, fuse.Dirent{
264                         Name: info.Name,
265                         Type: func() fuse.DirentType {
266                                 if !info.IsDir() {
267                                         return fuse.DT_File
268                                 } else {
269                                         return fuse.DT_Dir
270                                 }
271                         }(),
272                 })
273         }
274         return
275 }
276
277 func (rn rootNode) Attr(ctx context.Context, attr *fuse.Attr) error {
278         attr.Mode = os.ModeDir
279         return nil
280 }
281
282 // TODO(anacrolix): Why should rootNode implement this?
283 func (rn rootNode) Forget() {
284         rn.fs.Destroy()
285 }
286
287 func (tfs *TorrentFS) Root() (fusefs.Node, error) {
288         return rootNode{tfs}, nil
289 }
290
291 func (tfs *TorrentFS) Destroy() {
292         tfs.mu.Lock()
293         select {
294         case <-tfs.destroyed:
295         default:
296                 close(tfs.destroyed)
297         }
298         tfs.mu.Unlock()
299 }
300
301 func New(cl *torrent.Client) *TorrentFS {
302         fs := &TorrentFS{
303                 Client:    cl,
304                 destroyed: make(chan struct{}),
305         }
306         fs.event.L = &fs.mu
307         return fs
308 }