]> Sergey Matveev's repositories - btrtrc.git/blob - fs/torrentfs.go
Finish fixing tests
[btrtrc.git] / fs / torrentfs.go
1 package torrentfs
2
3 import (
4         "expvar"
5         "fmt"
6         "io"
7         "os"
8         "path"
9         "strings"
10         "sync"
11
12         "bazil.org/fuse"
13         fusefs "bazil.org/fuse/fs"
14         "golang.org/x/net/context"
15
16         "github.com/anacrolix/torrent"
17         "github.com/anacrolix/torrent/metainfo"
18 )
19
20 const (
21         defaultMode = 0555
22 )
23
24 var (
25         torrentfsReadRequests        = expvar.NewInt("torrentfsReadRequests")
26         torrentfsDelayedReadRequests = expvar.NewInt("torrentfsDelayedReadRequests")
27         interruptedReads             = expvar.NewInt("interruptedReads")
28 )
29
30 type TorrentFS struct {
31         Client       *torrent.Client
32         destroyed    chan struct{}
33         mu           sync.Mutex
34         blockedReads int
35         event        sync.Cond
36 }
37
38 var (
39         _ fusefs.FSDestroyer = &TorrentFS{}
40
41         _ fusefs.NodeForgetter      = rootNode{}
42         _ fusefs.HandleReadDirAller = rootNode{}
43         _ fusefs.HandleReadDirAller = dirNode{}
44 )
45
46 type rootNode struct {
47         fs *TorrentFS
48 }
49
50 type node struct {
51         path     string
52         metadata *metainfo.InfoEx
53         FS       *TorrentFS
54         t        torrent.Torrent
55 }
56
57 type fileNode struct {
58         node
59         size          uint64
60         TorrentOffset int64
61 }
62
63 func (fn fileNode) Attr(ctx context.Context, attr *fuse.Attr) error {
64         attr.Size = fn.size
65         attr.Mode = defaultMode
66         return nil
67 }
68
69 func (n *node) fsPath() string {
70         return "/" + n.metadata.Name + "/" + n.path
71 }
72
73 func blockingRead(ctx context.Context, fs *TorrentFS, t torrent.Torrent, off int64, p []byte) (n int, err error) {
74         fs.mu.Lock()
75         fs.blockedReads++
76         fs.event.Broadcast()
77         fs.mu.Unlock()
78         var (
79                 _n   int
80                 _err error
81         )
82         readDone := make(chan struct{})
83         go func() {
84                 defer close(readDone)
85                 r := t.NewReader()
86                 defer r.Close()
87                 _, _err = r.Seek(off, os.SEEK_SET)
88                 if _err != nil {
89                         return
90                 }
91                 _n, _err = io.ReadFull(r, p)
92         }()
93         select {
94         case <-readDone:
95                 n = _n
96                 err = _err
97         case <-fs.destroyed:
98                 err = fuse.EIO
99         case <-ctx.Done():
100                 err = fuse.EINTR
101         }
102         fs.mu.Lock()
103         fs.blockedReads--
104         fs.event.Broadcast()
105         fs.mu.Unlock()
106         return
107 }
108
109 func readFull(ctx context.Context, fs *TorrentFS, t torrent.Torrent, off int64, p []byte) (n int, err error) {
110         for len(p) != 0 {
111                 var nn int
112                 nn, err = blockingRead(ctx, fs, t, off, p)
113                 if err != nil {
114                         break
115                 }
116                 n += nn
117                 off += int64(nn)
118                 p = p[nn:]
119         }
120         return
121 }
122
123 func (fn fileNode) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
124         torrentfsReadRequests.Add(1)
125         if req.Dir {
126                 panic("read on directory")
127         }
128         size := req.Size
129         fileLeft := int64(fn.size) - req.Offset
130         if fileLeft < 0 {
131                 fileLeft = 0
132         }
133         if fileLeft < int64(size) {
134                 size = int(fileLeft)
135         }
136         resp.Data = resp.Data[:size]
137         if len(resp.Data) == 0 {
138                 return nil
139         }
140         torrentOff := fn.TorrentOffset + req.Offset
141         n, err := readFull(ctx, fn.FS, fn.t, torrentOff, resp.Data)
142         if err != nil {
143                 return err
144         }
145         if n != size {
146                 panic(fmt.Sprintf("%d < %d", n, size))
147         }
148         return nil
149 }
150
151 type dirNode struct {
152         node
153 }
154
155 var (
156         _ fusefs.HandleReadDirAller = dirNode{}
157         _ fusefs.HandleReader       = fileNode{}
158 )
159
160 func isSubPath(parent, child string) bool {
161         if !strings.HasPrefix(child, parent) {
162                 return false
163         }
164         s := child[len(parent):]
165         if len(s) == 0 {
166                 return false
167         }
168         return s[0] == '/'
169 }
170
171 func (dn dirNode) ReadDirAll(ctx context.Context) (des []fuse.Dirent, err error) {
172         names := map[string]bool{}
173         for _, fi := range dn.metadata.Files {
174                 if !isSubPath(dn.path, strings.Join(fi.Path, "/")) {
175                         continue
176                 }
177                 name := fi.Path[len(dn.path)]
178                 if names[name] {
179                         continue
180                 }
181                 names[name] = true
182                 de := fuse.Dirent{
183                         Name: name,
184                 }
185                 if len(fi.Path) == len(dn.path)+1 {
186                         de.Type = fuse.DT_File
187                 } else {
188                         de.Type = fuse.DT_Dir
189                 }
190                 des = append(des, de)
191         }
192         return
193 }
194
195 func (dn dirNode) Lookup(ctx context.Context, name string) (_node fusefs.Node, err error) {
196         var torrentOffset int64
197         for _, fi := range dn.metadata.Files {
198                 if !isSubPath(dn.path, strings.Join(fi.Path, "/")) {
199                         torrentOffset += fi.Length
200                         continue
201                 }
202                 if fi.Path[len(dn.path)] != name {
203                         torrentOffset += fi.Length
204                         continue
205                 }
206                 __node := dn.node
207                 __node.path = path.Join(__node.path, name)
208                 if len(fi.Path) == len(dn.path)+1 {
209                         _node = fileNode{
210                                 node:          __node,
211                                 size:          uint64(fi.Length),
212                                 TorrentOffset: torrentOffset,
213                         }
214                 } else {
215                         _node = dirNode{__node}
216                 }
217                 break
218         }
219         if _node == nil {
220                 err = fuse.ENOENT
221         }
222         return
223 }
224
225 func (dn dirNode) Attr(ctx context.Context, attr *fuse.Attr) error {
226         attr.Mode = os.ModeDir | defaultMode
227         return nil
228 }
229
230 func (me rootNode) Lookup(ctx context.Context, name string) (_node fusefs.Node, err error) {
231         for _, t := range me.fs.Client.Torrents() {
232                 info := t.Info()
233                 if t.Name() != name || info == nil {
234                         continue
235                 }
236                 __node := node{
237                         metadata: info,
238                         FS:       me.fs,
239                         t:        t,
240                 }
241                 if !info.IsDir() {
242                         _node = fileNode{__node, uint64(info.Length), 0}
243                 } else {
244                         _node = dirNode{__node}
245                 }
246                 break
247         }
248         if _node == nil {
249                 err = fuse.ENOENT
250         }
251         return
252 }
253
254 func (me rootNode) ReadDirAll(ctx context.Context) (dirents []fuse.Dirent, err error) {
255         for _, t := range me.fs.Client.Torrents() {
256                 info := t.Info()
257                 if info == nil {
258                         continue
259                 }
260                 dirents = append(dirents, fuse.Dirent{
261                         Name: info.Name,
262                         Type: func() fuse.DirentType {
263                                 if !info.IsDir() {
264                                         return fuse.DT_File
265                                 } else {
266                                         return fuse.DT_Dir
267                                 }
268                         }(),
269                 })
270         }
271         return
272 }
273
274 func (rootNode) Attr(ctx context.Context, attr *fuse.Attr) error {
275         attr.Mode = os.ModeDir
276         return nil
277 }
278
279 // TODO(anacrolix): Why should rootNode implement this?
280 func (me rootNode) Forget() {
281         me.fs.Destroy()
282 }
283
284 func (tfs *TorrentFS) Root() (fusefs.Node, error) {
285         return rootNode{tfs}, nil
286 }
287
288 func (me *TorrentFS) Destroy() {
289         me.mu.Lock()
290         select {
291         case <-me.destroyed:
292         default:
293                 close(me.destroyed)
294         }
295         me.mu.Unlock()
296 }
297
298 func New(cl *torrent.Client) *TorrentFS {
299         fs := &TorrentFS{
300                 Client:    cl,
301                 destroyed: make(chan struct{}),
302         }
303         fs.event.L = &fs.mu
304         return fs
305 }