This means that readahead will work much better. Addresses https://github.com/anacrolix/torrent/issues/182.
import (
"context"
- "fmt"
+ "io"
+ "os"
+
+ "github.com/anacrolix/missinggo"
+ "github.com/anacrolix/torrent"
"bazil.org/fuse"
"bazil.org/fuse/fs"
type fileHandle struct {
fn fileNode
+ r *torrent.Reader
}
-var _ fs.HandleReader = fileHandle{}
+var _ interface {
+ fs.HandleReader
+ fs.HandleReleaser
+} = fileHandle{}
func (me fileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
torrentfsReadRequests.Add(1)
if req.Dir {
panic("read on directory")
}
- size := req.Size
- fileLeft := int64(me.fn.size) - req.Offset
- if fileLeft < 0 {
- fileLeft = 0
- }
- if fileLeft < int64(size) {
- size = int(fileLeft)
- }
- resp.Data = resp.Data[:size]
- if len(resp.Data) == 0 {
- return nil
- }
- torrentOff := me.fn.TorrentOffset + req.Offset
- n, err := readFull(ctx, me.fn.FS, me.fn.t, torrentOff, resp.Data)
+ pos, err := me.r.Seek(me.fn.TorrentOffset+req.Offset, os.SEEK_SET)
if err != nil {
- return err
+ panic(err)
}
- if n != size {
- panic(fmt.Sprintf("%d < %d", n, size))
+ if pos != me.fn.TorrentOffset+req.Offset {
+ panic("seek failed")
}
- return nil
+ resp.Data = resp.Data[:req.Size]
+ readDone := make(chan struct{})
+ ctx, cancel := context.WithCancel(ctx)
+ var readErr error
+ go func() {
+ defer close(readDone)
+ me.fn.FS.mu.Lock()
+ me.fn.FS.blockedReads++
+ me.fn.FS.event.Broadcast()
+ me.fn.FS.mu.Unlock()
+ var n int
+ r := missinggo.ContextedReader{me.r, ctx}
+ n, readErr = r.Read(resp.Data)
+ if readErr == io.EOF {
+ readErr = nil
+ }
+ resp.Data = resp.Data[:n]
+ }()
+ defer func() {
+ <-readDone
+ me.fn.FS.mu.Lock()
+ me.fn.FS.blockedReads--
+ me.fn.FS.event.Broadcast()
+ me.fn.FS.mu.Unlock()
+ }()
+ defer cancel()
+
+ select {
+ case <-readDone:
+ return readErr
+ case <-me.fn.FS.destroyed:
+ return fuse.EIO
+ case <-ctx.Done():
+ return fuse.EINTR
+ }
+}
+
+func (me fileHandle) Release(context.Context, *fuse.ReleaseRequest) error {
+ return me.r.Close()
}
package torrentfs
import (
+ "os"
+
"bazil.org/fuse"
fusefs "bazil.org/fuse/fs"
"golang.org/x/net/context"
}
func (fn fileNode) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fusefs.Handle, error) {
- return fileHandle{fn}, nil
+ r := fn.t.NewReader()
+ r.Seek(fn.TorrentOffset, os.SEEK_SET)
+ return fileHandle{fn, r}, nil
}
import (
"expvar"
- "io"
"os"
"path"
"strings"
return "/" + n.metadata.Name + "/" + n.path
}
-func blockingRead(ctx context.Context, fs *TorrentFS, t *torrent.Torrent, off int64, p []byte) (n int, err error) {
- fs.mu.Lock()
- fs.blockedReads++
- fs.event.Broadcast()
- fs.mu.Unlock()
- var (
- _n int
- _err error
- )
- readDone := make(chan struct{})
- go func() {
- defer close(readDone)
- r := t.NewReader()
- defer r.Close()
- _, _err = r.Seek(off, os.SEEK_SET)
- if _err != nil {
- return
- }
- _n, _err = io.ReadFull(r, p)
- }()
- select {
- case <-readDone:
- n = _n
- err = _err
- case <-fs.destroyed:
- err = fuse.EIO
- case <-ctx.Done():
- err = fuse.EINTR
- }
- fs.mu.Lock()
- fs.blockedReads--
- fs.event.Broadcast()
- fs.mu.Unlock()
- return
-}
-
-func readFull(ctx context.Context, fs *TorrentFS, t *torrent.Torrent, off int64, p []byte) (n int, err error) {
- for len(p) != 0 {
- var nn int
- nn, err = blockingRead(ctx, fs, t, off, p)
- if err != nil {
- break
- }
- n += nn
- off += int64(nn)
- p = p[nn:]
- }
- return
-}
-
type dirNode struct {
node
}
}
func (r *Reader) Read(b []byte) (n int, err error) {
- return r.ReadContext(b, context.Background())
+ return r.ReadContext(context.Background(), b)
}
-func (r *Reader) ReadContext(b []byte, ctx context.Context) (n int, err error) {
+func (r *Reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
// This is set under the Client lock if the Context is canceled.
var ctxErr error
if ctx.Done() != nil {
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond))
r := tt.NewReader()
defer r.Close()
- _, err = r.ReadContext(make([]byte, 1), ctx)
+ _, err = r.ReadContext(ctx, make([]byte, 1))
require.EqualValues(t, context.DeadlineExceeded, err)
}