]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add File.NewReader
authorMatt Joiner <anacrolix@gmail.com>
Sat, 6 Jan 2018 05:37:13 +0000 (16:37 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Sat, 6 Jan 2018 05:37:13 +0000 (16:37 +1100)
Fixes #222

cmd/torrent-pick/main.go
example_test.go
file.go
file_reader.go [new file with mode: 0644]
fs/file_handle.go
fs/filenode.go
fs/torrentfs.go
reader.go
reader_test.go
t.go
torrent.go

index be590b8b4401bf1a97d80afb09b86663b384f083..8ff22455f9f7b8ed35e8be3a1a1d70e86d1eab48 100644 (file)
@@ -15,7 +15,6 @@ import (
        "time"
 
        _ "github.com/anacrolix/envpprof"
-       "github.com/anacrolix/missinggo"
        "github.com/dustin/go-humanize"
        "github.com/jessevdk/go-flags"
 
@@ -163,7 +162,7 @@ func main() {
                                if file.DisplayPath() != rootGroup.Pick {
                                        continue
                                }
-                               srcReader := missinggo.NewSectionReadSeeker(t.NewReader(), file.Offset(), file.Length())
+                               srcReader := file.NewReader()
                                io.Copy(dstWriter, srcReader)
                                return
                        }
index 5c58e4d93a59d91cfa351904e99e31a9c4ee662c..54cb7190689081ac8020a24aeca054897a79ce13 100644 (file)
@@ -3,8 +3,6 @@ package torrent_test
 import (
        "log"
 
-       "github.com/anacrolix/missinggo"
-
        "github.com/anacrolix/torrent"
 )
 
@@ -19,13 +17,9 @@ func Example() {
 }
 
 func Example_fileReader() {
-       var (
-               t *torrent.Torrent
-               f torrent.File
-       )
-       r := t.NewReader()
-       defer r.Close()
-       // Access the parts of the torrent pertaining to f. Data will be
+       var f torrent.File
+       // Accesses the parts of the torrent pertaining to f. Data will be
        // downloaded as required, per the configuration of the torrent.Reader.
-       _ = missinggo.NewSectionReadSeeker(r, f.Offset(), f.Length())
+       r := f.NewReader()
+       defer r.Close()
 }
diff --git a/file.go b/file.go
index 720e4fb510e37396bd715b8afee172cb03616754..e71cee6cde8bc3e24f7ae91df1206e505507e05c 100644 (file)
--- a/file.go
+++ b/file.go
@@ -3,6 +3,7 @@ package torrent
 import (
        "strings"
 
+       "github.com/anacrolix/missinggo"
        "github.com/anacrolix/torrent/metainfo"
 )
 
@@ -99,3 +100,8 @@ func (f *File) exclusivePieces() (begin, end int) {
 func (f *File) Cancel() {
        f.t.CancelPieces(f.exclusivePieces())
 }
+
+func (f *File) NewReader() Reader {
+       tr := f.t.NewReader()
+       return fileReader{missinggo.NewSectionReadSeeker(tr, f.Offset(), f.Length()), tr}
+}
diff --git a/file_reader.go b/file_reader.go
new file mode 100644 (file)
index 0000000..dde4362
--- /dev/null
@@ -0,0 +1,18 @@
+package torrent
+
+import (
+       "io"
+
+       "github.com/anacrolix/missinggo"
+)
+
+type fileReaderInherited interface {
+       io.Closer
+       SetReadahead(int64)
+       SetResponsive()
+}
+
+type fileReader struct {
+       missinggo.ReadSeekContexter
+       fileReaderInherited
+}
index 3405cce7f1c2c5bf5f25b4c228b7f4f251b616d4..41517296b2a0680ff4e59b2c5d49b3607fd8fbc7 100644 (file)
@@ -13,7 +13,7 @@ import (
 
 type fileHandle struct {
        fn fileNode
-       r  *torrent.Reader
+       r  torrent.Reader
 }
 
 var _ interface {
@@ -26,11 +26,11 @@ func (me fileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse
        if req.Dir {
                panic("read on directory")
        }
-       pos, err := me.r.Seek(me.fn.TorrentOffset+req.Offset, io.SeekStart)
+       pos, err := me.r.Seek(req.Offset, io.SeekStart)
        if err != nil {
                panic(err)
        }
-       if pos != me.fn.TorrentOffset+req.Offset {
+       if pos != req.Offset {
                panic("seek failed")
        }
        resp.Data = resp.Data[:req.Size]
index db8507559c900c8fd42ad476e35805bf3967870c..d0e4fe3fbb49bc333dc84aaafc5a285e6772ee5f 100644 (file)
@@ -1,17 +1,15 @@
 package torrentfs
 
 import (
-       "io"
-
        "bazil.org/fuse"
        fusefs "bazil.org/fuse/fs"
+       "github.com/anacrolix/torrent"
        "golang.org/x/net/context"
 )
 
 type fileNode struct {
        node
-       size          uint64
-       TorrentOffset int64
+       f *torrent.File
 }
 
 var (
@@ -19,13 +17,12 @@ var (
 )
 
 func (fn fileNode) Attr(ctx context.Context, attr *fuse.Attr) error {
-       attr.Size = fn.size
+       attr.Size = uint64(fn.f.Length())
        attr.Mode = defaultMode
        return nil
 }
 
 func (fn fileNode) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fusefs.Handle, error) {
-       r := fn.t.NewReader()
-       r.Seek(fn.TorrentOffset, io.SeekStart)
+       r := fn.f.NewReader()
        return fileHandle{fn, r}, nil
 }
index 32ead7fb04e82a0d3c45c4737f0cc7d4182ddf2c..65cb42cef2efed3b1a6509c32804d29917b64daf 100644 (file)
@@ -3,7 +3,6 @@ package torrentfs
 import (
        "expvar"
        "os"
-       "path"
        "strings"
        "sync"
 
@@ -67,11 +66,12 @@ func isSubPath(parent, child string) bool {
        if !strings.HasPrefix(child, parent) {
                return false
        }
-       s := child[len(parent):]
-       if len(s) == 0 {
+       extra := child[len(parent):]
+       if len(extra) == 0 {
                return false
        }
-       return s[0] == '/'
+       // Not just a file with more stuff on the end.
+       return extra[0] == '/'
 }
 
 func (dn dirNode) ReadDirAll(ctx context.Context) (des []fuse.Dirent, err error) {
@@ -98,34 +98,30 @@ func (dn dirNode) ReadDirAll(ctx context.Context) (des []fuse.Dirent, err error)
        return
 }
 
-func (dn dirNode) Lookup(ctx context.Context, name string) (_node fusefs.Node, err error) {
-       var torrentOffset int64
-       for _, fi := range dn.metadata.Files {
-               if !isSubPath(dn.path, strings.Join(fi.Path, "/")) {
-                       torrentOffset += fi.Length
-                       continue
+func (dn dirNode) Lookup(_ context.Context, name string) (fusefs.Node, error) {
+       dir := false
+       var file *torrent.File
+       fullPath := dn.path + "/" + name
+       for _, f := range dn.t.Files() {
+               if f.DisplayPath() == fullPath {
+                       file = &f
                }
-               if fi.Path[len(dn.path)] != name {
-                       torrentOffset += fi.Length
-                       continue
-               }
-               __node := dn.node
-               __node.path = path.Join(__node.path, name)
-               if len(fi.Path) == len(dn.path)+1 {
-                       _node = fileNode{
-                               node:          __node,
-                               size:          uint64(fi.Length),
-                               TorrentOffset: torrentOffset,
-                       }
-               } else {
-                       _node = dirNode{__node}
+               if isSubPath(fullPath, f.DisplayPath()) {
+                       dir = true
                }
-               break
        }
-       if _node == nil {
-               err = fuse.ENOENT
+       n := dn.node
+       n.path = fullPath
+       if dir && file != nil {
+               panic("both dir and file")
        }
-       return
+       if file != nil {
+               return fileNode{n, file}, nil
+       }
+       if dir {
+               return dirNode{n}, nil
+       }
+       return nil, fuse.ENOENT
 }
 
 func (dn dirNode) Attr(ctx context.Context, attr *fuse.Attr) error {
@@ -145,7 +141,7 @@ func (rn rootNode) Lookup(ctx context.Context, name string) (_node fusefs.Node,
                        t:        t,
                }
                if !info.IsDir() {
-                       _node = fileNode{__node, uint64(info.Length), 0}
+                       _node = fileNode{__node, &t.Files()[0]}
                } else {
                        _node = dirNode{__node}
                }
index bdfeed04047fa2001668dea64e87353d2ee7abcc..76c39e968307f08af9728c5ff050427f639ed46f 100644 (file)
--- a/reader.go
+++ b/reader.go
@@ -10,6 +10,15 @@ import (
        "golang.org/x/net/context"
 )
 
+type Reader interface {
+       io.Reader
+       io.Seeker
+       io.Closer
+       missinggo.ReadContexter
+       SetReadahead(int64)
+       SetResponsive()
+}
+
 // Piece range by piece index, [begin, end).
 type pieceRange struct {
        begin, end int
@@ -17,7 +26,7 @@ type pieceRange struct {
 
 // Accesses Torrent data via a Client. Reads block until the data is
 // available. Seeks and readahead also drive Client behaviour.
-type Reader struct {
+type reader struct {
        t          *Torrent
        responsive bool
        // Ensure operations that change the position are exclusive, like Read()
@@ -35,22 +44,22 @@ type Reader struct {
        pieces pieceRange
 }
 
-var _ io.ReadCloser = &Reader{}
+var _ io.ReadCloser = &reader{}
 
 // Don't wait for pieces to complete and be verified. Read calls return as
 // soon as they can when the underlying chunks become available.
-func (r *Reader) SetResponsive() {
+func (r *reader) SetResponsive() {
        r.responsive = true
 }
 
-// Disable responsive mode.
-func (r *Reader) SetNonResponsive() {
+// Disable responsive mode. TODO: Remove?
+func (r *reader) SetNonResponsive() {
        r.responsive = false
 }
 
 // Configure the number of bytes ahead of a read that should also be
 // prioritized in preparation for further reads.
-func (r *Reader) SetReadahead(readahead int64) {
+func (r *reader) SetReadahead(readahead int64) {
        r.mu.Lock()
        r.readahead = readahead
        r.mu.Unlock()
@@ -59,12 +68,7 @@ func (r *Reader) SetReadahead(readahead int64) {
        r.posChanged()
 }
 
-// Return reader's current position.
-func (r *Reader) CurrentPos() int64 {
-       return r.pos
-}
-
-func (r *Reader) readable(off int64) (ret bool) {
+func (r *reader) readable(off int64) (ret bool) {
        if r.t.closed.IsSet() {
                return true
        }
@@ -79,7 +83,7 @@ func (r *Reader) readable(off int64) (ret bool) {
 }
 
 // How many bytes are available to read. Max is the most we could require.
-func (r *Reader) available(off, max int64) (ret int64) {
+func (r *reader) available(off, max int64) (ret int64) {
        for max > 0 {
                req, ok := r.t.offsetRequest(off)
                if !ok {
@@ -100,7 +104,7 @@ func (r *Reader) available(off, max int64) (ret int64) {
        return
 }
 
-func (r *Reader) waitReadable(off int64) {
+func (r *reader) waitReadable(off int64) {
        // We may have been sent back here because we were told we could read but
        // it failed.
        r.t.cl.event.Wait()
@@ -108,7 +112,7 @@ func (r *Reader) waitReadable(off int64) {
 
 // Calculates the pieces this reader wants downloaded, ignoring the cached
 // value at r.pieces.
-func (r *Reader) piecesUncached() (ret pieceRange) {
+func (r *reader) piecesUncached() (ret pieceRange) {
        ra := r.readahead
        if ra < 1 {
                ra = 1
@@ -117,11 +121,11 @@ func (r *Reader) piecesUncached() (ret pieceRange) {
        return
 }
 
-func (r *Reader) Read(b []byte) (n int, err error) {
+func (r *reader) Read(b []byte) (n int, err error) {
        return r.ReadContext(context.Background(), b)
 }
 
-func (r *Reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
+func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
        // This is set under the Client lock if the Context is canceled.
        var ctxErr error
        if ctx.Done() != nil {
@@ -166,7 +170,7 @@ func (r *Reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
 
 // Wait until some data should be available to read. Tickles the client if it
 // isn't. Returns how much should be readable without blocking.
-func (r *Reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) {
+func (r *reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) {
        r.t.cl.mu.Lock()
        defer r.t.cl.mu.Unlock()
        for !r.readable(pos) && *ctxErr == nil {
@@ -176,7 +180,7 @@ func (r *Reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) {
 }
 
 // Performs at most one successful read to torrent storage.
-func (r *Reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err error) {
+func (r *reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err error) {
        if pos >= r.t.length {
                err = io.EOF
                return
@@ -210,14 +214,14 @@ func (r *Reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err erro
        }
 }
 
-func (r *Reader) Close() error {
+func (r *reader) Close() error {
        r.t.cl.mu.Lock()
        defer r.t.cl.mu.Unlock()
        r.t.deleteReader(r)
        return nil
 }
 
-func (r *Reader) posChanged() {
+func (r *reader) posChanged() {
        to := r.piecesUncached()
        from := r.pieces
        if to == from {
@@ -227,7 +231,7 @@ func (r *Reader) posChanged() {
        r.t.readerPosChanged(from, to)
 }
 
-func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
+func (r *reader) Seek(off int64, whence int) (ret int64, err error) {
        r.opMu.Lock()
        defer r.opMu.Unlock()
 
@@ -248,7 +252,3 @@ func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
        r.posChanged()
        return
 }
-
-func (r *Reader) Torrent() *Torrent {
-       return r.t
-}
index f56e9473b9f6184f7433edbeed55b9ca0ecf0705..5378efc5c515ea7a69d4c8dd986eb5d7e48069ae 100644 (file)
@@ -18,7 +18,7 @@ func TestReaderReadContext(t *testing.T) {
        require.NoError(t, err)
        defer tt.Drop()
        ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond))
-       r := tt.NewReader()
+       r := tt.Files()[0].NewReader()
        defer r.Close()
        _, err = r.ReadContext(ctx, make([]byte, 1))
        require.EqualValues(t, context.DeadlineExceeded, err)
diff --git a/t.go b/t.go
index cc69d8e3c7ac9c04c756af316bd81c81fc1c0d08..f687cb59fe963b85fde06a63dee00f4f6f9b99e8 100644 (file)
--- a/t.go
+++ b/t.go
@@ -31,14 +31,14 @@ func (t *Torrent) Info() *metainfo.Info {
 
 // Returns a Reader bound to the torrent's data. All read calls block until
 // the data requested is actually available.
-func (t *Torrent) NewReader() (ret *Reader) {
-       ret = &Reader{
+func (t *Torrent) NewReader() Reader {
+       r := reader{
                mu:        &t.cl.mu,
                t:         t,
                readahead: 5 * 1024 * 1024,
        }
-       t.addReader(ret)
-       return
+       t.addReader(&r)
+       return &r
 }
 
 // Returns the state of pieces of the torrent. They are grouped into runs of
@@ -133,17 +133,17 @@ func (t *Torrent) Metainfo() metainfo.MetaInfo {
        return t.newMetaInfo()
 }
 
-func (t *Torrent) addReader(r *Reader) {
+func (t *Torrent) addReader(r *reader) {
        t.cl.mu.Lock()
        defer t.cl.mu.Unlock()
        if t.readers == nil {
-               t.readers = make(map[*Reader]struct{})
+               t.readers = make(map[*reader]struct{})
        }
        t.readers[r] = struct{}{}
        r.posChanged()
 }
 
-func (t *Torrent) deleteReader(r *Reader) {
+func (t *Torrent) deleteReader(r *reader) {
        delete(t.readers, r)
        t.readersChanged()
 }
index 46575942b4367383f365db9d42d06d8fa8605e35..3cb32a13928c49732b44bb6be1718523f5a455a8 100644 (file)
@@ -107,7 +107,7 @@ type Torrent struct {
        // Set when .Info is obtained.
        gotMetainfo missinggo.Event
 
-       readers               map[*Reader]struct{}
+       readers               map[*reader]struct{}
        readerNowPieces       bitmap.Bitmap
        readerReadaheadPieces bitmap.Bitmap