]> Sergey Matveev's repositories - btrtrc.git/blob - fs/torrentfs.go
Revert "Merge pull request #54 from zhulik/master"
[btrtrc.git] / fs / torrentfs.go
1 package torrentfs
2
3 import (
4         "expvar"
5         "fmt"
6         "os"
7         "path"
8         "strings"
9         "sync"
10
11         "bazil.org/fuse"
12         fusefs "bazil.org/fuse/fs"
13         "golang.org/x/net/context"
14
15         "github.com/anacrolix/torrent"
16         "github.com/anacrolix/torrent/metainfo"
17 )
18
19 const (
20         defaultMode = 0555
21 )
22
23 var (
24         torrentfsReadRequests        = expvar.NewInt("torrentfsReadRequests")
25         torrentfsDelayedReadRequests = expvar.NewInt("torrentfsDelayedReadRequests")
26         interruptedReads             = expvar.NewInt("interruptedReads")
27 )
28
29 type TorrentFS struct {
30         Client       *torrent.Client
31         destroyed    chan struct{}
32         mu           sync.Mutex
33         blockedReads int
34         event        sync.Cond
35 }
36
37 var (
38         _ fusefs.FSDestroyer = &TorrentFS{}
39
40         _ fusefs.NodeForgetter      = rootNode{}
41         _ fusefs.HandleReadDirAller = rootNode{}
42         _ fusefs.HandleReadDirAller = dirNode{}
43 )
44
45 type rootNode struct {
46         fs *TorrentFS
47 }
48
49 type node struct {
50         path     string
51         metadata *metainfo.Info
52         FS       *TorrentFS
53         t        torrent.Torrent
54 }
55
56 type fileNode struct {
57         node
58         size          uint64
59         TorrentOffset int64
60 }
61
62 func (fn fileNode) Attr(ctx context.Context, attr *fuse.Attr) error {
63         attr.Size = fn.size
64         attr.Mode = defaultMode
65         return nil
66 }
67
68 func (n *node) fsPath() string {
69         return "/" + n.metadata.Name + "/" + n.path
70 }
71
72 func blockingRead(ctx context.Context, fs *TorrentFS, t torrent.Torrent, off int64, p []byte) (n int, err error) {
73         fs.mu.Lock()
74         fs.blockedReads++
75         fs.event.Broadcast()
76         fs.mu.Unlock()
77         var (
78                 _n   int
79                 _err error
80         )
81         readDone := make(chan struct{})
82         go func() {
83                 r := t.NewReader()
84                 defer r.Close()
85                 _n, _err = r.ReadAt(p, off)
86                 close(readDone)
87         }()
88         select {
89         case <-readDone:
90                 n = _n
91                 err = _err
92         case <-fs.destroyed:
93                 err = fuse.EIO
94         case <-ctx.Done():
95                 err = fuse.EINTR
96         }
97         fs.mu.Lock()
98         fs.blockedReads--
99         fs.event.Broadcast()
100         fs.mu.Unlock()
101         return
102 }
103
104 func readFull(ctx context.Context, fs *TorrentFS, t torrent.Torrent, off int64, p []byte) (n int, err error) {
105         for len(p) != 0 {
106                 var nn int
107                 nn, err = blockingRead(ctx, fs, t, off, p)
108                 if err != nil {
109                         break
110                 }
111                 n += nn
112                 off += int64(nn)
113                 p = p[nn:]
114         }
115         return
116 }
117
118 func (fn fileNode) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
119         torrentfsReadRequests.Add(1)
120         if req.Dir {
121                 panic("read on directory")
122         }
123         size := req.Size
124         fileLeft := int64(fn.size) - req.Offset
125         if fileLeft < 0 {
126                 fileLeft = 0
127         }
128         if fileLeft < int64(size) {
129                 size = int(fileLeft)
130         }
131         resp.Data = resp.Data[:size]
132         if len(resp.Data) == 0 {
133                 return nil
134         }
135         torrentOff := fn.TorrentOffset + req.Offset
136         n, err := readFull(ctx, fn.FS, fn.t, torrentOff, resp.Data)
137         if err != nil {
138                 return err
139         }
140         if n != size {
141                 panic(fmt.Sprintf("%d < %d", n, size))
142         }
143         return nil
144 }
145
146 type dirNode struct {
147         node
148 }
149
150 var (
151         _ fusefs.HandleReadDirAller = dirNode{}
152         _ fusefs.HandleReader       = fileNode{}
153 )
154
155 func isSubPath(parent, child string) bool {
156         if !strings.HasPrefix(child, parent) {
157                 return false
158         }
159         s := child[len(parent):]
160         if len(s) == 0 {
161                 return false
162         }
163         return s[0] == '/'
164 }
165
166 func (dn dirNode) ReadDirAll(ctx context.Context) (des []fuse.Dirent, err error) {
167         names := map[string]bool{}
168         for _, fi := range dn.metadata.Files {
169                 if !isSubPath(dn.path, strings.Join(fi.Path, "/")) {
170                         continue
171                 }
172                 name := fi.Path[len(dn.path)]
173                 if names[name] {
174                         continue
175                 }
176                 names[name] = true
177                 de := fuse.Dirent{
178                         Name: name,
179                 }
180                 if len(fi.Path) == len(dn.path)+1 {
181                         de.Type = fuse.DT_File
182                 } else {
183                         de.Type = fuse.DT_Dir
184                 }
185                 des = append(des, de)
186         }
187         return
188 }
189
190 func (dn dirNode) Lookup(ctx context.Context, name string) (_node fusefs.Node, err error) {
191         var torrentOffset int64
192         for _, fi := range dn.metadata.Files {
193                 if !isSubPath(dn.path, strings.Join(fi.Path, "/")) {
194                         torrentOffset += fi.Length
195                         continue
196                 }
197                 if fi.Path[len(dn.path)] != name {
198                         torrentOffset += fi.Length
199                         continue
200                 }
201                 __node := dn.node
202                 __node.path = path.Join(__node.path, name)
203                 if len(fi.Path) == len(dn.path)+1 {
204                         _node = fileNode{
205                                 node:          __node,
206                                 size:          uint64(fi.Length),
207                                 TorrentOffset: torrentOffset,
208                         }
209                 } else {
210                         _node = dirNode{__node}
211                 }
212                 break
213         }
214         if _node == nil {
215                 err = fuse.ENOENT
216         }
217         return
218 }
219
220 func (dn dirNode) Attr(ctx context.Context, attr *fuse.Attr) error {
221         attr.Mode = os.ModeDir | defaultMode
222         return nil
223 }
224
225 func (me rootNode) Lookup(ctx context.Context, name string) (_node fusefs.Node, err error) {
226         for _, t := range me.fs.Client.Torrents() {
227                 info := t.Info()
228                 if t.Name() != name || info == nil {
229                         continue
230                 }
231                 __node := node{
232                         metadata: info,
233                         FS:       me.fs,
234                         t:        t,
235                 }
236                 if !info.IsDir() {
237                         _node = fileNode{__node, uint64(info.Length), 0}
238                 } else {
239                         _node = dirNode{__node}
240                 }
241                 break
242         }
243         if _node == nil {
244                 err = fuse.ENOENT
245         }
246         return
247 }
248
249 func (me rootNode) ReadDirAll(ctx context.Context) (dirents []fuse.Dirent, err error) {
250         for _, t := range me.fs.Client.Torrents() {
251                 info := t.Info()
252                 if info == nil {
253                         continue
254                 }
255                 dirents = append(dirents, fuse.Dirent{
256                         Name: info.Name,
257                         Type: func() fuse.DirentType {
258                                 if !info.IsDir() {
259                                         return fuse.DT_File
260                                 } else {
261                                         return fuse.DT_Dir
262                                 }
263                         }(),
264                 })
265         }
266         return
267 }
268
269 func (rootNode) Attr(ctx context.Context, attr *fuse.Attr) error {
270         attr.Mode = os.ModeDir
271         return nil
272 }
273
274 // TODO(anacrolix): Why should rootNode implement this?
275 func (me rootNode) Forget() {
276         me.fs.Destroy()
277 }
278
279 func (tfs *TorrentFS) Root() (fusefs.Node, error) {
280         return rootNode{tfs}, nil
281 }
282
283 func (me *TorrentFS) Destroy() {
284         me.mu.Lock()
285         select {
286         case <-me.destroyed:
287         default:
288                 close(me.destroyed)
289         }
290         me.mu.Unlock()
291 }
292
293 func New(cl *torrent.Client) *TorrentFS {
294         fs := &TorrentFS{
295                 Client:    cl,
296                 destroyed: make(chan struct{}),
297         }
298         fs.event.L = &fs.mu
299         return fs
300 }