From: Matt Joiner Date: Sun, 27 Aug 2017 15:42:02 +0000 (+1000) Subject: Maintain a torrent.Reader for each file handle X-Git-Tag: v1.0.0~426^2 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=7d55f573f537e9b08290216cc119f9c2b489ca79;p=btrtrc.git Maintain a torrent.Reader for each file handle This means that readahead will work much better. Addresses https://github.com/anacrolix/torrent/issues/182. --- diff --git a/fs/file_handle.go b/fs/file_handle.go index 11a5d74d..592153c3 100644 --- a/fs/file_handle.go +++ b/fs/file_handle.go @@ -2,7 +2,11 @@ package torrentfs import ( "context" - "fmt" + "io" + "os" + + "github.com/anacrolix/missinggo" + "github.com/anacrolix/torrent" "bazil.org/fuse" "bazil.org/fuse/fs" @@ -10,34 +14,63 @@ import ( type fileHandle struct { fn fileNode + r *torrent.Reader } -var _ fs.HandleReader = fileHandle{} +var _ interface { + fs.HandleReader + fs.HandleReleaser +} = fileHandle{} func (me fileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { torrentfsReadRequests.Add(1) if req.Dir { panic("read on directory") } - size := req.Size - fileLeft := int64(me.fn.size) - req.Offset - if fileLeft < 0 { - fileLeft = 0 - } - if fileLeft < int64(size) { - size = int(fileLeft) - } - resp.Data = resp.Data[:size] - if len(resp.Data) == 0 { - return nil - } - torrentOff := me.fn.TorrentOffset + req.Offset - n, err := readFull(ctx, me.fn.FS, me.fn.t, torrentOff, resp.Data) + pos, err := me.r.Seek(me.fn.TorrentOffset+req.Offset, os.SEEK_SET) if err != nil { - return err + panic(err) } - if n != size { - panic(fmt.Sprintf("%d < %d", n, size)) + if pos != me.fn.TorrentOffset+req.Offset { + panic("seek failed") } - return nil + resp.Data = resp.Data[:req.Size] + readDone := make(chan struct{}) + ctx, cancel := context.WithCancel(ctx) + var readErr error + go func() { + defer close(readDone) + me.fn.FS.mu.Lock() + me.fn.FS.blockedReads++ + me.fn.FS.event.Broadcast() + me.fn.FS.mu.Unlock() + var n int + r := missinggo.ContextedReader{me.r, ctx} + n, readErr = r.Read(resp.Data) + if readErr == io.EOF { + readErr = nil + } + resp.Data = resp.Data[:n] + }() + defer func() { + <-readDone + me.fn.FS.mu.Lock() + me.fn.FS.blockedReads-- + me.fn.FS.event.Broadcast() + me.fn.FS.mu.Unlock() + }() + defer cancel() + + select { + case <-readDone: + return readErr + case <-me.fn.FS.destroyed: + return fuse.EIO + case <-ctx.Done(): + return fuse.EINTR + } +} + +func (me fileHandle) Release(context.Context, *fuse.ReleaseRequest) error { + return me.r.Close() } diff --git a/fs/filenode.go b/fs/filenode.go index 2c83fbf1..301d92d3 100644 --- a/fs/filenode.go +++ b/fs/filenode.go @@ -1,6 +1,8 @@ package torrentfs import ( + "os" + "bazil.org/fuse" fusefs "bazil.org/fuse/fs" "golang.org/x/net/context" @@ -23,5 +25,7 @@ func (fn fileNode) Attr(ctx context.Context, attr *fuse.Attr) error { } func (fn fileNode) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fusefs.Handle, error) { - return fileHandle{fn}, nil + r := fn.t.NewReader() + r.Seek(fn.TorrentOffset, os.SEEK_SET) + return fileHandle{fn, r}, nil } diff --git a/fs/torrentfs.go b/fs/torrentfs.go index 0d9244b8..edd644a4 100644 --- a/fs/torrentfs.go +++ b/fs/torrentfs.go @@ -2,7 +2,6 @@ package torrentfs import ( "expvar" - "io" "os" "path" "strings" @@ -57,56 +56,6 @@ func (n *node) fsPath() string { return "/" + n.metadata.Name + "/" + n.path } -func blockingRead(ctx context.Context, fs *TorrentFS, t *torrent.Torrent, off int64, p []byte) (n int, err error) { - fs.mu.Lock() - fs.blockedReads++ - fs.event.Broadcast() - fs.mu.Unlock() - var ( - _n int - _err error - ) - readDone := make(chan struct{}) - go func() { - defer close(readDone) - r := t.NewReader() - defer r.Close() - _, _err = r.Seek(off, os.SEEK_SET) - if _err != nil { - return - } - _n, _err = io.ReadFull(r, p) - }() - select { - case <-readDone: - n = _n - err = _err - case <-fs.destroyed: - err = fuse.EIO - case <-ctx.Done(): - err = fuse.EINTR - } - fs.mu.Lock() - fs.blockedReads-- - fs.event.Broadcast() - fs.mu.Unlock() - return -} - -func readFull(ctx context.Context, fs *TorrentFS, t *torrent.Torrent, off int64, p []byte) (n int, err error) { - for len(p) != 0 { - var nn int - nn, err = blockingRead(ctx, fs, t, off, p) - if err != nil { - break - } - n += nn - off += int64(nn) - p = p[nn:] - } - return -} - type dirNode struct { node } diff --git a/reader.go b/reader.go index 33ed4017..7da3b48a 100644 --- a/reader.go +++ b/reader.go @@ -119,10 +119,10 @@ func (r *Reader) piecesUncached() (ret pieceRange) { } func (r *Reader) Read(b []byte) (n int, err error) { - return r.ReadContext(b, context.Background()) + return r.ReadContext(context.Background(), b) } -func (r *Reader) ReadContext(b []byte, ctx context.Context) (n int, err error) { +func (r *Reader) ReadContext(ctx context.Context, b []byte) (n int, err error) { // This is set under the Client lock if the Context is canceled. var ctxErr error if ctx.Done() != nil { diff --git a/reader_test.go b/reader_test.go index dcda4152..f56e9473 100644 --- a/reader_test.go +++ b/reader_test.go @@ -20,6 +20,6 @@ func TestReaderReadContext(t *testing.T) { ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond)) r := tt.NewReader() defer r.Close() - _, err = r.ReadContext(make([]byte, 1), ctx) + _, err = r.ReadContext(ctx, make([]byte, 1)) require.EqualValues(t, context.DeadlineExceeded, err) }