From 7d55f573f537e9b08290216cc119f9c2b489ca79 Mon Sep 17 00:00:00 2001
From: Matt Joiner <anacrolix@gmail.com>
Date: Mon, 28 Aug 2017 01:42:02 +1000
Subject: [PATCH] Maintain a torrent.Reader for each file handle

This means that readahead will work much better. Addresses https://github.com/anacrolix/torrent/issues/182.
---
 fs/file_handle.go | 73 ++++++++++++++++++++++++++++++++++-------------
 fs/filenode.go    |  6 +++-
 fs/torrentfs.go   | 51 ---------------------------------
 reader.go         |  4 +--
 reader_test.go    |  2 +-
 5 files changed, 61 insertions(+), 75 deletions(-)

diff --git a/fs/file_handle.go b/fs/file_handle.go
index 11a5d74d..592153c3 100644
--- a/fs/file_handle.go
+++ b/fs/file_handle.go
@@ -2,7 +2,11 @@ package torrentfs
 
 import (
 	"context"
-	"fmt"
+	"io"
+	"os"
+
+	"github.com/anacrolix/missinggo"
+	"github.com/anacrolix/torrent"
 
 	"bazil.org/fuse"
 	"bazil.org/fuse/fs"
@@ -10,34 +14,63 @@ import (
 
 type fileHandle struct {
 	fn fileNode
+	r  *torrent.Reader
 }
 
-var _ fs.HandleReader = fileHandle{}
+var _ interface {
+	fs.HandleReader
+	fs.HandleReleaser
+} = fileHandle{}
 
 func (me fileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
 	torrentfsReadRequests.Add(1)
 	if req.Dir {
 		panic("read on directory")
 	}
-	size := req.Size
-	fileLeft := int64(me.fn.size) - req.Offset
-	if fileLeft < 0 {
-		fileLeft = 0
-	}
-	if fileLeft < int64(size) {
-		size = int(fileLeft)
-	}
-	resp.Data = resp.Data[:size]
-	if len(resp.Data) == 0 {
-		return nil
-	}
-	torrentOff := me.fn.TorrentOffset + req.Offset
-	n, err := readFull(ctx, me.fn.FS, me.fn.t, torrentOff, resp.Data)
+	pos, err := me.r.Seek(me.fn.TorrentOffset+req.Offset, os.SEEK_SET)
 	if err != nil {
-		return err
+		panic(err)
 	}
-	if n != size {
-		panic(fmt.Sprintf("%d < %d", n, size))
+	if pos != me.fn.TorrentOffset+req.Offset {
+		panic("seek failed")
 	}
-	return nil
+	resp.Data = resp.Data[:req.Size]
+	readDone := make(chan struct{})
+	ctx, cancel := context.WithCancel(ctx)
+	var readErr error
+	go func() {
+		defer close(readDone)
+		me.fn.FS.mu.Lock()
+		me.fn.FS.blockedReads++
+		me.fn.FS.event.Broadcast()
+		me.fn.FS.mu.Unlock()
+		var n int
+		r := missinggo.ContextedReader{me.r, ctx}
+		n, readErr = r.Read(resp.Data)
+		if readErr == io.EOF {
+			readErr = nil
+		}
+		resp.Data = resp.Data[:n]
+	}()
+	defer func() {
+		<-readDone
+		me.fn.FS.mu.Lock()
+		me.fn.FS.blockedReads--
+		me.fn.FS.event.Broadcast()
+		me.fn.FS.mu.Unlock()
+	}()
+	defer cancel()
+
+	select {
+	case <-readDone:
+		return readErr
+	case <-me.fn.FS.destroyed:
+		return fuse.EIO
+	case <-ctx.Done():
+		return fuse.EINTR
+	}
+}
+
+func (me fileHandle) Release(context.Context, *fuse.ReleaseRequest) error {
+	return me.r.Close()
 }
diff --git a/fs/filenode.go b/fs/filenode.go
index 2c83fbf1..301d92d3 100644
--- a/fs/filenode.go
+++ b/fs/filenode.go
@@ -1,6 +1,8 @@
 package torrentfs
 
 import (
+	"os"
+
 	"bazil.org/fuse"
 	fusefs "bazil.org/fuse/fs"
 	"golang.org/x/net/context"
@@ -23,5 +25,7 @@ func (fn fileNode) Attr(ctx context.Context, attr *fuse.Attr) error {
 }
 
 func (fn fileNode) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fusefs.Handle, error) {
-	return fileHandle{fn}, nil
+	r := fn.t.NewReader()
+	r.Seek(fn.TorrentOffset, os.SEEK_SET)
+	return fileHandle{fn, r}, nil
 }
diff --git a/fs/torrentfs.go b/fs/torrentfs.go
index 0d9244b8..edd644a4 100644
--- a/fs/torrentfs.go
+++ b/fs/torrentfs.go
@@ -2,7 +2,6 @@ package torrentfs
 
 import (
 	"expvar"
-	"io"
 	"os"
 	"path"
 	"strings"
@@ -57,56 +56,6 @@ func (n *node) fsPath() string {
 	return "/" + n.metadata.Name + "/" + n.path
 }
 
-func blockingRead(ctx context.Context, fs *TorrentFS, t *torrent.Torrent, off int64, p []byte) (n int, err error) {
-	fs.mu.Lock()
-	fs.blockedReads++
-	fs.event.Broadcast()
-	fs.mu.Unlock()
-	var (
-		_n   int
-		_err error
-	)
-	readDone := make(chan struct{})
-	go func() {
-		defer close(readDone)
-		r := t.NewReader()
-		defer r.Close()
-		_, _err = r.Seek(off, os.SEEK_SET)
-		if _err != nil {
-			return
-		}
-		_n, _err = io.ReadFull(r, p)
-	}()
-	select {
-	case <-readDone:
-		n = _n
-		err = _err
-	case <-fs.destroyed:
-		err = fuse.EIO
-	case <-ctx.Done():
-		err = fuse.EINTR
-	}
-	fs.mu.Lock()
-	fs.blockedReads--
-	fs.event.Broadcast()
-	fs.mu.Unlock()
-	return
-}
-
-func readFull(ctx context.Context, fs *TorrentFS, t *torrent.Torrent, off int64, p []byte) (n int, err error) {
-	for len(p) != 0 {
-		var nn int
-		nn, err = blockingRead(ctx, fs, t, off, p)
-		if err != nil {
-			break
-		}
-		n += nn
-		off += int64(nn)
-		p = p[nn:]
-	}
-	return
-}
-
 type dirNode struct {
 	node
 }
diff --git a/reader.go b/reader.go
index 33ed4017..7da3b48a 100644
--- a/reader.go
+++ b/reader.go
@@ -119,10 +119,10 @@ func (r *Reader) piecesUncached() (ret pieceRange) {
 }
 
 func (r *Reader) Read(b []byte) (n int, err error) {
-	return r.ReadContext(b, context.Background())
+	return r.ReadContext(context.Background(), b)
 }
 
-func (r *Reader) ReadContext(b []byte, ctx context.Context) (n int, err error) {
+func (r *Reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
 	// This is set under the Client lock if the Context is canceled.
 	var ctxErr error
 	if ctx.Done() != nil {
diff --git a/reader_test.go b/reader_test.go
index dcda4152..f56e9473 100644
--- a/reader_test.go
+++ b/reader_test.go
@@ -20,6 +20,6 @@ func TestReaderReadContext(t *testing.T) {
 	ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond))
 	r := tt.NewReader()
 	defer r.Close()
-	_, err = r.ReadContext(make([]byte, 1), ctx)
+	_, err = r.ReadContext(ctx, make([]byte, 1))
 	require.EqualValues(t, context.DeadlineExceeded, err)
 }
-- 
2.51.0