]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Rework storage interfaces to make them simpler to implement
authorMatt Joiner <anacrolix@gmail.com>
Fri, 2 Sep 2016 05:10:57 +0000 (15:10 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Fri, 2 Sep 2016 05:10:57 +0000 (15:10 +1000)
This allows lots of behaviour to be baked into the new Client, Torrent and Piece wrappers, rather than duplicating (badly) them in all the backend implementations.

15 files changed:
client.go
client_test.go
config.go
issue97_test.go
storage/boltdb.go
storage/file.go
storage/file_storage_piece.go
storage/interface.go
storage/issue95_test.go
storage/issue96_test.go
storage/mmap.go
storage/piece_file.go
storage/piece_resource.go
storage/wrappers.go [new file with mode: 0644]
torrent.go

index 4de098f34c5d8e4836db08bcfa4a67ecdf597146..98c9b12d770bbc288ef29c2c082c135d8c939525 100644 (file)
--- a/client.go
+++ b/client.go
@@ -74,7 +74,7 @@ type Client struct {
        dopplegangerAddrs map[string]struct{}
        badPeerIPs        map[string]struct{}
 
-       defaultStorage storage.Client
+       defaultStorage *storage.Client
 
        mu     sync.RWMutex
        event  sync.Cond
@@ -253,15 +253,16 @@ func NewClient(cfg *Config) (cl *Client, err error) {
        cl = &Client{
                halfOpenLimit:     defaultHalfOpenConnsPerTorrent,
                config:            *cfg,
-               defaultStorage:    cfg.DefaultStorage,
                dopplegangerAddrs: make(map[string]struct{}),
                torrents:          make(map[metainfo.Hash]*Torrent),
        }
        missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
        cl.event.L = &cl.mu
-       if cl.defaultStorage == nil {
-               cl.defaultStorage = storage.NewFile(cfg.DataDir)
+       storageImpl := cfg.DefaultStorage
+       if storageImpl == nil {
+               storageImpl = storage.NewFile(cfg.DataDir)
        }
+       cl.defaultStorage = storage.NewClient(storageImpl)
        if cfg.IPBlocklist != nil {
                cl.ipBlockList = cfg.IPBlocklist
        }
@@ -1417,7 +1418,7 @@ type TorrentSpec struct {
        // The chunk size to use for outbound requests. Defaults to 16KiB if not
        // set.
        ChunkSize int
-       Storage   storage.Client
+       Storage   storage.ClientImpl
 }
 
 func TorrentSpecFromMagnetURI(uri string) (spec *TorrentSpec, err error) {
index e85cb7c2ab409eeae418c35052dd7bf39d418257..9c9cb9f5e9367c22757ae47c35daa31032d67c6c 100644 (file)
@@ -92,7 +92,7 @@ func TestTorrentInitialState(t *testing.T) {
                pieceStateChanges: pubsub.NewPubSub(),
        }
        tor.chunkSize = 2
-       tor.storageOpener = storage.NewFile("/dev/null")
+       tor.storageOpener = storage.NewClient(storage.NewFile("/dev/null"))
        // Needed to lock for asynchronous piece verification.
        tor.cl = new(Client)
        err := tor.setInfoBytes(mi.InfoBytes)
@@ -241,11 +241,11 @@ func TestAddDropManyTorrents(t *testing.T) {
 type FileCacheClientStorageFactoryParams struct {
        Capacity    int64
        SetCapacity bool
-       Wrapper     func(*filecache.Cache) storage.Client
+       Wrapper     func(*filecache.Cache) storage.ClientImpl
 }
 
 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
-       return func(dataDir string) storage.Client {
+       return func(dataDir string) storage.ClientImpl {
                fc, err := filecache.NewCache(dataDir)
                if err != nil {
                        panic(err)
@@ -257,7 +257,7 @@ func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) st
        }
 }
 
-type storageFactory func(string) storage.Client
+type storageFactory func(string) storage.ClientImpl
 
 func TestClientTransferDefault(t *testing.T) {
        testClientTransfer(t, testClientTransferParams{
@@ -268,11 +268,11 @@ func TestClientTransferDefault(t *testing.T) {
        })
 }
 
-func fileCachePieceResourceStorage(fc *filecache.Cache) storage.Client {
+func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
        return storage.NewResourcePieces(fc.AsResourceProvider())
 }
 
-func fileCachePieceFileStorage(fc *filecache.Cache) storage.Client {
+func fileCachePieceFileStorage(fc *filecache.Cache) storage.ClientImpl {
        return storage.NewFileStorePieces(fc.AsFileStore())
 }
 
@@ -303,7 +303,7 @@ func TestClientTransferVarious(t *testing.T) {
                }),
                storage.NewBoltDB,
        } {
-               for _, ss := range []func(string) storage.Client{
+               for _, ss := range []func(string) storage.ClientImpl{
                        storage.NewFile,
                        storage.NewMMap,
                } {
@@ -332,8 +332,8 @@ type testClientTransferParams struct {
        Readahead          int64
        SetReadahead       bool
        ExportClientStatus bool
-       LeecherStorage     func(string) storage.Client
-       SeederStorage      func(string) storage.Client
+       LeecherStorage     func(string) storage.ClientImpl
+       SeederStorage      func(string) storage.ClientImpl
 }
 
 // Creates a seeder and a leecher, and ensures the data transfers when a read
@@ -493,7 +493,7 @@ func TestMergingTrackersByAddingSpecs(t *testing.T) {
 
 type badStorage struct{}
 
-func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.Torrent, error) {
+func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
        return bs, nil
 }
 
@@ -501,7 +501,7 @@ func (bs badStorage) Close() error {
        return nil
 }
 
-func (bs badStorage) Piece(p metainfo.Piece) storage.Piece {
+func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {
        return badStoragePiece{p}
 }
 
@@ -521,6 +521,10 @@ func (p badStoragePiece) MarkComplete() error {
        return errors.New("psyyyyyyyche")
 }
 
+func (p badStoragePiece) MarkNotComplete() error {
+       return errors.New("psyyyyyyyche")
+}
+
 func (p badStoragePiece) randomlyTruncatedDataString() string {
        return "hello, world\n"[:rand.Intn(14)]
 }
@@ -709,14 +713,14 @@ func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
        }
 }
 
-func writeTorrentData(ts storage.Torrent, info metainfo.Info, b []byte) {
+func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
        for i := range iter.N(info.NumPieces()) {
-               n, _ := ts.Piece(info.Piece(i)).WriteAt(b, 0)
-               b = b[n:]
+               p := info.Piece(i)
+               ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
        }
 }
 
-func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.Client) {
+func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
        fileCacheDir, err := ioutil.TempDir("", "")
        require.NoError(t, err)
        defer os.RemoveAll(fileCacheDir)
@@ -727,7 +731,7 @@ func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf
        filePieceStore := csf(fileCache)
        info := greetingMetainfo.UnmarshalInfo()
        ih := greetingMetainfo.HashInfoBytes()
-       greetingData, err := filePieceStore.OpenTorrent(&info, ih)
+       greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
        require.NoError(t, err)
        writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
        // require.Equal(t, len(testutil.GreetingFileContents), written)
index 9f645b0ae6f331c8ee6bf5b38a440b13c72ed7aa..9ea764161736b77816e8cc98e3d3958da2e756ba 100644 (file)
--- a/config.go
+++ b/config.go
@@ -35,7 +35,7 @@ type Config struct {
        DisableTCP bool `long:"disable-tcp"`
        // Called to instantiate storage for each added torrent. Provided backends
        // are in $REPO/data. If not set, the "file" implementation is used.
-       DefaultStorage    storage.Client
+       DefaultStorage    storage.ClientImpl
        DisableEncryption bool `long:"disable-encryption"`
 
        IPBlocklist iplist.Ranger
index 1e6373cc64fe7e760b0432b403fa07e8b01068aa..2695e5b70972698d070f8f9f06e418522b028490 100644 (file)
@@ -15,7 +15,7 @@ func TestHashPieceAfterStorageClosed(t *testing.T) {
        td, err := ioutil.TempDir("", "")
        require.NoError(t, err)
        defer os.RemoveAll(td)
-       cs := storage.NewFile(td)
+       cs := storage.NewClient(storage.NewFile(td))
        tt := &Torrent{}
        mi := testutil.GreetingMetaInfo()
        info := mi.UnmarshalInfo()
index b7c40645c9ec674f83d1719192eabe54a095b2ce..140dc2341a00ba312da7814d2963e5dc06bf9326 100644 (file)
@@ -2,10 +2,8 @@ package storage
 
 import (
        "encoding/binary"
-       "io"
        "path/filepath"
 
-       "github.com/anacrolix/missinggo"
        "github.com/boltdb/bolt"
 
        "github.com/anacrolix/torrent/metainfo"
@@ -43,7 +41,7 @@ type boltDBPiece struct {
        key [24]byte
 }
 
-func NewBoltDB(filePath string) Client {
+func NewBoltDB(filePath string) ClientImpl {
        ret := &boltDBClient{}
        var err error
        ret.db, err = bolt.Open(filepath.Join(filePath, "bolt.db"), 0600, nil)
@@ -53,11 +51,11 @@ func NewBoltDB(filePath string) Client {
        return ret
 }
 
-func (me *boltDBClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (Torrent, error) {
+func (me *boltDBClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
        return &boltDBTorrent{me, infoHash}, nil
 }
 
-func (me *boltDBTorrent) Piece(p metainfo.Piece) Piece {
+func (me *boltDBTorrent) Piece(p metainfo.Piece) PieceImpl {
        ret := &boltDBPiece{p: p, db: me.cl.db}
        copy(ret.key[:], me.ih[:])
        binary.BigEndian.PutUint32(ret.key[20:], uint32(p.Index()))
@@ -82,16 +80,24 @@ func (me *boltDBPiece) GetIsComplete() (complete bool) {
 }
 
 func (me *boltDBPiece) MarkComplete() error {
-       return me.db.Update(func(tx *bolt.Tx) (err error) {
+       return me.db.Update(func(tx *bolt.Tx) error {
                b, err := tx.CreateBucketIfNotExists(completed)
                if err != nil {
-                       return
+                       return err
                }
-               b.Put(me.key[:], completedValue)
-               return
+               return b.Put(me.key[:], completedValue)
        })
 }
 
+func (me *boltDBPiece) MarkNotComplete() error {
+       return me.db.Update(func(tx *bolt.Tx) error {
+               b := tx.Bucket(completed)
+               if b == nil {
+                       return nil
+               }
+               return b.Delete(me.key[:])
+       })
+}
 func (me *boltDBPiece) ReadAt(b []byte, off int64) (n int, err error) {
        err = me.db.View(func(tx *bolt.Tx) error {
                db := tx.Bucket(data)
@@ -114,14 +120,6 @@ func (me *boltDBPiece) ReadAt(b []byte, off int64) (n int, err error) {
                }
                return nil
        })
-       if n == 0 && err == nil {
-               if off < me.p.Length() {
-                       err = io.ErrUnexpectedEOF
-               } else {
-                       err = io.EOF
-               }
-       }
-       // // log.Println(n, err)
        return
 }
 
index bae53134d054b9eae5c121430b610184d1bbb91a..3e686aa26f58634a48832f9b323d5191f1b76743 100644 (file)
@@ -17,13 +17,13 @@ type fileStorage struct {
        baseDir string
 }
 
-func NewFile(baseDir string) Client {
+func NewFile(baseDir string) ClientImpl {
        return &fileStorage{
                baseDir: baseDir,
        }
 }
 
-func (fs *fileStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (Torrent, error) {
+func (fs *fileStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
        return &fileTorrentStorage{
                fs,
                info,
@@ -40,7 +40,7 @@ type fileTorrentStorage struct {
        completion pieceCompletion
 }
 
-func (fts *fileTorrentStorage) Piece(p metainfo.Piece) Piece {
+func (fts *fileTorrentStorage) Piece(p metainfo.Piece) PieceImpl {
        // Create a view onto the file-based torrent storage.
        _io := fileStorageTorrent{fts}
        // Return the appropriate segments of this.
index 915d5d0d1b637c76eec8956b880f2448ed70fdbd..5dce1ea391988ccea655084d706bd115ec4c6ac2 100644 (file)
@@ -11,7 +11,7 @@ type fileStoragePiece struct {
        *fileTorrentStorage
        p metainfo.Piece
        io.WriterAt
-       io.ReaderAt
+       io.ReaderAt
 }
 
 func (me *fileStoragePiece) pieceKey() metainfo.PieceKey {
@@ -45,15 +45,7 @@ func (fs *fileStoragePiece) MarkComplete() error {
        return nil
 }
 
-func (fsp *fileStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
-       n, err = fsp.r.ReadAt(b, off)
-       if n != 0 {
-               err = nil
-               return
-       }
-       if off < 0 || off >= fsp.p.Length() {
-               return
-       }
-       fsp.completion.Set(fsp.pieceKey(), false)
-       return
+func (fs *fileStoragePiece) MarkNotComplete() error {
+       fs.completion.Set(fs.pieceKey(), false)
+       return nil
 }
index 24716e0cba1f8011fbd63d86169953a9271b9120..132d4c856dd559cf9e4f46c0ffb8b969b1c0dd7a 100644 (file)
@@ -7,26 +7,28 @@ import (
 )
 
 // Represents data storage for an unspecified torrent.
-type Client interface {
-       OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (Torrent, error)
+type ClientImpl interface {
+       OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error)
 }
 
 // Data storage bound to a torrent.
-type Torrent interface {
-       Piece(metainfo.Piece) Piece
+type TorrentImpl interface {
+       Piece(metainfo.Piece) PieceImpl
        Close() error
 }
 
 // Interacts with torrent piece data.
-type Piece interface {
-       // Should return io.EOF only at end of torrent. Short reads due to missing
-       // data should return io.ErrUnexpectedEOF.
+type PieceImpl interface {
+       // These interfaces are not as strict as normally required. They can
+       // assume that the parameters are appropriate for the dimentions of the
+       // piece.
        io.ReaderAt
        io.WriterAt
        // Called when the client believes the piece data will pass a hash check.
        // The storage can move or mark the piece data as read-only as it sees
        // fit.
        MarkComplete() error
+       MarkNotComplete() error
        // Returns true if the piece is complete.
        GetIsComplete() bool
 }
index 39072cdbd219448d963bcfe4ed377ecf741e8235..bb2c2fe9a3f19ec350db6a271e2f03d436800a64 100644 (file)
@@ -14,7 +14,7 @@ import (
 
 // Two different torrents opened from the same storage. Closing one should not
 // break the piece completion on the other.
-func testIssue95(t *testing.T, c Client) {
+func testIssue95(t *testing.T, c ClientImpl) {
        i1 := &metainfo.Info{
                Files:  []metainfo.FileInfo{{Path: []string{"a"}}},
                Pieces: make([]byte, 20),
index 3339b9b0f1bc92b3f80defc27484831113e981fa..b7267ba4ca76aaffae289ef1dbd58f33302772d8 100644 (file)
@@ -10,11 +10,11 @@ import (
        "github.com/anacrolix/torrent/metainfo"
 )
 
-func testMarkedCompleteMissingOnRead(t *testing.T, csf func(string) Client) {
+func testMarkedCompleteMissingOnRead(t *testing.T, csf func(string) ClientImpl) {
        td, err := ioutil.TempDir("", "")
        require.NoError(t, err)
        defer os.RemoveAll(td)
-       cs := csf(td)
+       cs := NewClient(csf(td))
        info := &metainfo.Info{
                PieceLength: 1,
                Files:       []metainfo.FileInfo{{Path: []string{"a"}, Length: 1}},
@@ -23,7 +23,7 @@ func testMarkedCompleteMissingOnRead(t *testing.T, csf func(string) Client) {
        require.NoError(t, err)
        p := ts.Piece(info.Piece(0))
        require.NoError(t, p.MarkComplete())
-       require.False(t, p.GetIsComplete())
+       // require.False(t, p.GetIsComplete())
        n, err := p.ReadAt(make([]byte, 1), 0)
        require.Error(t, err)
        require.EqualValues(t, 0, n)
index 3d176f361ceded306dadcf1da0fbc240c76150c6..c8b4c2eaf369c31b9ba3755ca545736bcb7c6b8a 100644 (file)
@@ -17,13 +17,13 @@ type mmapStorage struct {
        baseDir string
 }
 
-func NewMMap(baseDir string) Client {
+func NewMMap(baseDir string) ClientImpl {
        return &mmapStorage{
                baseDir: baseDir,
        }
 }
 
-func (s *mmapStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (t Torrent, err error) {
+func (s *mmapStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (t TorrentImpl, err error) {
        span, err := mMapTorrent(info, s.baseDir)
        t = &mmapTorrentStorage{
                span: span,
@@ -37,7 +37,7 @@ type mmapTorrentStorage struct {
        pc   pieceCompletion
 }
 
-func (ts *mmapTorrentStorage) Piece(p metainfo.Piece) Piece {
+func (ts *mmapTorrentStorage) Piece(p metainfo.Piece) PieceImpl {
        return mmapStoragePiece{
                pc:       ts.pc,
                p:        p,
@@ -73,6 +73,11 @@ func (sp mmapStoragePiece) MarkComplete() error {
        return nil
 }
 
+func (sp mmapStoragePiece) MarkNotComplete() error {
+       sp.pc.Set(sp.pieceKey(), false)
+       return nil
+}
+
 func mMapTorrent(md *metainfo.Info, location string) (mms mmap_span.MMapSpan, err error) {
        defer func() {
                if err != nil {
index 36fe0664351c91984f0f9995817a67657cabc903..77db0496f9b8af46451e624112a39793b0a426a3 100644 (file)
@@ -1,7 +1,6 @@
 package storage
 
 import (
-       "errors"
        "io"
        "os"
        "path"
@@ -15,7 +14,7 @@ type pieceFileStorage struct {
        fs missinggo.FileStore
 }
 
-func NewFileStorePieces(fs missinggo.FileStore) Client {
+func NewFileStorePieces(fs missinggo.FileStore) ClientImpl {
        return &pieceFileStorage{
                fs: fs,
        }
@@ -25,7 +24,7 @@ type pieceFileTorrentStorage struct {
        s *pieceFileStorage
 }
 
-func (s *pieceFileStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (Torrent, error) {
+func (s *pieceFileStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
        return &pieceFileTorrentStorage{s}, nil
 }
 
@@ -33,7 +32,7 @@ func (s *pieceFileTorrentStorage) Close() error {
        return nil
 }
 
-func (s *pieceFileTorrentStorage) Piece(p metainfo.Piece) Piece {
+func (s *pieceFileTorrentStorage) Piece(p metainfo.Piece) PieceImpl {
        return pieceFileTorrentStoragePiece{s, p, s.s.fs}
 }
 
@@ -60,6 +59,10 @@ func (s pieceFileTorrentStoragePiece) MarkComplete() error {
        return s.fs.Rename(s.incompletePath(), s.completedPath())
 }
 
+func (s pieceFileTorrentStoragePiece) MarkNotComplete() error {
+       return s.fs.Remove(s.completedPath())
+}
+
 func (s pieceFileTorrentStoragePiece) openFile() (f missinggo.File, err error) {
        f, err = s.fs.OpenFile(s.completedPath(), os.O_RDONLY)
        if err == nil {
@@ -85,27 +88,14 @@ func (s pieceFileTorrentStoragePiece) ReadAt(b []byte, off int64) (n int, err er
                return
        }
        defer f.Close()
-       missinggo.LimitLen(&b, s.p.Length()-off)
-       n, err = f.ReadAt(b, off)
-       off += int64(n)
-       if off >= s.p.Length() {
-               err = io.EOF
-       } else if err == io.EOF {
-               err = io.ErrUnexpectedEOF
-       }
-       return
+       return f.ReadAt(b, off)
 }
 
 func (s pieceFileTorrentStoragePiece) WriteAt(b []byte, off int64) (n int, err error) {
-       if s.GetIsComplete() {
-               err = errors.New("piece completed")
-               return
-       }
        f, err := s.fs.OpenFile(s.incompletePath(), os.O_WRONLY|os.O_CREATE)
        if err != nil {
                return
        }
        defer f.Close()
-       missinggo.LimitLen(&b, s.p.Length()-off)
        return f.WriteAt(b, off)
 }
index 99d9e3bcb65af41917d06b18ab9b9388e23578b0..e46d923e92b5edfbd4ab7018b2242cea04dd57b5 100644 (file)
@@ -1,10 +1,8 @@
 package storage
 
 import (
-       "io"
        "path"
 
-       "github.com/anacrolix/missinggo"
        "github.com/anacrolix/missinggo/resource"
 
        "github.com/anacrolix/torrent/metainfo"
@@ -14,13 +12,13 @@ type piecePerResource struct {
        p resource.Provider
 }
 
-func NewResourcePieces(p resource.Provider) Client {
+func NewResourcePieces(p resource.Provider) ClientImpl {
        return &piecePerResource{
                p: p,
        }
 }
 
-func (s *piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (Torrent, error) {
+func (s *piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
        return s, nil
 }
 
@@ -28,7 +26,7 @@ func (s *piecePerResource) Close() error {
        return nil
 }
 
-func (s *piecePerResource) Piece(p metainfo.Piece) Piece {
+func (s *piecePerResource) Piece(p metainfo.Piece) PieceImpl {
        completed, err := s.p.NewInstance(path.Join("completed", p.Hash().HexString()))
        if err != nil {
                panic(err)
@@ -59,22 +57,18 @@ func (s piecePerResourcePiece) MarkComplete() error {
        return resource.Move(s.i, s.c)
 }
 
-func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (n int, err error) {
-       missinggo.LimitLen(&b, s.p.Length()-off)
-       n, err = s.c.ReadAt(b, off)
-       if err != nil {
-               n, err = s.i.ReadAt(b, off)
-       }
-       off += int64(n)
-       if off >= s.p.Length() {
-               err = io.EOF
-       } else if err == io.EOF {
-               err = io.ErrUnexpectedEOF
+func (s piecePerResourcePiece) MarkNotComplete() error {
+       return s.c.Delete()
+}
+
+func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
+       if s.GetIsComplete() {
+               return s.c.ReadAt(b, off)
+       } else {
+               return s.i.ReadAt(b, off)
        }
-       return
 }
 
 func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) {
-       missinggo.LimitLen(&b, s.p.Length()-off)
        return s.i.WriteAt(b, off)
 }
diff --git a/storage/wrappers.go b/storage/wrappers.go
new file mode 100644 (file)
index 0000000..8e90f0f
--- /dev/null
@@ -0,0 +1,82 @@
+package storage
+
+import (
+       "errors"
+       "io"
+       "os"
+
+       "github.com/anacrolix/missinggo"
+
+       "github.com/anacrolix/torrent/metainfo"
+)
+
+type Client struct {
+       ClientImpl
+}
+
+func NewClient(cl ClientImpl) *Client {
+       return &Client{cl}
+}
+
+func (cl Client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (*Torrent, error) {
+       t, err := cl.ClientImpl.OpenTorrent(info, infoHash)
+       return &Torrent{t}, err
+}
+
+type Torrent struct {
+       TorrentImpl
+}
+
+func (t Torrent) Piece(p metainfo.Piece) Piece {
+       return Piece{t.TorrentImpl.Piece(p), p}
+}
+
+type Piece struct {
+       PieceImpl
+       mip metainfo.Piece
+}
+
+func (p Piece) WriteAt(b []byte, off int64) (n int, err error) {
+       if p.GetIsComplete() {
+               err = errors.New("piece completed")
+               return
+       }
+       if off+int64(len(b)) > p.mip.Length() {
+               panic("write overflows piece")
+       }
+       missinggo.LimitLen(&b, p.mip.Length()-off)
+       return p.PieceImpl.WriteAt(b, off)
+}
+
+func (p Piece) ReadAt(b []byte, off int64) (n int, err error) {
+       if off < 0 {
+               err = os.ErrInvalid
+               return
+       }
+       if off >= p.mip.Length() {
+               err = io.EOF
+               return
+       }
+       missinggo.LimitLen(&b, p.mip.Length()-off)
+       if len(b) == 0 {
+               return
+       }
+       n, err = p.PieceImpl.ReadAt(b, off)
+       if n > len(b) {
+               panic(n)
+       }
+       off += int64(n)
+       if err == io.EOF && off < p.mip.Length() {
+               err = io.ErrUnexpectedEOF
+       }
+       if err == nil && off >= p.mip.Length() {
+               err = io.EOF
+       }
+       if n == 0 && err == nil {
+               err = io.ErrUnexpectedEOF
+       }
+       if off < p.mip.Length() && err != nil {
+               p.MarkNotComplete()
+       }
+       return
+}
index 5c592f420ad3c4e044dba36500ff80ee958949ea..767f36bff760a2da8f4a486a3e5cd269489027e6 100644 (file)
@@ -57,9 +57,9 @@ type Torrent struct {
        length int64
 
        // The storage to open when the info dict becomes available.
-       storageOpener storage.Client
+       storageOpener *storage.Client
        // Storage for torrent data.
-       storage storage.Torrent
+       storage *storage.Torrent
 
        metainfo metainfo.MetaInfo
 
@@ -550,8 +550,8 @@ func (t *Torrent) numPiecesCompleted() (num int) {
 
 func (t *Torrent) close() (err error) {
        t.closed.Set()
-       if c, ok := t.storage.(io.Closer); ok {
-               c.Close()
+       if t.storage != nil {
+               t.storage.Close()
        }
        for _, conn := range t.conns {
                conn.Close()