This allows lots of behaviour to be baked into the new Client, Torrent and Piece wrappers, rather than duplicating it (badly) in all the backend implementations.
dopplegangerAddrs map[string]struct{}
badPeerIPs map[string]struct{}
- defaultStorage storage.Client
+ defaultStorage *storage.Client
mu sync.RWMutex
event sync.Cond
cl = &Client{
halfOpenLimit: defaultHalfOpenConnsPerTorrent,
config: *cfg,
- defaultStorage: cfg.DefaultStorage,
dopplegangerAddrs: make(map[string]struct{}),
torrents: make(map[metainfo.Hash]*Torrent),
}
missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
cl.event.L = &cl.mu
- if cl.defaultStorage == nil {
- cl.defaultStorage = storage.NewFile(cfg.DataDir)
+ storageImpl := cfg.DefaultStorage
+ if storageImpl == nil {
+ storageImpl = storage.NewFile(cfg.DataDir)
}
+ cl.defaultStorage = storage.NewClient(storageImpl)
if cfg.IPBlocklist != nil {
cl.ipBlockList = cfg.IPBlocklist
}
// The chunk size to use for outbound requests. Defaults to 16KiB if not
// set.
ChunkSize int
- Storage storage.Client
+ Storage storage.ClientImpl
}
func TorrentSpecFromMagnetURI(uri string) (spec *TorrentSpec, err error) {
pieceStateChanges: pubsub.NewPubSub(),
}
tor.chunkSize = 2
- tor.storageOpener = storage.NewFile("/dev/null")
+ tor.storageOpener = storage.NewClient(storage.NewFile("/dev/null"))
// Needed to lock for asynchronous piece verification.
tor.cl = new(Client)
err := tor.setInfoBytes(mi.InfoBytes)
type FileCacheClientStorageFactoryParams struct {
Capacity int64
SetCapacity bool
- Wrapper func(*filecache.Cache) storage.Client
+ Wrapper func(*filecache.Cache) storage.ClientImpl
}
func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
- return func(dataDir string) storage.Client {
+ return func(dataDir string) storage.ClientImpl {
fc, err := filecache.NewCache(dataDir)
if err != nil {
panic(err)
}
}
-type storageFactory func(string) storage.Client
+type storageFactory func(string) storage.ClientImpl
func TestClientTransferDefault(t *testing.T) {
testClientTransfer(t, testClientTransferParams{
})
}
-func fileCachePieceResourceStorage(fc *filecache.Cache) storage.Client {
+func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
return storage.NewResourcePieces(fc.AsResourceProvider())
}
-func fileCachePieceFileStorage(fc *filecache.Cache) storage.Client {
+func fileCachePieceFileStorage(fc *filecache.Cache) storage.ClientImpl {
return storage.NewFileStorePieces(fc.AsFileStore())
}
}),
storage.NewBoltDB,
} {
- for _, ss := range []func(string) storage.Client{
+ for _, ss := range []func(string) storage.ClientImpl{
storage.NewFile,
storage.NewMMap,
} {
Readahead int64
SetReadahead bool
ExportClientStatus bool
- LeecherStorage func(string) storage.Client
- SeederStorage func(string) storage.Client
+ LeecherStorage func(string) storage.ClientImpl
+ SeederStorage func(string) storage.ClientImpl
}
// Creates a seeder and a leecher, and ensures the data transfers when a read
type badStorage struct{}
-func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.Torrent, error) {
+func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
return bs, nil
}
return nil
}
-func (bs badStorage) Piece(p metainfo.Piece) storage.Piece {
+func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {
return badStoragePiece{p}
}
return errors.New("psyyyyyyyche")
}
+func (p badStoragePiece) MarkNotComplete() error {
+ return errors.New("psyyyyyyyche")
+}
+
func (p badStoragePiece) randomlyTruncatedDataString() string {
return "hello, world\n"[:rand.Intn(14)]
}
}
}
-func writeTorrentData(ts storage.Torrent, info metainfo.Info, b []byte) {
+func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
for i := range iter.N(info.NumPieces()) {
- n, _ := ts.Piece(info.Piece(i)).WriteAt(b, 0)
- b = b[n:]
+ p := info.Piece(i)
+ ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
}
}
-func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.Client) {
+func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
fileCacheDir, err := ioutil.TempDir("", "")
require.NoError(t, err)
defer os.RemoveAll(fileCacheDir)
filePieceStore := csf(fileCache)
info := greetingMetainfo.UnmarshalInfo()
ih := greetingMetainfo.HashInfoBytes()
- greetingData, err := filePieceStore.OpenTorrent(&info, ih)
+ greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
require.NoError(t, err)
writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
// require.Equal(t, len(testutil.GreetingFileContents), written)
DisableTCP bool `long:"disable-tcp"`
// Called to instantiate storage for each added torrent. Provided backends
// are in $REPO/data. If not set, the "file" implementation is used.
- DefaultStorage storage.Client
+ DefaultStorage storage.ClientImpl
DisableEncryption bool `long:"disable-encryption"`
IPBlocklist iplist.Ranger
td, err := ioutil.TempDir("", "")
require.NoError(t, err)
defer os.RemoveAll(td)
- cs := storage.NewFile(td)
+ cs := storage.NewClient(storage.NewFile(td))
tt := &Torrent{}
mi := testutil.GreetingMetaInfo()
info := mi.UnmarshalInfo()
import (
"encoding/binary"
- "io"
"path/filepath"
- "github.com/anacrolix/missinggo"
"github.com/boltdb/bolt"
"github.com/anacrolix/torrent/metainfo"
key [24]byte
}
-func NewBoltDB(filePath string) Client {
+func NewBoltDB(filePath string) ClientImpl {
ret := &boltDBClient{}
var err error
ret.db, err = bolt.Open(filepath.Join(filePath, "bolt.db"), 0600, nil)
return ret
}
-func (me *boltDBClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (Torrent, error) {
+func (me *boltDBClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
return &boltDBTorrent{me, infoHash}, nil
}
-func (me *boltDBTorrent) Piece(p metainfo.Piece) Piece {
+func (me *boltDBTorrent) Piece(p metainfo.Piece) PieceImpl {
ret := &boltDBPiece{p: p, db: me.cl.db}
copy(ret.key[:], me.ih[:])
binary.BigEndian.PutUint32(ret.key[20:], uint32(p.Index()))
}
func (me *boltDBPiece) MarkComplete() error {
- return me.db.Update(func(tx *bolt.Tx) (err error) {
+ return me.db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucketIfNotExists(completed)
if err != nil {
- return
+ return err
}
- b.Put(me.key[:], completedValue)
- return
+ return b.Put(me.key[:], completedValue)
})
}
+func (me *boltDBPiece) MarkNotComplete() error {
+ return me.db.Update(func(tx *bolt.Tx) error {
+ b := tx.Bucket(completed)
+ if b == nil {
+ return nil
+ }
+ return b.Delete(me.key[:])
+ })
+}
func (me *boltDBPiece) ReadAt(b []byte, off int64) (n int, err error) {
err = me.db.View(func(tx *bolt.Tx) error {
db := tx.Bucket(data)
}
return nil
})
- if n == 0 && err == nil {
- if off < me.p.Length() {
- err = io.ErrUnexpectedEOF
- } else {
- err = io.EOF
- }
- }
- // // log.Println(n, err)
return
}
baseDir string
}
-func NewFile(baseDir string) Client {
+func NewFile(baseDir string) ClientImpl {
return &fileStorage{
baseDir: baseDir,
}
}
-func (fs *fileStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (Torrent, error) {
+func (fs *fileStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
return &fileTorrentStorage{
fs,
info,
completion pieceCompletion
}
-func (fts *fileTorrentStorage) Piece(p metainfo.Piece) Piece {
+func (fts *fileTorrentStorage) Piece(p metainfo.Piece) PieceImpl {
// Create a view onto the file-based torrent storage.
_io := fileStorageTorrent{fts}
// Return the appropriate segments of this.
*fileTorrentStorage
p metainfo.Piece
io.WriterAt
- r io.ReaderAt
+ io.ReaderAt
}
func (me *fileStoragePiece) pieceKey() metainfo.PieceKey {
return nil
}
-func (fsp *fileStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
- n, err = fsp.r.ReadAt(b, off)
- if n != 0 {
- err = nil
- return
- }
- if off < 0 || off >= fsp.p.Length() {
- return
- }
- fsp.completion.Set(fsp.pieceKey(), false)
- return
+func (fs *fileStoragePiece) MarkNotComplete() error {
+ fs.completion.Set(fs.pieceKey(), false)
+ return nil
}
)
// Represents data storage for an unspecified torrent.
-type Client interface {
- OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (Torrent, error)
+type ClientImpl interface {
+ OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error)
}
// Data storage bound to a torrent.
-type Torrent interface {
- Piece(metainfo.Piece) Piece
+type TorrentImpl interface {
+ Piece(metainfo.Piece) PieceImpl
Close() error
}
// Interacts with torrent piece data.
-type Piece interface {
- // Should return io.EOF only at end of torrent. Short reads due to missing
- // data should return io.ErrUnexpectedEOF.
+type PieceImpl interface {
+ // These interfaces are not as strict as normally required. They can
+ // assume that the parameters are appropriate for the dimensions of the
+ // piece.
io.ReaderAt
io.WriterAt
// Called when the client believes the piece data will pass a hash check.
// The storage can move or mark the piece data as read-only as it sees
// fit.
MarkComplete() error
+ MarkNotComplete() error
// Returns true if the piece is complete.
GetIsComplete() bool
}
// Two different torrents opened from the same storage. Closing one should not
// break the piece completion on the other.
-func testIssue95(t *testing.T, c Client) {
+func testIssue95(t *testing.T, c ClientImpl) {
i1 := &metainfo.Info{
Files: []metainfo.FileInfo{{Path: []string{"a"}}},
Pieces: make([]byte, 20),
"github.com/anacrolix/torrent/metainfo"
)
-func testMarkedCompleteMissingOnRead(t *testing.T, csf func(string) Client) {
+func testMarkedCompleteMissingOnRead(t *testing.T, csf func(string) ClientImpl) {
td, err := ioutil.TempDir("", "")
require.NoError(t, err)
defer os.RemoveAll(td)
- cs := csf(td)
+ cs := NewClient(csf(td))
info := &metainfo.Info{
PieceLength: 1,
Files: []metainfo.FileInfo{{Path: []string{"a"}, Length: 1}},
require.NoError(t, err)
p := ts.Piece(info.Piece(0))
require.NoError(t, p.MarkComplete())
- require.False(t, p.GetIsComplete())
+ // require.False(t, p.GetIsComplete())
n, err := p.ReadAt(make([]byte, 1), 0)
require.Error(t, err)
require.EqualValues(t, 0, n)
baseDir string
}
-func NewMMap(baseDir string) Client {
+func NewMMap(baseDir string) ClientImpl {
return &mmapStorage{
baseDir: baseDir,
}
}
-func (s *mmapStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (t Torrent, err error) {
+func (s *mmapStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (t TorrentImpl, err error) {
span, err := mMapTorrent(info, s.baseDir)
t = &mmapTorrentStorage{
span: span,
pc pieceCompletion
}
-func (ts *mmapTorrentStorage) Piece(p metainfo.Piece) Piece {
+func (ts *mmapTorrentStorage) Piece(p metainfo.Piece) PieceImpl {
return mmapStoragePiece{
pc: ts.pc,
p: p,
return nil
}
+func (sp mmapStoragePiece) MarkNotComplete() error {
+ sp.pc.Set(sp.pieceKey(), false)
+ return nil
+}
+
func mMapTorrent(md *metainfo.Info, location string) (mms mmap_span.MMapSpan, err error) {
defer func() {
if err != nil {
package storage
import (
- "errors"
"io"
"os"
"path"
fs missinggo.FileStore
}
-func NewFileStorePieces(fs missinggo.FileStore) Client {
+func NewFileStorePieces(fs missinggo.FileStore) ClientImpl {
return &pieceFileStorage{
fs: fs,
}
s *pieceFileStorage
}
-func (s *pieceFileStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (Torrent, error) {
+func (s *pieceFileStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
return &pieceFileTorrentStorage{s}, nil
}
return nil
}
-func (s *pieceFileTorrentStorage) Piece(p metainfo.Piece) Piece {
+func (s *pieceFileTorrentStorage) Piece(p metainfo.Piece) PieceImpl {
return pieceFileTorrentStoragePiece{s, p, s.s.fs}
}
return s.fs.Rename(s.incompletePath(), s.completedPath())
}
+func (s pieceFileTorrentStoragePiece) MarkNotComplete() error {
+ return s.fs.Remove(s.completedPath())
+}
+
func (s pieceFileTorrentStoragePiece) openFile() (f missinggo.File, err error) {
f, err = s.fs.OpenFile(s.completedPath(), os.O_RDONLY)
if err == nil {
return
}
defer f.Close()
- missinggo.LimitLen(&b, s.p.Length()-off)
- n, err = f.ReadAt(b, off)
- off += int64(n)
- if off >= s.p.Length() {
- err = io.EOF
- } else if err == io.EOF {
- err = io.ErrUnexpectedEOF
- }
- return
+ return f.ReadAt(b, off)
}
func (s pieceFileTorrentStoragePiece) WriteAt(b []byte, off int64) (n int, err error) {
- if s.GetIsComplete() {
- err = errors.New("piece completed")
- return
- }
f, err := s.fs.OpenFile(s.incompletePath(), os.O_WRONLY|os.O_CREATE)
if err != nil {
return
}
defer f.Close()
- missinggo.LimitLen(&b, s.p.Length()-off)
return f.WriteAt(b, off)
}
package storage
import (
- "io"
"path"
- "github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/resource"
"github.com/anacrolix/torrent/metainfo"
p resource.Provider
}
-func NewResourcePieces(p resource.Provider) Client {
+func NewResourcePieces(p resource.Provider) ClientImpl {
return &piecePerResource{
p: p,
}
}
-func (s *piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (Torrent, error) {
+func (s *piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
return s, nil
}
return nil
}
-func (s *piecePerResource) Piece(p metainfo.Piece) Piece {
+func (s *piecePerResource) Piece(p metainfo.Piece) PieceImpl {
completed, err := s.p.NewInstance(path.Join("completed", p.Hash().HexString()))
if err != nil {
panic(err)
return resource.Move(s.i, s.c)
}
-func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (n int, err error) {
- missinggo.LimitLen(&b, s.p.Length()-off)
- n, err = s.c.ReadAt(b, off)
- if err != nil {
- n, err = s.i.ReadAt(b, off)
- }
- off += int64(n)
- if off >= s.p.Length() {
- err = io.EOF
- } else if err == io.EOF {
- err = io.ErrUnexpectedEOF
+func (s piecePerResourcePiece) MarkNotComplete() error {
+ return s.c.Delete()
+}
+
+func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
+ if s.GetIsComplete() {
+ return s.c.ReadAt(b, off)
+ } else {
+ return s.i.ReadAt(b, off)
}
- return
}
func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) {
- missinggo.LimitLen(&b, s.p.Length()-off)
return s.i.WriteAt(b, off)
}
--- /dev/null
+package storage
+
+import (
+ "errors"
+ "io"
+ "os"
+
+ "github.com/anacrolix/missinggo"
+
+ "github.com/anacrolix/torrent/metainfo"
+)
+
+type Client struct {
+ ClientImpl
+}
+
+func NewClient(cl ClientImpl) *Client {
+ return &Client{cl}
+}
+
+func (cl Client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (*Torrent, error) {
+ t, err := cl.ClientImpl.OpenTorrent(info, infoHash)
+ return &Torrent{t}, err
+}
+
+type Torrent struct {
+ TorrentImpl
+}
+
+func (t Torrent) Piece(p metainfo.Piece) Piece {
+ return Piece{t.TorrentImpl.Piece(p), p}
+}
+
+type Piece struct {
+ PieceImpl
+ mip metainfo.Piece
+}
+
+func (p Piece) WriteAt(b []byte, off int64) (n int, err error) {
+ if p.GetIsComplete() {
+ err = errors.New("piece completed")
+ return
+ }
+ if off+int64(len(b)) > p.mip.Length() {
+ panic("write overflows piece")
+ }
+ missinggo.LimitLen(&b, p.mip.Length()-off)
+ return p.PieceImpl.WriteAt(b, off)
+}
+
+func (p Piece) ReadAt(b []byte, off int64) (n int, err error) {
+ if off < 0 {
+ err = os.ErrInvalid
+ return
+ }
+ if off >= p.mip.Length() {
+ err = io.EOF
+ return
+ }
+ missinggo.LimitLen(&b, p.mip.Length()-off)
+ if len(b) == 0 {
+ return
+ }
+ n, err = p.PieceImpl.ReadAt(b, off)
+ if n > len(b) {
+ panic(n)
+ }
+ off += int64(n)
+ if err == io.EOF && off < p.mip.Length() {
+ err = io.ErrUnexpectedEOF
+ }
+ if err == nil && off >= p.mip.Length() {
+ err = io.EOF
+ }
+ if n == 0 && err == nil {
+ err = io.ErrUnexpectedEOF
+ }
+ if off < p.mip.Length() && err != nil {
+ p.MarkNotComplete()
+ }
+ return
+}
length int64
// The storage to open when the info dict becomes available.
- storageOpener storage.Client
+ storageOpener *storage.Client
// Storage for torrent data.
- storage storage.Torrent
+ storage *storage.Torrent
metainfo metainfo.MetaInfo
func (t *Torrent) close() (err error) {
t.closed.Set()
- if c, ok := t.storage.(io.Closer); ok {
- c.Close()
+ if t.storage != nil {
+ t.storage.Close()
}
for _, conn := range t.conns {
conn.Close()