cmd/torrent-pick/main.go | 3 +-- example_test.go | 14 ++++---------- file.go | 6 ++++++ file_reader.go | 18 ++++++++++++++++++ fs/file_handle.go | 6 +++--- fs/filenode.go | 11 ++++------- fs/torrentfs.go | 54 +++++++++++++++++++++++++---------------------------- reader.go | 52 ++++++++++++++++++++++++++-------------------------- reader_test.go | 2 +- t.go | 14 +++++++------- torrent.go | 2 +- diff --git a/cmd/torrent-pick/main.go b/cmd/torrent-pick/main.go index be590b8b4401bf1a97d80afb09b86663b384f083..8ff22455f9f7b8ed35e8be3a1a1d70e86d1eab48 100644 --- a/cmd/torrent-pick/main.go +++ b/cmd/torrent-pick/main.go @@ -15,7 +15,6 @@ "strings" "time" _ "github.com/anacrolix/envpprof" - "github.com/anacrolix/missinggo" "github.com/dustin/go-humanize" "github.com/jessevdk/go-flags" @@ -163,7 +162,7 @@ for _, file := range t.Files() { if file.DisplayPath() != rootGroup.Pick { continue } - srcReader := missinggo.NewSectionReadSeeker(t.NewReader(), file.Offset(), file.Length()) + srcReader := file.NewReader() io.Copy(dstWriter, srcReader) return } diff --git a/example_test.go b/example_test.go index 5c58e4d93a59d91cfa351904e99e31a9c4ee662c..54cb7190689081ac8020a24aeca054897a79ce13 100644 --- a/example_test.go +++ b/example_test.go @@ -3,8 +3,6 @@ import ( "log" - "github.com/anacrolix/missinggo" - "github.com/anacrolix/torrent" ) @@ -19,13 +17,9 @@ log.Print("ermahgerd, torrent downloaded") } func Example_fileReader() { - var ( - t *torrent.Torrent - f torrent.File - ) - r := t.NewReader() - defer r.Close() - // Access the parts of the torrent pertaining to f. Data will be + var f torrent.File + // Accesses the parts of the torrent pertaining to f. Data will be // downloaded as required, per the configuration of the torrent.Reader. - _ = missinggo.NewSectionReadSeeker(r, f.Offset(), f.Length()) + r := f.NewReader() + defer r.Close() } diff --git a/file.go b/file.go index 720e4fb510e37396bd715b8afee172cb03616754..e71cee6cde8bc3e24f7ae91df1206e505507e05c 100644 --- a/file.go +++ b/file.go @@ -3,6 +3,7 @@ import ( "strings" + "github.com/anacrolix/missinggo" "github.com/anacrolix/torrent/metainfo" ) @@ -99,3 +100,8 @@ func (f *File) Cancel() { f.t.CancelPieces(f.exclusivePieces()) } + +func (f *File) NewReader() Reader { + tr := f.t.NewReader() + return fileReader{missinggo.NewSectionReadSeeker(tr, f.Offset(), f.Length()), tr} +} diff --git a/file_reader.go b/file_reader.go new file mode 100644 index 0000000000000000000000000000000000000000..dde43620e8cf8e20ae50bec15e1b5d255e64e33f --- /dev/null +++ b/file_reader.go @@ -0,0 +1,18 @@ +package torrent + +import ( + "io" + + "github.com/anacrolix/missinggo" +) + +type fileReaderInherited interface { + io.Closer + SetReadahead(int64) + SetResponsive() +} + +type fileReader struct { + missinggo.ReadSeekContexter + fileReaderInherited +} diff --git a/fs/file_handle.go b/fs/file_handle.go index 3405cce7f1c2c5bf5f25b4c228b7f4f251b616d4..41517296b2a0680ff4e59b2c5d49b3607fd8fbc7 100644 --- a/fs/file_handle.go +++ b/fs/file_handle.go @@ -13,7 +13,7 @@ ) type fileHandle struct { fn fileNode - r *torrent.Reader + r torrent.Reader } var _ interface { @@ -26,11 +26,11 @@ torrentfsReadRequests.Add(1) if req.Dir { panic("read on directory") } - pos, err := me.r.Seek(me.fn.TorrentOffset+req.Offset, io.SeekStart) + pos, err := me.r.Seek(req.Offset, io.SeekStart) if err != nil { panic(err) } - if pos != me.fn.TorrentOffset+req.Offset { + if pos != req.Offset { panic("seek failed") } resp.Data = resp.Data[:req.Size] diff --git a/fs/filenode.go b/fs/filenode.go index db8507559c900c8fd42ad476e35805bf3967870c..d0e4fe3fbb49bc333dc84aaafc5a285e6772ee5f 100644 --- a/fs/filenode.go +++ b/fs/filenode.go @@ -1,17 +1,15 @@ package torrentfs import ( - "io" - "bazil.org/fuse" fusefs "bazil.org/fuse/fs" + "github.com/anacrolix/torrent" "golang.org/x/net/context" ) type fileNode struct { node - size uint64 - TorrentOffset int64 + f *torrent.File } var ( @@ -19,13 +17,12 @@ _ fusefs.NodeOpener = fileNode{} ) func (fn fileNode) Attr(ctx context.Context, attr *fuse.Attr) error { - attr.Size = fn.size + attr.Size = uint64(fn.f.Length()) attr.Mode = defaultMode return nil } func (fn fileNode) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fusefs.Handle, error) { - r := fn.t.NewReader() - r.Seek(fn.TorrentOffset, io.SeekStart) + r := fn.f.NewReader() return fileHandle{fn, r}, nil } diff --git a/fs/torrentfs.go b/fs/torrentfs.go index 32ead7fb04e82a0d3c45c4737f0cc7d4182ddf2c..65cb42cef2efed3b1a6509c32804d29917b64daf 100644 --- a/fs/torrentfs.go +++ b/fs/torrentfs.go @@ -3,7 +3,6 @@ import ( "expvar" "os" - "path" "strings" "sync" @@ -67,11 +66,12 @@ } if !strings.HasPrefix(child, parent) { return false } - s := child[len(parent):] - if len(s) == 0 { + extra := child[len(parent):] + if len(extra) == 0 { return false } - return s[0] == '/' + // Not just a file with more stuff on the end. + return extra[0] == '/' } func (dn dirNode) ReadDirAll(ctx context.Context) (des []fuse.Dirent, err error) { @@ -98,34 +98,30 @@ } return } -func (dn dirNode) Lookup(ctx context.Context, name string) (_node fusefs.Node, err error) { - var torrentOffset int64 - for _, fi := range dn.metadata.Files { - if !isSubPath(dn.path, strings.Join(fi.Path, "/")) { - torrentOffset += fi.Length - continue +func (dn dirNode) Lookup(_ context.Context, name string) (fusefs.Node, error) { + dir := false + var file *torrent.File + fullPath := dn.path + "/" + name + for _, f := range dn.t.Files() { + if f.DisplayPath() == fullPath { + file = &f } - if fi.Path[len(dn.path)] != name { - torrentOffset += fi.Length - continue - } - __node := dn.node - __node.path = path.Join(__node.path, name) - if len(fi.Path) == len(dn.path)+1 { - _node = fileNode{ - node: __node, - size: uint64(fi.Length), - TorrentOffset: torrentOffset, - } - } else { - _node = dirNode{__node} + if isSubPath(fullPath, f.DisplayPath()) { + dir = true } - break + } + n := dn.node + n.path = fullPath + if dir && file != nil { + panic("both dir and file") } - if _node == nil { - err = fuse.ENOENT + if file != nil { + return fileNode{n, file}, nil } - return + if dir { + return dirNode{n}, nil + } + return nil, fuse.ENOENT } func (dn dirNode) Attr(ctx context.Context, attr *fuse.Attr) error { @@ -145,7 +141,7 @@ FS: rn.fs, t: t, } if !info.IsDir() { - _node = fileNode{__node, uint64(info.Length), 0} + _node = fileNode{__node, &t.Files()[0]} } else { _node = dirNode{__node} } diff --git a/reader.go b/reader.go index bdfeed04047fa2001668dea64e87353d2ee7abcc..76c39e968307f08af9728c5ff050427f639ed46f 100644 --- a/reader.go +++ b/reader.go @@ -10,6 +10,15 @@ "github.com/anacrolix/missinggo" "golang.org/x/net/context" ) +type Reader interface { + io.Reader + io.Seeker + io.Closer + missinggo.ReadContexter + SetReadahead(int64) + SetResponsive() +} + // Piece range by piece index, [begin, end). type pieceRange struct { begin, end int @@ -17,7 +26,7 @@ } // Accesses Torrent data via a Client. Reads block until the data is // available. Seeks and readahead also drive Client behaviour. -type Reader struct { +type reader struct { t *Torrent responsive bool // Ensure operations that change the position are exclusive, like Read() @@ -35,22 +44,22 @@ // and bubbled up to the Torrent only as required. pieces pieceRange } -var _ io.ReadCloser = &Reader{} +var _ io.ReadCloser = &reader{} // Don't wait for pieces to complete and be verified. Read calls return as // soon as they can when the underlying chunks become available. -func (r *Reader) SetResponsive() { +func (r *reader) SetResponsive() { r.responsive = true } -// Disable responsive mode. -func (r *Reader) SetNonResponsive() { +// Disable responsive mode. TODO: Remove? +func (r *reader) SetNonResponsive() { r.responsive = false } // Configure the number of bytes ahead of a read that should also be // prioritized in preparation for further reads. -func (r *Reader) SetReadahead(readahead int64) { +func (r *reader) SetReadahead(readahead int64) { r.mu.Lock() r.readahead = readahead r.mu.Unlock() @@ -59,12 +68,7 @@ defer r.t.cl.mu.Unlock() r.posChanged() } -// Return reader's current position. -func (r *Reader) CurrentPos() int64 { - return r.pos -} - -func (r *Reader) readable(off int64) (ret bool) { +func (r *reader) readable(off int64) (ret bool) { if r.t.closed.IsSet() { return true } @@ -79,7 +83,7 @@ return r.t.pieceComplete(int(req.Index)) } // How many bytes are available to read. Max is the most we could require. -func (r *Reader) available(off, max int64) (ret int64) { +func (r *reader) available(off, max int64) (ret int64) { for max > 0 { req, ok := r.t.offsetRequest(off) if !ok { @@ -100,7 +104,7 @@ } return } -func (r *Reader) waitReadable(off int64) { +func (r *reader) waitReadable(off int64) { // We may have been sent back here because we were told we could read but // it failed. r.t.cl.event.Wait() @@ -108,7 +112,7 @@ } // Calculates the pieces this reader wants downloaded, ignoring the cached // value at r.pieces. -func (r *Reader) piecesUncached() (ret pieceRange) { +func (r *reader) piecesUncached() (ret pieceRange) { ra := r.readahead if ra < 1 { ra = 1 @@ -117,11 +121,11 @@ ret.begin, ret.end = r.t.byteRegionPieces(r.pos, ra) return } -func (r *Reader) Read(b []byte) (n int, err error) { +func (r *reader) Read(b []byte) (n int, err error) { return r.ReadContext(context.Background(), b) } -func (r *Reader) ReadContext(ctx context.Context, b []byte) (n int, err error) { +func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) { // This is set under the Client lock if the Context is canceled. var ctxErr error if ctx.Done() != nil { @@ -166,7 +170,7 @@ } // Wait until some data should be available to read. Tickles the client if it // isn't. Returns how much should be readable without blocking. -func (r *Reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) { +func (r *reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) { r.t.cl.mu.Lock() defer r.t.cl.mu.Unlock() for !r.readable(pos) && *ctxErr == nil { @@ -176,7 +180,7 @@ return r.available(pos, wanted) } // Performs at most one successful read to torrent storage. -func (r *Reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err error) { +func (r *reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err error) { if pos >= r.t.length { err = io.EOF return @@ -210,14 +214,14 @@ r.t.cl.mu.Unlock() } } -func (r *Reader) Close() error { +func (r *reader) Close() error { r.t.cl.mu.Lock() defer r.t.cl.mu.Unlock() r.t.deleteReader(r) return nil } -func (r *Reader) posChanged() { +func (r *reader) posChanged() { to := r.piecesUncached() from := r.pieces if to == from { @@ -227,7 +231,7 @@ r.pieces = to r.t.readerPosChanged(from, to) } -func (r *Reader) Seek(off int64, whence int) (ret int64, err error) { +func (r *reader) Seek(off int64, whence int) (ret int64, err error) { r.opMu.Lock() defer r.opMu.Unlock() @@ -248,7 +252,3 @@ r.posChanged() return } - -func (r *Reader) Torrent() *Torrent { - return r.t -} diff --git a/reader_test.go b/reader_test.go index f56e9473b9f6184f7433edbeed55b9ca0ecf0705..5378efc5c515ea7a69d4c8dd986eb5d7e48069ae 100644 --- a/reader_test.go +++ b/reader_test.go @@ -18,7 +18,7 @@ tt, err := cl.AddTorrent(testutil.GreetingMetaInfo()) require.NoError(t, err) defer tt.Drop() ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond)) - r := tt.NewReader() + r := tt.Files()[0].NewReader() defer r.Close() _, err = r.ReadContext(ctx, make([]byte, 1)) require.EqualValues(t, context.DeadlineExceeded, err) diff --git a/t.go b/t.go index cc69d8e3c7ac9c04c756af316bd81c81fc1c0d08..f687cb59fe963b85fde06a63dee00f4f6f9b99e8 100644 --- a/t.go +++ b/t.go @@ -31,14 +31,14 @@ } // Returns a Reader bound to the torrent's data. All read calls block until // the data requested is actually available. -func (t *Torrent) NewReader() (ret *Reader) { - ret = &Reader{ +func (t *Torrent) NewReader() Reader { + r := reader{ mu: &t.cl.mu, t: t, readahead: 5 * 1024 * 1024, } - t.addReader(ret) - return + t.addReader(&r) + return &r } // Returns the state of pieces of the torrent. They are grouped into runs of @@ -133,17 +133,17 @@ defer t.cl.mu.Unlock() return t.newMetaInfo() } -func (t *Torrent) addReader(r *Reader) { +func (t *Torrent) addReader(r *reader) { t.cl.mu.Lock() defer t.cl.mu.Unlock() if t.readers == nil { - t.readers = make(map[*Reader]struct{}) + t.readers = make(map[*reader]struct{}) } t.readers[r] = struct{}{} r.posChanged() } -func (t *Torrent) deleteReader(r *Reader) { +func (t *Torrent) deleteReader(r *reader) { delete(t.readers, r) t.readersChanged() } diff --git a/torrent.go b/torrent.go index 46575942b4367383f365db9d42d06d8fa8605e35..3cb32a13928c49732b44bb6be1718523f5a455a8 100644 --- a/torrent.go +++ b/torrent.go @@ -107,7 +107,7 @@ // Set when .Info is obtained. gotMetainfo missinggo.Event - readers map[*Reader]struct{} + readers map[*reader]struct{} readerNowPieces bitmap.Bitmap readerReadaheadPieces bitmap.Bitmap