From b7a8bb7570a5c667b5fb6d29bf0b4b142eaedbdb Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 2 Oct 2015 00:09:04 +1000 Subject: [PATCH] Simplify the torrent Data interface None of the methods are optional anymore. Removed the nasty wrappers for some data implementations. Moved data.Data back into the main torrent package. Should make it much easier to understand for people implementing their own Data implementations. --- client.go | 37 +++++-------------------------------- client_test.go | 15 ++++++++------- data.go | 15 +++++++++++++++ data/blob/store.go | 7 ++++--- data/data.go | 21 --------------------- data/file/file.go | 18 +++++++++++++++--- data/mmap/mmap.go | 26 +++++++++++++++++++++++--- fs/torrentfs_test.go | 6 ++---- stateless.go | 21 --------------------- torrent.go | 24 +++++------------------- 10 files changed, 77 insertions(+), 113 deletions(-) create mode 100644 data.go delete mode 100644 data/data.go delete mode 100644 stateless.go diff --git a/client.go b/client.go index 898ee23a..d2835726 100644 --- a/client.go +++ b/client.go @@ -32,7 +32,6 @@ import ( "github.com/edsrzf/mmap-go" "github.com/anacrolix/torrent/bencode" - "github.com/anacrolix/torrent/data" filePkg "github.com/anacrolix/torrent/data/file" "github.com/anacrolix/torrent/dht" "github.com/anacrolix/torrent/internal/pieceordering" @@ -253,40 +252,14 @@ func (cl *Client) WriteStatus(_w io.Writer) { } } -// A Data that implements this has a streaming interface that should be -// preferred over ReadAt. For example, the data is stored in blocks on the -// network and have a fixed cost to open. -type SectionOpener interface { - // Open a ReadCloser at the given offset into torrent data. n is how many - // bytes we intend to read. - OpenSection(off, n int64) (io.ReadCloser, error) -} - -func dataReadAt(d data.Data, b []byte, off int64) (n int, err error) { +func dataReadAt(d Data, b []byte, off int64) (n int, err error) { // defer func() { // if err == io.ErrUnexpectedEOF && n != 0 { // err = nil // } // }() // log.Println("data read at", len(b), off) -again: - if ra, ok := d.(io.ReaderAt); ok { - return ra.ReadAt(b, off) - } - if so, ok := d.(SectionOpener); ok { - var rc io.ReadCloser - rc, err = so.OpenSection(off, int64(len(b))) - if err != nil { - return - } - defer rc.Close() - return io.ReadFull(rc, b) - } - if dp, ok := super(d); ok { - d = dp.(data.Data) - goto again - } - panic(fmt.Sprintf("can't read from %T", d)) + return d.ReadAt(b, off) } // Calculates the number of pieces to set to Readahead priority, after the @@ -474,7 +447,7 @@ func NewClient(cfg *Config) (cl *Client, err error) { cl = &Client{ halfOpenLimit: socketsPerTorrent, config: *cfg, - torrentDataOpener: func(md *metainfo.Info) data.Data { + torrentDataOpener: func(md *metainfo.Info) Data { return filePkg.TorrentData(md, cfg.DataDir) }, dopplegangerAddrs: make(map[string]struct{}), @@ -1934,7 +1907,7 @@ func (cl *Client) startTorrent(t *torrent) { } // Storage cannot be changed once it's set. -func (cl *Client) setStorage(t *torrent, td data.Data) (err error) { +func (cl *Client) setStorage(t *torrent, td Data) (err error) { err = t.setStorage(td) cl.event.Broadcast() if err != nil { @@ -1944,7 +1917,7 @@ func (cl *Client) setStorage(t *torrent, td data.Data) (err error) { return } -type TorrentDataOpener func(*metainfo.Info) data.Data +type TorrentDataOpener func(*metainfo.Info) Data func (cl *Client) setMetaData(t *torrent, md *metainfo.Info, bytes []byte) (err error) { err = t.setMetadata(md, bytes, &cl.mu) diff --git a/client_test.go b/client_test.go index fb797b2b..49d0965f 100644 --- a/client_test.go +++ b/client_test.go @@ -22,7 +22,6 @@ import ( "github.com/stretchr/testify/require" "github.com/anacrolix/torrent/bencode" - "github.com/anacrolix/torrent/data" "github.com/anacrolix/torrent/data/blob" "github.com/anacrolix/torrent/dht" "github.com/anacrolix/torrent/internal/testutil" @@ -267,7 +266,10 @@ func TestClientTransfer(t *testing.T) { // cfg.TorrentDataOpener = func(info *metainfo.Info) (data.Data, error) { // return blob.TorrentData(info, leecherDataDir), nil // } - cfg.TorrentDataOpener = blob.NewStore(leecherDataDir).OpenTorrent + blobStore := blob.NewStore(leecherDataDir) + cfg.TorrentDataOpener = func(info *metainfo.Info) Data { + return blobStore.OpenTorrent(info) + } leecher, _ := NewClient(&cfg) defer leecher.Close() leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) { @@ -402,8 +404,9 @@ func TestMergingTrackersByAddingSpecs(t *testing.T) { assert.EqualValues(t, T.Trackers[1][0].URL(), "udp://b") } -type badData struct { -} +type badData struct{} + +func (me badData) Close() {} func (me badData) WriteAt(b []byte, off int64) (int, error) { return 0, nil @@ -430,12 +433,10 @@ func (me badData) ReadAt(b []byte, off int64) (n int, err error) { return } -var _ StatefulData = badData{} - // We read from a piece which is marked completed, but is missing data. func TestCompletedPieceWrongSize(t *testing.T) { cfg := TestingConfig - cfg.TorrentDataOpener = func(*metainfo.Info) data.Data { + cfg.TorrentDataOpener = func(*metainfo.Info) Data { return badData{} } cl, _ := NewClient(&cfg) diff --git a/data.go b/data.go new file mode 100644 index 00000000..803d8476 --- /dev/null +++ b/data.go @@ -0,0 +1,15 @@ +package torrent + +import "io" + +// Represents data storage for a Torrent. +type Data interface { + ReadAt(p []byte, off int64) (n int, err error) + Close() + WriteAt(p []byte, off int64) (n int, err error) + WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) + // We believe the piece data will pass a hash check. + PieceCompleted(index int) error + // Returns true if the piece is complete. + PieceComplete(index int) bool +} diff --git a/data/blob/store.go b/data/blob/store.go index 96eda1fe..385d2ee5 100644 --- a/data/blob/store.go +++ b/data/blob/store.go @@ -1,3 +1,4 @@ +// Implements torrent data storage as per-piece files. package blob import ( @@ -14,8 +15,8 @@ import ( "time" "github.com/anacrolix/missinggo" + "github.com/anacrolix/torrent" - dataPkg "github.com/anacrolix/torrent/data" "github.com/anacrolix/torrent/metainfo" ) @@ -32,7 +33,7 @@ type store struct { completed map[[20]byte]struct{} } -func (me *store) OpenTorrent(info *metainfo.Info) dataPkg.Data { +func (me *store) OpenTorrent(info *metainfo.Info) torrent.Data { return &data{info, me} } @@ -44,7 +45,7 @@ func Capacity(bytes int64) StoreOption { } } -func NewStore(baseDir string, opt ...StoreOption) dataPkg.Store { +func NewStore(baseDir string, opt ...StoreOption) *store { s := &store{baseDir, -1, sync.Mutex{}, nil} for _, o := range opt { o(s) diff --git a/data/data.go b/data/data.go deleted file mode 100644 index f783279d..00000000 --- a/data/data.go +++ /dev/null @@ -1,21 +0,0 @@ -package data - -import ( - "io" - - "github.com/anacrolix/torrent/metainfo" -) - -type Store interface { - OpenTorrent(*metainfo.Info) Data -} - -// Represents data storage for a Torrent. Additional optional interfaces to -// implement are io.Closer, io.ReaderAt, StatefulData, and SectionOpener. -type Data interface { - // OpenSection(off, n int64) (io.ReadCloser, error) - // ReadAt(p []byte, off int64) (n int, err error) - // Close() - WriteAt(p []byte, off int64) (n int, err error) - WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) -} diff --git a/data/file/file.go b/data/file/file.go index 0c30bc50..bf77db2a 100644 --- a/data/file/file.go +++ b/data/file/file.go @@ -9,12 +9,24 @@ import ( ) type data struct { - info *metainfo.Info - loc string + info *metainfo.Info + loc string + completed []bool } func TorrentData(md *metainfo.Info, location string) data { - return data{md, location} + return data{md, location, make([]bool, md.NumPieces())} +} + +func (me data) Close() {} + +func (me data) PieceComplete(piece int) bool { + return me.completed[piece] +} + +func (me data) PieceCompleted(piece int) error { + me.completed[piece] = true + return nil } func (me data) ReadAt(p []byte, off int64) (n int, err error) { diff --git a/data/mmap/mmap.go b/data/mmap/mmap.go index 7a0324b3..ec4236d6 100644 --- a/data/mmap/mmap.go +++ b/data/mmap/mmap.go @@ -11,12 +11,28 @@ import ( "github.com/anacrolix/torrent/mmap_span" ) -func TorrentData(md *metainfo.Info, location string) (mms *mmap_span.MMapSpan, err error) { - mms = &mmap_span.MMapSpan{} +type torrentData struct { + // Supports non-torrent specific data operations for the torrent.Data + // interface. + mmap_span.MMapSpan + + completed []bool +} + +func (me *torrentData) PieceComplete(piece int) bool { + return me.completed[piece] +} + +func (me *torrentData) PieceCompleted(piece int) error { + me.completed[piece] = true + return nil +} + +func TorrentData(md *metainfo.Info, location string) (ret *torrentData, err error) { + var mms mmap_span.MMapSpan defer func() { if err != nil { mms.Close() - mms = nil } }() for _, miFile := range md.UpvertedFiles() { @@ -63,5 +79,9 @@ func TorrentData(md *metainfo.Info, location string) (mms *mmap_span.MMapSpan, e return } } + ret = &torrentData{ + MMapSpan: mms, + completed: make([]bool, md.NumPieces()), + } return } diff --git a/fs/torrentfs_test.go b/fs/torrentfs_test.go index 8ae89e1c..559e7988 100644 --- a/fs/torrentfs_test.go +++ b/fs/torrentfs_test.go @@ -15,16 +15,14 @@ import ( "testing" "time" - _ "github.com/anacrolix/envpprof" - "bazil.org/fuse" fusefs "bazil.org/fuse/fs" + _ "github.com/anacrolix/envpprof" "github.com/anacrolix/missinggo" "github.com/stretchr/testify/assert" netContext "golang.org/x/net/context" "github.com/anacrolix/torrent" - "github.com/anacrolix/torrent/data" "github.com/anacrolix/torrent/data/mmap" "github.com/anacrolix/torrent/internal/testutil" "github.com/anacrolix/torrent/metainfo" @@ -205,7 +203,7 @@ func TestDownloadOnDemand(t *testing.T) { NoDefaultBlocklist: true, - TorrentDataOpener: func(info *metainfo.Info) data.Data { + TorrentDataOpener: func(info *metainfo.Info) Data { ret, _ := mmap.TorrentData(info, filepath.Join(layout.BaseDir, "download")) return ret }, diff --git a/stateless.go b/stateless.go deleted file mode 100644 index 99fc1e07..00000000 --- a/stateless.go +++ /dev/null @@ -1,21 +0,0 @@ -package torrent - -import "github.com/anacrolix/torrent/data" - -type statelessDataWrapper struct { - data.Data - complete []bool -} - -func (me *statelessDataWrapper) PieceComplete(piece int) bool { - return me.complete[piece] -} - -func (me *statelessDataWrapper) PieceCompleted(piece int) error { - me.complete[piece] = true - return nil -} - -func (me *statelessDataWrapper) Super() interface{} { - return me.Data -} diff --git a/torrent.go b/torrent.go index 82d97a49..cb130007 100644 --- a/torrent.go +++ b/torrent.go @@ -17,7 +17,6 @@ import ( "github.com/bradfitz/iter" "github.com/anacrolix/torrent/bencode" - "github.com/anacrolix/torrent/data" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/torrent/tracker" @@ -45,15 +44,6 @@ type peersKey struct { Port int } -// Data maintains per-piece persistent state. -type StatefulData interface { - data.Data - // We believe the piece data will pass a hash check. - PieceCompleted(index int) error - // Returns true if the piece is complete. - PieceComplete(index int) bool -} - // Is not aware of Client. Maintains state of torrent for with-in a Client. type torrent struct { stateMu sync.Mutex @@ -75,7 +65,7 @@ type torrent struct { // get this from the info dict. length int64 - data StatefulData + data Data // The info dict. Nil if we don't have it (yet). Info *metainfo.Info @@ -272,15 +262,11 @@ func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte, eventLocker s return } -func (t *torrent) setStorage(td data.Data) (err error) { - if c, ok := t.data.(io.Closer); ok { - c.Close() - } - if sd, ok := td.(StatefulData); ok { - t.data = sd - } else { - t.data = &statelessDataWrapper{td, make([]bool, t.Info.NumPieces())} +func (t *torrent) setStorage(td Data) (err error) { + if t.data != nil { + t.data.Close() } + t.data = td return } -- 2.48.1