"time"
_ "github.com/anacrolix/envpprof"
- "github.com/anacrolix/missinggo"
"github.com/dustin/go-humanize"
"github.com/jessevdk/go-flags"
if file.DisplayPath() != rootGroup.Pick {
continue
}
- srcReader := missinggo.NewSectionReadSeeker(t.NewReader(), file.Offset(), file.Length())
+ srcReader := file.NewReader()
io.Copy(dstWriter, srcReader)
return
}
import (
"log"
- "github.com/anacrolix/missinggo"
-
"github.com/anacrolix/torrent"
)
}
func Example_fileReader() {
- var (
- t *torrent.Torrent
- f torrent.File
- )
- r := t.NewReader()
- defer r.Close()
- // Access the parts of the torrent pertaining to f. Data will be
+ var f torrent.File
+ // Accesses the parts of the torrent pertaining to f. Data will be
// downloaded as required, per the configuration of the torrent.Reader.
- _ = missinggo.NewSectionReadSeeker(r, f.Offset(), f.Length())
+ r := f.NewReader()
+ defer r.Close()
}
import (
"strings"
+ "github.com/anacrolix/missinggo"
"github.com/anacrolix/torrent/metainfo"
)
func (f *File) Cancel() {
f.t.CancelPieces(f.exclusivePieces())
}
+
+func (f *File) NewReader() Reader {
+ tr := f.t.NewReader()
+ return fileReader{missinggo.NewSectionReadSeeker(tr, f.Offset(), f.Length()), tr}
+}
--- /dev/null
+package torrent
+
+import (
+ "io"
+
+ "github.com/anacrolix/missinggo"
+)
+
+type fileReaderInherited interface {
+ io.Closer
+ SetReadahead(int64)
+ SetResponsive()
+}
+
+type fileReader struct {
+ missinggo.ReadSeekContexter
+ fileReaderInherited
+}
type fileHandle struct {
fn fileNode
- r *torrent.Reader
+ r torrent.Reader
}
var _ interface {
if req.Dir {
panic("read on directory")
}
- pos, err := me.r.Seek(me.fn.TorrentOffset+req.Offset, io.SeekStart)
+ pos, err := me.r.Seek(req.Offset, io.SeekStart)
if err != nil {
panic(err)
}
- if pos != me.fn.TorrentOffset+req.Offset {
+ if pos != req.Offset {
panic("seek failed")
}
resp.Data = resp.Data[:req.Size]
package torrentfs
import (
- "io"
-
"bazil.org/fuse"
fusefs "bazil.org/fuse/fs"
+ "github.com/anacrolix/torrent"
"golang.org/x/net/context"
)
type fileNode struct {
node
- size uint64
- TorrentOffset int64
+ f *torrent.File
}
var (
)
func (fn fileNode) Attr(ctx context.Context, attr *fuse.Attr) error {
- attr.Size = fn.size
+ attr.Size = uint64(fn.f.Length())
attr.Mode = defaultMode
return nil
}
func (fn fileNode) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fusefs.Handle, error) {
- r := fn.t.NewReader()
- r.Seek(fn.TorrentOffset, io.SeekStart)
+ r := fn.f.NewReader()
return fileHandle{fn, r}, nil
}
import (
"expvar"
"os"
- "path"
"strings"
"sync"
if !strings.HasPrefix(child, parent) {
return false
}
- s := child[len(parent):]
- if len(s) == 0 {
+ extra := child[len(parent):]
+ if len(extra) == 0 {
return false
}
- return s[0] == '/'
+ // Not just a file with more stuff on the end.
+ return extra[0] == '/'
}
func (dn dirNode) ReadDirAll(ctx context.Context) (des []fuse.Dirent, err error) {
return
}
-func (dn dirNode) Lookup(ctx context.Context, name string) (_node fusefs.Node, err error) {
- var torrentOffset int64
- for _, fi := range dn.metadata.Files {
- if !isSubPath(dn.path, strings.Join(fi.Path, "/")) {
- torrentOffset += fi.Length
- continue
+func (dn dirNode) Lookup(_ context.Context, name string) (fusefs.Node, error) {
+ dir := false
+ var file *torrent.File
+ fullPath := dn.path + "/" + name
+ for _, f := range dn.t.Files() {
+ if f.DisplayPath() == fullPath {
+ file = &f
}
- if fi.Path[len(dn.path)] != name {
- torrentOffset += fi.Length
- continue
- }
- __node := dn.node
- __node.path = path.Join(__node.path, name)
- if len(fi.Path) == len(dn.path)+1 {
- _node = fileNode{
- node: __node,
- size: uint64(fi.Length),
- TorrentOffset: torrentOffset,
- }
- } else {
- _node = dirNode{__node}
+ if isSubPath(fullPath, f.DisplayPath()) {
+ dir = true
}
- break
}
- if _node == nil {
- err = fuse.ENOENT
+ n := dn.node
+ n.path = fullPath
+ if dir && file != nil {
+ panic("both dir and file")
}
- return
+ if file != nil {
+ return fileNode{n, file}, nil
+ }
+ if dir {
+ return dirNode{n}, nil
+ }
+ return nil, fuse.ENOENT
}
func (dn dirNode) Attr(ctx context.Context, attr *fuse.Attr) error {
t: t,
}
if !info.IsDir() {
- _node = fileNode{__node, uint64(info.Length), 0}
+ _node = fileNode{__node, &t.Files()[0]}
} else {
_node = dirNode{__node}
}
"golang.org/x/net/context"
)
+type Reader interface {
+ io.Reader
+ io.Seeker
+ io.Closer
+ missinggo.ReadContexter
+ SetReadahead(int64)
+ SetResponsive()
+}
+
// Piece range by piece index, [begin, end).
type pieceRange struct {
begin, end int
// Accesses Torrent data via a Client. Reads block until the data is
// available. Seeks and readahead also drive Client behaviour.
-type Reader struct {
+type reader struct {
t *Torrent
responsive bool
// Ensure operations that change the position are exclusive, like Read()
pieces pieceRange
}
-var _ io.ReadCloser = &Reader{}
+var _ io.ReadCloser = &reader{}
// Don't wait for pieces to complete and be verified. Read calls return as
// soon as they can when the underlying chunks become available.
-func (r *Reader) SetResponsive() {
+func (r *reader) SetResponsive() {
r.responsive = true
}
-// Disable responsive mode.
-func (r *Reader) SetNonResponsive() {
+// Disable responsive mode. TODO: Remove?
+func (r *reader) SetNonResponsive() {
r.responsive = false
}
// Configure the number of bytes ahead of a read that should also be
// prioritized in preparation for further reads.
-func (r *Reader) SetReadahead(readahead int64) {
+func (r *reader) SetReadahead(readahead int64) {
r.mu.Lock()
r.readahead = readahead
r.mu.Unlock()
r.posChanged()
}
-// Return reader's current position.
-func (r *Reader) CurrentPos() int64 {
- return r.pos
-}
-
-func (r *Reader) readable(off int64) (ret bool) {
+func (r *reader) readable(off int64) (ret bool) {
if r.t.closed.IsSet() {
return true
}
}
// How many bytes are available to read. Max is the most we could require.
-func (r *Reader) available(off, max int64) (ret int64) {
+func (r *reader) available(off, max int64) (ret int64) {
for max > 0 {
req, ok := r.t.offsetRequest(off)
if !ok {
return
}
-func (r *Reader) waitReadable(off int64) {
+func (r *reader) waitReadable(off int64) {
// We may have been sent back here because we were told we could read but
// it failed.
r.t.cl.event.Wait()
// Calculates the pieces this reader wants downloaded, ignoring the cached
// value at r.pieces.
-func (r *Reader) piecesUncached() (ret pieceRange) {
+func (r *reader) piecesUncached() (ret pieceRange) {
ra := r.readahead
if ra < 1 {
ra = 1
return
}
-func (r *Reader) Read(b []byte) (n int, err error) {
+func (r *reader) Read(b []byte) (n int, err error) {
return r.ReadContext(context.Background(), b)
}
-func (r *Reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
+func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
// This is set under the Client lock if the Context is canceled.
var ctxErr error
if ctx.Done() != nil {
// Wait until some data should be available to read. Tickles the client if it
// isn't. Returns how much should be readable without blocking.
-func (r *Reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) {
+func (r *reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) {
r.t.cl.mu.Lock()
defer r.t.cl.mu.Unlock()
for !r.readable(pos) && *ctxErr == nil {
}
// Performs at most one successful read to torrent storage.
-func (r *Reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err error) {
+func (r *reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err error) {
if pos >= r.t.length {
err = io.EOF
return
}
}
-func (r *Reader) Close() error {
+func (r *reader) Close() error {
r.t.cl.mu.Lock()
defer r.t.cl.mu.Unlock()
r.t.deleteReader(r)
return nil
}
-func (r *Reader) posChanged() {
+func (r *reader) posChanged() {
to := r.piecesUncached()
from := r.pieces
if to == from {
r.t.readerPosChanged(from, to)
}
-func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
+func (r *reader) Seek(off int64, whence int) (ret int64, err error) {
r.opMu.Lock()
defer r.opMu.Unlock()
r.posChanged()
return
}
-
-func (r *Reader) Torrent() *Torrent {
- return r.t
-}
require.NoError(t, err)
defer tt.Drop()
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond))
- r := tt.NewReader()
+ r := tt.Files()[0].NewReader()
defer r.Close()
_, err = r.ReadContext(ctx, make([]byte, 1))
require.EqualValues(t, context.DeadlineExceeded, err)
// Returns a Reader bound to the torrent's data. All read calls block until
// the data requested is actually available.
-func (t *Torrent) NewReader() (ret *Reader) {
- ret = &Reader{
+func (t *Torrent) NewReader() Reader {
+ r := reader{
mu: &t.cl.mu,
t: t,
readahead: 5 * 1024 * 1024,
}
- t.addReader(ret)
- return
+ t.addReader(&r)
+ return &r
}
// Returns the state of pieces of the torrent. They are grouped into runs of
return t.newMetaInfo()
}
-func (t *Torrent) addReader(r *Reader) {
+func (t *Torrent) addReader(r *reader) {
t.cl.mu.Lock()
defer t.cl.mu.Unlock()
if t.readers == nil {
- t.readers = make(map[*Reader]struct{})
+ t.readers = make(map[*reader]struct{})
}
t.readers[r] = struct{}{}
r.posChanged()
}
-func (t *Torrent) deleteReader(r *Reader) {
+func (t *Torrent) deleteReader(r *reader) {
delete(t.readers, r)
t.readersChanged()
}
// Set when .Info is obtained.
gotMetainfo missinggo.Event
- readers map[*Reader]struct{}
+ readers map[*reader]struct{}
readerNowPieces bitmap.Bitmap
readerReadaheadPieces bitmap.Bitmap