]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Maintain a torrent.Reader for each file handle
authorMatt Joiner <anacrolix@gmail.com>
Sun, 27 Aug 2017 15:42:02 +0000 (01:42 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Sun, 27 Aug 2017 15:42:02 +0000 (01:42 +1000)
This means that readahead will work much better. Addresses https://github.com/anacrolix/torrent/issues/182.

fs/file_handle.go
fs/filenode.go
fs/torrentfs.go
reader.go
reader_test.go

index 11a5d74d2a10cb524fb08a2c567e45ca6162fc21..592153c374667fdb3d95ca5a0d4d348f62cb9a93 100644 (file)
@@ -2,7 +2,11 @@ package torrentfs
 
 import (
        "context"
-       "fmt"
+       "io"
+       "os"
+
+       "github.com/anacrolix/missinggo"
+       "github.com/anacrolix/torrent"
 
        "bazil.org/fuse"
        "bazil.org/fuse/fs"
@@ -10,34 +14,63 @@ import (
 
 type fileHandle struct {
        fn fileNode
+       r  *torrent.Reader
 }
 
-var _ fs.HandleReader = fileHandle{}
+var _ interface {
+       fs.HandleReader
+       fs.HandleReleaser
+} = fileHandle{}
 
 func (me fileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
        torrentfsReadRequests.Add(1)
        if req.Dir {
                panic("read on directory")
        }
-       size := req.Size
-       fileLeft := int64(me.fn.size) - req.Offset
-       if fileLeft < 0 {
-               fileLeft = 0
-       }
-       if fileLeft < int64(size) {
-               size = int(fileLeft)
-       }
-       resp.Data = resp.Data[:size]
-       if len(resp.Data) == 0 {
-               return nil
-       }
-       torrentOff := me.fn.TorrentOffset + req.Offset
-       n, err := readFull(ctx, me.fn.FS, me.fn.t, torrentOff, resp.Data)
+       pos, err := me.r.Seek(me.fn.TorrentOffset+req.Offset, os.SEEK_SET)
        if err != nil {
-               return err
+               panic(err)
        }
-       if n != size {
-               panic(fmt.Sprintf("%d < %d", n, size))
+       if pos != me.fn.TorrentOffset+req.Offset {
+               panic("seek failed")
        }
-       return nil
+       resp.Data = resp.Data[:req.Size]
+       readDone := make(chan struct{})
+       ctx, cancel := context.WithCancel(ctx)
+       var readErr error
+       go func() {
+               defer close(readDone)
+               me.fn.FS.mu.Lock()
+               me.fn.FS.blockedReads++
+               me.fn.FS.event.Broadcast()
+               me.fn.FS.mu.Unlock()
+               var n int
+               r := missinggo.ContextedReader{me.r, ctx}
+               n, readErr = r.Read(resp.Data)
+               if readErr == io.EOF {
+                       readErr = nil
+               }
+               resp.Data = resp.Data[:n]
+       }()
+       defer func() {
+               <-readDone
+               me.fn.FS.mu.Lock()
+               me.fn.FS.blockedReads--
+               me.fn.FS.event.Broadcast()
+               me.fn.FS.mu.Unlock()
+       }()
+       defer cancel()
+
+       select {
+       case <-readDone:
+               return readErr
+       case <-me.fn.FS.destroyed:
+               return fuse.EIO
+       case <-ctx.Done():
+               return fuse.EINTR
+       }
+}
+
+func (me fileHandle) Release(context.Context, *fuse.ReleaseRequest) error {
+       return me.r.Close()
 }
index 2c83fbf1946435cb405277dd7979c0dad98b188c..301d92d391458f7f5bf68049114f205bb4d3e847 100644 (file)
@@ -1,6 +1,8 @@
 package torrentfs
 
 import (
+       "os"
+
        "bazil.org/fuse"
        fusefs "bazil.org/fuse/fs"
        "golang.org/x/net/context"
@@ -23,5 +25,7 @@ func (fn fileNode) Attr(ctx context.Context, attr *fuse.Attr) error {
 }
 
 func (fn fileNode) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fusefs.Handle, error) {
-       return fileHandle{fn}, nil
+       r := fn.t.NewReader()
+       r.Seek(fn.TorrentOffset, os.SEEK_SET)
+       return fileHandle{fn, r}, nil
 }
index 0d9244b803c2a054bf6a8dc7178ff76d82c5267f..edd644a4394f60c0499abf88aaf1885ac3d0caa0 100644 (file)
@@ -2,7 +2,6 @@ package torrentfs
 
 import (
        "expvar"
-       "io"
        "os"
        "path"
        "strings"
@@ -57,56 +56,6 @@ func (n *node) fsPath() string {
        return "/" + n.metadata.Name + "/" + n.path
 }
 
-func blockingRead(ctx context.Context, fs *TorrentFS, t *torrent.Torrent, off int64, p []byte) (n int, err error) {
-       fs.mu.Lock()
-       fs.blockedReads++
-       fs.event.Broadcast()
-       fs.mu.Unlock()
-       var (
-               _n   int
-               _err error
-       )
-       readDone := make(chan struct{})
-       go func() {
-               defer close(readDone)
-               r := t.NewReader()
-               defer r.Close()
-               _, _err = r.Seek(off, os.SEEK_SET)
-               if _err != nil {
-                       return
-               }
-               _n, _err = io.ReadFull(r, p)
-       }()
-       select {
-       case <-readDone:
-               n = _n
-               err = _err
-       case <-fs.destroyed:
-               err = fuse.EIO
-       case <-ctx.Done():
-               err = fuse.EINTR
-       }
-       fs.mu.Lock()
-       fs.blockedReads--
-       fs.event.Broadcast()
-       fs.mu.Unlock()
-       return
-}
-
-func readFull(ctx context.Context, fs *TorrentFS, t *torrent.Torrent, off int64, p []byte) (n int, err error) {
-       for len(p) != 0 {
-               var nn int
-               nn, err = blockingRead(ctx, fs, t, off, p)
-               if err != nil {
-                       break
-               }
-               n += nn
-               off += int64(nn)
-               p = p[nn:]
-       }
-       return
-}
-
 type dirNode struct {
        node
 }
index 33ed4017dbc46e70ed49a552317b432d3ee244b1..7da3b48a11782d9b7dc2e2e1cc44a05af68c0365 100644 (file)
--- a/reader.go
+++ b/reader.go
@@ -119,10 +119,10 @@ func (r *Reader) piecesUncached() (ret pieceRange) {
 }
 
 func (r *Reader) Read(b []byte) (n int, err error) {
-       return r.ReadContext(b, context.Background())
+       return r.ReadContext(context.Background(), b)
 }
 
-func (r *Reader) ReadContext(b []byte, ctx context.Context) (n int, err error) {
+func (r *Reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
        // This is set under the Client lock if the Context is canceled.
        var ctxErr error
        if ctx.Done() != nil {
index dcda4152f3c1f1824e8c956428fe805bbc6af6b4..f56e9473b9f6184f7433edbeed55b9ca0ecf0705 100644 (file)
@@ -20,6 +20,6 @@ func TestReaderReadContext(t *testing.T) {
        ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond))
        r := tt.NewReader()
        defer r.Close()
-       _, err = r.ReadContext(make([]byte, 1), ctx)
+       _, err = r.ReadContext(ctx, make([]byte, 1))
        require.EqualValues(t, context.DeadlineExceeded, err)
 }