]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Rework storage.TorrentImpl to support shared capacity key
authorMatt Joiner <anacrolix@gmail.com>
Sun, 9 May 2021 13:40:44 +0000 (23:40 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 7 Jun 2021 03:01:39 +0000 (13:01 +1000)
bad_storage.go
peerconn_test.go
storage/bolt.go
storage/file.go
storage/interface.go
storage/mmap.go
storage/piece-resource.go
storage/sqlite/direct.go
test/issue377_test.go
test/transfer_test.go
torrent.go

index f984c639ccf3a428f4aa8ebf4b2b97383d343c52..fc15beb90762862c7e2f8b0c9fe39560d0a541eb 100644 (file)
@@ -15,11 +15,9 @@ type badStorage struct{}
 var _ storage.ClientImpl = badStorage{}
 
 func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
-       return bs, nil
-}
-
-func (bs badStorage) Close() error {
-       return nil
+       return storage.TorrentImpl{
+               Piece: bs.Piece,
+       }, nil
 }
 
 func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {
index 7057e77dc9c3a1900c3a0356ec3939304fd5ded1..18fc98ef52925b91147bab7114851a3113cf3205 100644 (file)
@@ -98,7 +98,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
        ts := &torrentStorage{}
        t := &Torrent{
                cl:                cl,
-               storage:           &storage.Torrent{TorrentImpl: ts},
+               storage:           &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}},
                pieceStateChanges: pubsub.NewPubSub(),
        }
        require.NoError(b, t.setInfo(&metainfo.Info{
index 5104c68265097ef9fc22968c3e35fe2637de7aef..025c566524c89865c61dec7aa32567f006f0643f 100644 (file)
@@ -43,7 +43,11 @@ func (me *boltClient) Close() error {
 }
 
 func (me *boltClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
-       return &boltTorrent{me, infoHash}, nil
+       t := &boltTorrent{me, infoHash}
+       return TorrentImpl{
+               Piece: t.Piece,
+               Close: t.Close,
+       }, nil
 }
 
 func (me *boltTorrent) Piece(p metainfo.Piece) PieceImpl {
index 1a273a345f019df6fbdd213aea425c70c09eba77..d51a7f36f8d4f32aa90ac4e7be18a966b05d2e7b 100644 (file)
@@ -67,14 +67,16 @@ func (me *fileClientImpl) Close() error {
        return me.pc.Close()
 }
 
-func (fs *fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
+func (fs *fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (_ TorrentImpl, err error) {
        dir := fs.pathMaker(fs.baseDir, info, infoHash)
        upvertedFiles := info.UpvertedFiles()
        files := make([]file, 0, len(upvertedFiles))
        for i, fileInfo := range upvertedFiles {
-               s, err := ToSafeFilePath(append([]string{info.Name}, fileInfo.Path...)...)
+               var s string
+               s, err = ToSafeFilePath(append([]string{info.Name}, fileInfo.Path...)...)
                if err != nil {
-                       return nil, fmt.Errorf("file %v has unsafe path %q: %w", i, fileInfo.Path, err)
+                       err = fmt.Errorf("file %v has unsafe path %q: %w", i, fileInfo.Path, err)
+                       return
                }
                f := file{
                        path:   filepath.Join(dir, s),
@@ -83,16 +85,21 @@ func (fs *fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Has
                if f.length == 0 {
                        err = CreateNativeZeroLengthFile(f.path)
                        if err != nil {
-                               return nil, fmt.Errorf("creating zero length file: %w", err)
+                               err = fmt.Errorf("creating zero length file: %w", err)
+                               return
                        }
                }
                files = append(files, f)
        }
-       return &fileTorrentImpl{
+       t := &fileTorrentImpl{
                files,
                segments.NewIndex(common.LengthIterFromUpvertedFiles(upvertedFiles)),
                infoHash,
                fs.pc,
+       }
+       return TorrentImpl{
+               Piece: t.Piece,
+               Close: t.Close,
        }, nil
 }
 
index 869556f80a547265fad99ba61d919f77e57e59c1..c48c6e3405022665113682a324ef347fefd9bedb 100644 (file)
@@ -17,9 +17,11 @@ type ClientImpl interface {
 }
 
 // Data storage bound to a torrent.
-type TorrentImpl interface {
-       Piece(metainfo.Piece) PieceImpl
-       Close() error
+type TorrentImpl struct {
+       Piece func(metainfo.Piece) PieceImpl
+       Close func() error
+       // Storages that share the same value, will provide a pointer to the same function.
+       Capacity *func() *int64
 }
 
 // Interacts with torrent piece data. Optional interfaces to implement include io.WriterTo, such as
index c4e5b09e4579c482c27eabbbc7a3927b4526b221..3d996d9ef7790ff3b797c7060828447e0366133d 100644 (file)
@@ -30,14 +30,14 @@ func NewMMapWithCompletion(baseDir string, completion PieceCompletion) *mmapClie
        }
 }
 
-func (s *mmapClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (t TorrentImpl, err error) {
+func (s *mmapClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (_ TorrentImpl, err error) {
        span, err := mMapTorrent(info, s.baseDir)
-       t = &mmapTorrentStorage{
+       t := &mmapTorrentStorage{
                infoHash: infoHash,
                span:     span,
                pc:       s.pc,
        }
-       return
+       return TorrentImpl{Piece: t.Piece, Close: t.Close}, err
 }
 
 func (s *mmapClientImpl) Close() error {
index d56280bc5a2654806e6e6e4455d3bb584bda6ab9..ec3848df2111186259ce1cb76c9f6749aa8623f9 100644 (file)
@@ -26,6 +26,7 @@ type ResourcePiecesOpts struct {
        // Sized puts require being able to stream from a statement executed on another connection.
        // Without them, we buffer the entire read and then put that.
        NoSizedPuts bool
+       Capacity    *int64
 }
 
 func NewResourcePieces(p PieceProvider) ClientImpl {
@@ -49,10 +50,11 @@ func (piecePerResourceTorrentImpl) Close() error {
 }
 
 func (s piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
-       return piecePerResourceTorrentImpl{
+       t := piecePerResourceTorrentImpl{
                s,
                make([]sync.RWMutex, info.NumPieces()),
-       }, nil
+       }
+       return TorrentImpl{Piece: t.Piece, Close: t.Close}, nil
 }
 
 func (s piecePerResourceTorrentImpl) Piece(p metainfo.Piece) PieceImpl {
index d51e13215053a814f345bce31953cf8cb0926c9a..7748f92182780789be65f590c9389d119cbd91c3 100644 (file)
@@ -61,9 +61,24 @@ func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, er
        if opts.BlobFlushInterval != 0 {
                cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
        }
+       cl.capacity = cl.getCapacity
        return cl, nil
 }
 
+func (cl *client) getCapacity() (ret *int64) {
+       cl.l.Lock()
+       defer cl.l.Unlock()
+       err := sqlitex.Exec(cl.conn, "select value from setting where name='capacity'", func(stmt *sqlite.Stmt) error {
+               ret = new(int64)
+               *ret = stmt.ColumnInt64(0)
+               return nil
+       })
+       if err != nil {
+               panic(err)
+       }
+       return
+}
+
 type client struct {
        l           sync.Mutex
        conn        conn
@@ -71,6 +86,7 @@ type client struct {
        blobFlusher *time.Timer
        opts        NewDirectStorageOpts
        closed      bool
+       capacity    func() *int64
 }
 
 func (c *client) blobFlusherFunc() {
@@ -91,7 +107,8 @@ func (c *client) flushBlobs() {
 }
 
 func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
-       return torrent{c}, nil
+       t := torrent{c}
+       return storage.TorrentImpl{Piece: t.Piece, Close: t.Close, Capacity: &c.capacity}, nil
 }
 
 func (c *client) Close() error {
index bd8c4357d5a5e5e9366266fec633184791e8207f..7456e9c452901bdfed35d0e9a15ca6ffa5690784 100644 (file)
@@ -124,7 +124,7 @@ func (me *diskFullStorage) Close() error {
 }
 
 func (d *diskFullStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
-       return d, nil
+       return storage.TorrentImpl{Piece: d.Piece, Close: d.Close}, nil
 }
 
 type pieceImpl struct {
index e58bb53bf056365071dc7e552c016dc2b549aa54..857cd44ff3a7b3a76c8b65472db67db256cfa211 100644 (file)
@@ -192,7 +192,6 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
 type fileCacheClientStorageFactoryParams struct {
        Capacity    int64
        SetCapacity bool
-       Wrapper     func(*filecache.Cache) storage.ClientImplCloser
 }
 
 func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
@@ -201,10 +200,22 @@ func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) st
                if err != nil {
                        panic(err)
                }
+               var sharedCapacity *int64
                if ps.SetCapacity {
+                       sharedCapacity = &ps.Capacity
                        fc.SetCapacity(ps.Capacity)
                }
-               return ps.Wrapper(fc)
+               return struct {
+                       storage.ClientImpl
+                       io.Closer
+               }{
+                       storage.NewResourcePiecesOpts(
+                               fc.AsResourceProvider(),
+                               storage.ResourcePiecesOpts{
+                                       Capacity: sharedCapacity,
+                               }),
+                       ioutil.NopCloser(nil),
+               }
        }
 }
 
@@ -212,17 +223,13 @@ type storageFactory func(string) storage.ClientImplCloser
 
 func TestClientTransferDefault(t *testing.T) {
        testClientTransfer(t, testClientTransferParams{
-               LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
-                       Wrapper: fileCachePieceResourceStorage,
-               }),
+               LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}),
        })
 }
 
 func TestClientTransferDefaultNoMetadata(t *testing.T) {
        testClientTransfer(t, testClientTransferParams{
-               LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
-                       Wrapper: fileCachePieceResourceStorage,
-               }),
+               LeecherStorage:               newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}),
                LeecherStartsWithoutMetadata: true,
        })
 }
@@ -244,16 +251,6 @@ func TestClientTransferRateLimitedDownload(t *testing.T) {
        })
 }
 
-func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImplCloser {
-       return struct {
-               storage.ClientImpl
-               io.Closer
-       }{
-               storage.NewResourcePieces(fc.AsResourceProvider()),
-               ioutil.NopCloser(nil),
-       }
-}
-
 func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
        testClientTransfer(t, testClientTransferParams{
                LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
@@ -261,7 +258,6 @@ func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int
                        // Going below the piece length means it can't complete a piece so
                        // that it can be hashed.
                        Capacity: 5,
-                       Wrapper:  fileCachePieceResourceStorage,
                }),
                SetReadahead: setReadahead,
                // Can't readahead too far or the cache will thrash and drop data we
@@ -324,9 +320,7 @@ func sqliteLeecherStorageTestCase(numConns int) leecherStorageTestCase {
 func TestClientTransferVarious(t *testing.T) {
        // Leecher storage
        for _, ls := range []leecherStorageTestCase{
-               {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
-                       Wrapper: fileCachePieceResourceStorage,
-               }), 0},
+               {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}), 0},
                {"Boltdb", storage.NewBoltDB, 0},
                {"SqliteDirect", func(s string) storage.ClientImplCloser {
                        path := filepath.Join(s, "sqlite3.db")
index e8cf36e47f27b01a1e1cf0e51cdda6835223055d..b6a6f2b7e76af18dae8caa955cb9317b472830ca 100644 (file)
@@ -760,7 +760,9 @@ func (t *Torrent) close() (err error) {
                func() {
                        t.storageLock.Lock()
                        defer t.storageLock.Unlock()
-                       t.storage.Close()
+                       if f := t.storage.Close; f != nil {
+                               f()
+                       }
                }()
        }
        t.iterPeers(func(p *Peer) {