]> Sergey Matveev's repositories - btrtrc.git/blob - fs/torrentfs.go
Piece priorities, torrent read interface and many fixes
[btrtrc.git] / fs / torrentfs.go
1 package torrentfs
2
3 import (
4         "expvar"
5         "fmt"
6         "log"
7         "os"
8         "strings"
9         "sync"
10         "time"
11
12         "bazil.org/fuse"
13         fusefs "bazil.org/fuse/fs"
14         "bitbucket.org/anacrolix/go.torrent"
15 )
16
17 const (
18         defaultMode = 0555
19 )
20
21 var (
22         torrentfsReadRequests        = expvar.NewInt("torrentfsReadRequests")
23         torrentfsDelayedReadRequests = expvar.NewInt("torrentfsDelayedReadRequests")
24         interruptedReads             = expvar.NewInt("interruptedReads")
25 )
26
27 type TorrentFS struct {
28         Client    *torrent.Client
29         destroyed chan struct{}
30         mu        sync.Mutex
31 }
32
33 var (
34         _ fusefs.FSDestroyer = &TorrentFS{}
35         _ fusefs.FSIniter    = &TorrentFS{}
36 )
37
38 func (fs *TorrentFS) Init(req *fuse.InitRequest, resp *fuse.InitResponse, intr fusefs.Intr) fuse.Error {
39         log.Print(req)
40         log.Print(resp)
41         resp.MaxReadahead = req.MaxReadahead
42         resp.Flags |= fuse.InitAsyncRead
43         return nil
44 }
45
46 var _ fusefs.NodeForgetter = rootNode{}
47
48 type rootNode struct {
49         fs *TorrentFS
50 }
51
52 type node struct {
53         path     []string
54         metadata *torrent.MetaInfo
55         FS       *TorrentFS
56         t        torrent.Torrent
57 }
58
59 type fileNode struct {
60         node
61         size          uint64
62         TorrentOffset int64
63 }
64
65 func (fn fileNode) Attr() (attr fuse.Attr) {
66         attr.Size = fn.size
67         attr.Mode = defaultMode
68         return
69 }
70
71 func (n *node) fsPath() string {
72         return "/" + strings.Join(append([]string{n.metadata.Name}, n.path...), "/")
73 }
74
75 func blockingRead(fs *TorrentFS, t torrent.Torrent, off int64, p []byte, intr fusefs.Intr) (n int, err fuse.Error) {
76         var (
77                 _n   int
78                 _err fuse.Error
79         )
80         readDone := make(chan struct{})
81         go func() {
82                 _n, _err = t.ReadAt(p, off)
83                 close(readDone)
84         }()
85         select {
86         case <-readDone:
87                 n = _n
88                 err = _err
89         case <-fs.destroyed:
90                 err = fuse.EIO
91         case <-intr:
92                 err = fuse.EINTR
93         }
94         return
95 }
96
97 func readFull(fs *TorrentFS, t torrent.Torrent, off int64, p []byte, intr fusefs.Intr) (n int, err fuse.Error) {
98         for len(p) != 0 {
99                 var nn int
100                 nn, err = blockingRead(fs, t, off, p, intr)
101                 if err != nil {
102                         break
103                 }
104                 n += nn
105                 off += int64(nn)
106                 p = p[nn:]
107         }
108         return
109 }
110
111 func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fusefs.Intr) fuse.Error {
112         torrentfsReadRequests.Add(1)
113         started := time.Now()
114         if req.Dir {
115                 panic("read on directory")
116         }
117         defer func() {
118                 ms := time.Now().Sub(started).Nanoseconds() / 1000000
119                 if ms < 20 {
120                         return
121                 }
122                 log.Printf("torrentfs read took %dms", ms)
123         }()
124         size := req.Size
125         fileLeft := int64(fn.size) - req.Offset
126         if fileLeft < 0 {
127                 fileLeft = 0
128         }
129         if fileLeft < int64(size) {
130                 size = int(fileLeft)
131         }
132         resp.Data = resp.Data[:size]
133         if len(resp.Data) == 0 {
134                 return nil
135         }
136         torrentOff := fn.TorrentOffset + req.Offset
137         n, err := readFull(fn.FS, fn.t, torrentOff, resp.Data, intr)
138         if err != nil {
139                 return err
140         }
141         if n != size {
142                 panic(fmt.Sprintf("%d < %d", n, size))
143         }
144         return nil
145 }
146
147 type dirNode struct {
148         node
149 }
150
151 var (
152         _ fusefs.HandleReadDirer = dirNode{}
153         _ fusefs.HandleReader    = fileNode{}
154 )
155
156 func isSubPath(parent, child []string) bool {
157         if len(child) <= len(parent) {
158                 return false
159         }
160         for i := range parent {
161                 if parent[i] != child[i] {
162                         return false
163                 }
164         }
165         return true
166 }
167
168 func (dn dirNode) ReadDir(intr fusefs.Intr) (des []fuse.Dirent, err fuse.Error) {
169         names := map[string]bool{}
170         for _, fi := range dn.metadata.Files {
171                 if !isSubPath(dn.path, fi.Path) {
172                         continue
173                 }
174                 name := fi.Path[len(dn.path)]
175                 if names[name] {
176                         continue
177                 }
178                 names[name] = true
179                 de := fuse.Dirent{
180                         Name: name,
181                 }
182                 if len(fi.Path) == len(dn.path)+1 {
183                         de.Type = fuse.DT_File
184                 } else {
185                         de.Type = fuse.DT_Dir
186                 }
187                 des = append(des, de)
188         }
189         return
190 }
191
192 func (dn dirNode) Lookup(name string, intr fusefs.Intr) (_node fusefs.Node, err fuse.Error) {
193         var torrentOffset int64
194         for _, fi := range dn.metadata.Files {
195                 if !isSubPath(dn.path, fi.Path) {
196                         torrentOffset += fi.Length
197                         continue
198                 }
199                 if fi.Path[len(dn.path)] != name {
200                         torrentOffset += fi.Length
201                         continue
202                 }
203                 __node := dn.node
204                 __node.path = append(__node.path, name)
205                 if len(fi.Path) == len(dn.path)+1 {
206                         _node = fileNode{
207                                 node:          __node,
208                                 size:          uint64(fi.Length),
209                                 TorrentOffset: torrentOffset,
210                         }
211                 } else {
212                         _node = dirNode{__node}
213                 }
214                 break
215         }
216         if _node == nil {
217                 err = fuse.ENOENT
218         }
219         return
220 }
221
222 func (dn dirNode) Attr() (attr fuse.Attr) {
223         attr.Mode = os.ModeDir | defaultMode
224         return
225 }
226
227 func (me rootNode) Lookup(name string, intr fusefs.Intr) (_node fusefs.Node, err fuse.Error) {
228         for _, t := range me.fs.Client.Torrents() {
229                 if t.Name() != name || t.Info == nil {
230                         continue
231                 }
232                 __node := node{
233                         metadata: t.Info,
234                         FS:       me.fs,
235                         t:        t,
236                 }
237                 if t.Info.SingleFile() {
238                         _node = fileNode{__node, uint64(t.Info.Length), 0}
239                 } else {
240                         _node = dirNode{__node}
241                 }
242                 break
243         }
244         if _node == nil {
245                 err = fuse.ENOENT
246         }
247         return
248 }
249
250 func (me rootNode) ReadDir(intr fusefs.Intr) (dirents []fuse.Dirent, err fuse.Error) {
251         for _, t := range me.fs.Client.Torrents() {
252                 if t.Info == nil {
253                         continue
254                 }
255                 dirents = append(dirents, fuse.Dirent{
256                         Name: t.Info.Name,
257                         Type: func() fuse.DirentType {
258                                 if t.Info.SingleFile() {
259                                         return fuse.DT_File
260                                 } else {
261                                         return fuse.DT_Dir
262                                 }
263                         }(),
264                 })
265         }
266         return
267 }
268
269 func (rootNode) Attr() fuse.Attr {
270         return fuse.Attr{
271                 Mode: os.ModeDir,
272         }
273 }
274
275 // TODO(anacrolix): Why should rootNode implement this?
276 func (me rootNode) Forget() {
277         me.fs.Destroy()
278 }
279
280 func (tfs *TorrentFS) Root() (fusefs.Node, fuse.Error) {
281         return rootNode{tfs}, nil
282 }
283
284 func (me *TorrentFS) Destroy() {
285         me.mu.Lock()
286         select {
287         case <-me.destroyed:
288         default:
289                 close(me.destroyed)
290         }
291         me.mu.Unlock()
292 }
293
294 func New(cl *torrent.Client) *TorrentFS {
295         fs := &TorrentFS{
296                 Client:    cl,
297                 destroyed: make(chan struct{}),
298         }
299         return fs
300 }