None of the methods are optional anymore. Removed the nasty wrappers for some data implementations. Moved data.Data back into the main torrent package. Should make it much easier to understand for people implementing their own Data implementations.
"github.com/edsrzf/mmap-go"
"github.com/anacrolix/torrent/bencode"
- "github.com/anacrolix/torrent/data"
filePkg "github.com/anacrolix/torrent/data/file"
"github.com/anacrolix/torrent/dht"
"github.com/anacrolix/torrent/internal/pieceordering"
}
}
-// A Data that implements this has a streaming interface that should be
-// preferred over ReadAt. For example, the data is stored in blocks on the
-// network and have a fixed cost to open.
-type SectionOpener interface {
- // Open a ReadCloser at the given offset into torrent data. n is how many
- // bytes we intend to read.
- OpenSection(off, n int64) (io.ReadCloser, error)
-}
-
-func dataReadAt(d data.Data, b []byte, off int64) (n int, err error) {
+func dataReadAt(d Data, b []byte, off int64) (n int, err error) {
// defer func() {
// if err == io.ErrUnexpectedEOF && n != 0 {
// err = nil
// }
// }()
// log.Println("data read at", len(b), off)
-again:
- if ra, ok := d.(io.ReaderAt); ok {
- return ra.ReadAt(b, off)
- }
- if so, ok := d.(SectionOpener); ok {
- var rc io.ReadCloser
- rc, err = so.OpenSection(off, int64(len(b)))
- if err != nil {
- return
- }
- defer rc.Close()
- return io.ReadFull(rc, b)
- }
- if dp, ok := super(d); ok {
- d = dp.(data.Data)
- goto again
- }
- panic(fmt.Sprintf("can't read from %T", d))
+ return d.ReadAt(b, off)
}
// Calculates the number of pieces to set to Readahead priority, after the
cl = &Client{
halfOpenLimit: socketsPerTorrent,
config: *cfg,
- torrentDataOpener: func(md *metainfo.Info) data.Data {
+ torrentDataOpener: func(md *metainfo.Info) Data {
return filePkg.TorrentData(md, cfg.DataDir)
},
dopplegangerAddrs: make(map[string]struct{}),
}
// Storage cannot be changed once it's set.
-func (cl *Client) setStorage(t *torrent, td data.Data) (err error) {
+func (cl *Client) setStorage(t *torrent, td Data) (err error) {
err = t.setStorage(td)
cl.event.Broadcast()
if err != nil {
return
}
-type TorrentDataOpener func(*metainfo.Info) data.Data
+type TorrentDataOpener func(*metainfo.Info) Data
func (cl *Client) setMetaData(t *torrent, md *metainfo.Info, bytes []byte) (err error) {
err = t.setMetadata(md, bytes, &cl.mu)
"github.com/stretchr/testify/require"
"github.com/anacrolix/torrent/bencode"
- "github.com/anacrolix/torrent/data"
"github.com/anacrolix/torrent/data/blob"
"github.com/anacrolix/torrent/dht"
"github.com/anacrolix/torrent/internal/testutil"
// cfg.TorrentDataOpener = func(info *metainfo.Info) (data.Data, error) {
// return blob.TorrentData(info, leecherDataDir), nil
// }
- cfg.TorrentDataOpener = blob.NewStore(leecherDataDir).OpenTorrent
+ blobStore := blob.NewStore(leecherDataDir)
+ cfg.TorrentDataOpener = func(info *metainfo.Info) Data {
+ return blobStore.OpenTorrent(info)
+ }
leecher, _ := NewClient(&cfg)
defer leecher.Close()
leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
assert.EqualValues(t, T.Trackers[1][0].URL(), "udp://b")
}
-type badData struct {
-}
+type badData struct{}
+
+func (me badData) Close() {}
func (me badData) WriteAt(b []byte, off int64) (int, error) {
return 0, nil
return
}
-var _ StatefulData = badData{}
-
// We read from a piece which is marked completed, but is missing data.
func TestCompletedPieceWrongSize(t *testing.T) {
cfg := TestingConfig
- cfg.TorrentDataOpener = func(*metainfo.Info) data.Data {
+ cfg.TorrentDataOpener = func(*metainfo.Info) Data {
return badData{}
}
cl, _ := NewClient(&cfg)
--- /dev/null
+package torrent
+
+import "io"
+
+// Represents data storage for a Torrent.
+type Data interface {
+ ReadAt(p []byte, off int64) (n int, err error)
+ Close()
+ WriteAt(p []byte, off int64) (n int, err error)
+ WriteSectionTo(w io.Writer, off, n int64) (written int64, err error)
+ // We believe the piece data will pass a hash check.
+ PieceCompleted(index int) error
+ // Returns true if the piece is complete.
+ PieceComplete(index int) bool
+}
+// Implements torrent data storage as per-piece files.
package blob
import (
"time"
"github.com/anacrolix/missinggo"
+ "github.com/anacrolix/torrent"
- dataPkg "github.com/anacrolix/torrent/data"
"github.com/anacrolix/torrent/metainfo"
)
completed map[[20]byte]struct{}
}
-func (me *store) OpenTorrent(info *metainfo.Info) dataPkg.Data {
+func (me *store) OpenTorrent(info *metainfo.Info) torrent.Data {
return &data{info, me}
}
}
}
-func NewStore(baseDir string, opt ...StoreOption) dataPkg.Store {
+func NewStore(baseDir string, opt ...StoreOption) *store {
s := &store{baseDir, -1, sync.Mutex{}, nil}
for _, o := range opt {
o(s)
+++ /dev/null
-package data
-
-import (
- "io"
-
- "github.com/anacrolix/torrent/metainfo"
-)
-
-type Store interface {
- OpenTorrent(*metainfo.Info) Data
-}
-
-// Represents data storage for a Torrent. Additional optional interfaces to
-// implement are io.Closer, io.ReaderAt, StatefulData, and SectionOpener.
-type Data interface {
- // OpenSection(off, n int64) (io.ReadCloser, error)
- // ReadAt(p []byte, off int64) (n int, err error)
- // Close()
- WriteAt(p []byte, off int64) (n int, err error)
- WriteSectionTo(w io.Writer, off, n int64) (written int64, err error)
-}
)
type data struct {
- info *metainfo.Info
- loc string
+ info *metainfo.Info
+ loc string
+ completed []bool
}
func TorrentData(md *metainfo.Info, location string) data {
- return data{md, location}
+ return data{md, location, make([]bool, md.NumPieces())}
+}
+
+func (me data) Close() {}
+
+func (me data) PieceComplete(piece int) bool {
+ return me.completed[piece]
+}
+
+func (me data) PieceCompleted(piece int) error {
+ me.completed[piece] = true
+ return nil
}
func (me data) ReadAt(p []byte, off int64) (n int, err error) {
"github.com/anacrolix/torrent/mmap_span"
)
-func TorrentData(md *metainfo.Info, location string) (mms *mmap_span.MMapSpan, err error) {
- mms = &mmap_span.MMapSpan{}
+type torrentData struct {
+ // Supports non-torrent specific data operations for the torrent.Data
+ // interface.
+ mmap_span.MMapSpan
+
+ completed []bool
+}
+
+func (me *torrentData) PieceComplete(piece int) bool {
+ return me.completed[piece]
+}
+
+func (me *torrentData) PieceCompleted(piece int) error {
+ me.completed[piece] = true
+ return nil
+}
+
+func TorrentData(md *metainfo.Info, location string) (ret *torrentData, err error) {
+ var mms mmap_span.MMapSpan
defer func() {
if err != nil {
mms.Close()
- mms = nil
}
}()
for _, miFile := range md.UpvertedFiles() {
return
}
}
+ ret = &torrentData{
+ MMapSpan: mms,
+ completed: make([]bool, md.NumPieces()),
+ }
return
}
"testing"
"time"
- _ "github.com/anacrolix/envpprof"
-
"bazil.org/fuse"
fusefs "bazil.org/fuse/fs"
+ _ "github.com/anacrolix/envpprof"
"github.com/anacrolix/missinggo"
"github.com/stretchr/testify/assert"
netContext "golang.org/x/net/context"
"github.com/anacrolix/torrent"
- "github.com/anacrolix/torrent/data"
"github.com/anacrolix/torrent/data/mmap"
"github.com/anacrolix/torrent/internal/testutil"
"github.com/anacrolix/torrent/metainfo"
NoDefaultBlocklist: true,
- TorrentDataOpener: func(info *metainfo.Info) data.Data {
+ TorrentDataOpener: func(info *metainfo.Info) Data {
ret, _ := mmap.TorrentData(info, filepath.Join(layout.BaseDir, "download"))
return ret
},
+++ /dev/null
-package torrent
-
-import "github.com/anacrolix/torrent/data"
-
-type statelessDataWrapper struct {
- data.Data
- complete []bool
-}
-
-func (me *statelessDataWrapper) PieceComplete(piece int) bool {
- return me.complete[piece]
-}
-
-func (me *statelessDataWrapper) PieceCompleted(piece int) error {
- me.complete[piece] = true
- return nil
-}
-
-func (me *statelessDataWrapper) Super() interface{} {
- return me.Data
-}
"github.com/bradfitz/iter"
"github.com/anacrolix/torrent/bencode"
- "github.com/anacrolix/torrent/data"
"github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/tracker"
Port int
}
-// Data maintains per-piece persistent state.
-type StatefulData interface {
- data.Data
- // We believe the piece data will pass a hash check.
- PieceCompleted(index int) error
- // Returns true if the piece is complete.
- PieceComplete(index int) bool
-}
-
// Is not aware of Client. Maintains state of torrent for with-in a Client.
type torrent struct {
stateMu sync.Mutex
// get this from the info dict.
length int64
- data StatefulData
+ data Data
// The info dict. Nil if we don't have it (yet).
Info *metainfo.Info
return
}
-func (t *torrent) setStorage(td data.Data) (err error) {
- if c, ok := t.data.(io.Closer); ok {
- c.Close()
- }
- if sd, ok := td.(StatefulData); ok {
- t.data = sd
- } else {
- t.data = &statelessDataWrapper{td, make([]bool, t.Info.NumPieces())}
+func (t *torrent) setStorage(td Data) (err error) {
+ if t.data != nil {
+ t.data.Close()
}
+ t.data = td
return
}