From: Matt Joiner Date: Sun, 9 May 2021 13:40:44 +0000 (+1000) Subject: Rework storage.TorrentImpl to support shared capacity key X-Git-Tag: v1.29.0~31^2~54 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=5f8471e21b69e6a632ec2d74183c30c88ab6d294;p=btrtrc.git Rework storage.TorrentImpl to support shared capacity key --- diff --git a/bad_storage.go b/bad_storage.go index f984c639..fc15beb9 100644 --- a/bad_storage.go +++ b/bad_storage.go @@ -15,11 +15,9 @@ type badStorage struct{} var _ storage.ClientImpl = badStorage{} func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) { - return bs, nil -} - -func (bs badStorage) Close() error { - return nil + return storage.TorrentImpl{ + Piece: bs.Piece, + }, nil } func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl { diff --git a/peerconn_test.go b/peerconn_test.go index 7057e77d..18fc98ef 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -98,7 +98,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) { ts := &torrentStorage{} t := &Torrent{ cl: cl, - storage: &storage.Torrent{TorrentImpl: ts}, + storage: &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}}, pieceStateChanges: pubsub.NewPubSub(), } require.NoError(b, t.setInfo(&metainfo.Info{ diff --git a/storage/bolt.go b/storage/bolt.go index 5104c682..025c5665 100644 --- a/storage/bolt.go +++ b/storage/bolt.go @@ -43,7 +43,11 @@ func (me *boltClient) Close() error { } func (me *boltClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) { - return &boltTorrent{me, infoHash}, nil + t := &boltTorrent{me, infoHash} + return TorrentImpl{ + Piece: t.Piece, + Close: t.Close, + }, nil } func (me *boltTorrent) Piece(p metainfo.Piece) PieceImpl { diff --git a/storage/file.go b/storage/file.go index 1a273a34..d51a7f36 100644 --- a/storage/file.go +++ b/storage/file.go @@ -67,14 +67,16 @@ func (me *fileClientImpl) Close() error { return me.pc.Close() } -func (fs *fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) { +func (fs *fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (_ TorrentImpl, err error) { dir := fs.pathMaker(fs.baseDir, info, infoHash) upvertedFiles := info.UpvertedFiles() files := make([]file, 0, len(upvertedFiles)) for i, fileInfo := range upvertedFiles { - s, err := ToSafeFilePath(append([]string{info.Name}, fileInfo.Path...)...) + var s string + s, err = ToSafeFilePath(append([]string{info.Name}, fileInfo.Path...)...) if err != nil { - return nil, fmt.Errorf("file %v has unsafe path %q: %w", i, fileInfo.Path, err) + err = fmt.Errorf("file %v has unsafe path %q: %w", i, fileInfo.Path, err) + return } f := file{ path: filepath.Join(dir, s), @@ -83,16 +85,21 @@ func (fs *fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Has if f.length == 0 { err = CreateNativeZeroLengthFile(f.path) if err != nil { - return nil, fmt.Errorf("creating zero length file: %w", err) + err = fmt.Errorf("creating zero length file: %w", err) + return } } files = append(files, f) } - return &fileTorrentImpl{ + t := &fileTorrentImpl{ files, segments.NewIndex(common.LengthIterFromUpvertedFiles(upvertedFiles)), infoHash, fs.pc, + } + return TorrentImpl{ + Piece: t.Piece, + Close: t.Close, }, nil } diff --git a/storage/interface.go b/storage/interface.go index 869556f8..c48c6e34 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -17,9 +17,11 @@ type ClientImpl interface { } // Data storage bound to a torrent. -type TorrentImpl interface { - Piece(metainfo.Piece) PieceImpl - Close() error +type TorrentImpl struct { + Piece func(metainfo.Piece) PieceImpl + Close func() error + // Storages that share the same value, will provide a pointer to the same function. + Capacity *func() *int64 } // Interacts with torrent piece data. Optional interfaces to implement include io.WriterTo, such as diff --git a/storage/mmap.go b/storage/mmap.go index c4e5b09e..3d996d9e 100644 --- a/storage/mmap.go +++ b/storage/mmap.go @@ -30,14 +30,14 @@ func NewMMapWithCompletion(baseDir string, completion PieceCompletion) *mmapClie } } -func (s *mmapClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (t TorrentImpl, err error) { +func (s *mmapClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (_ TorrentImpl, err error) { span, err := mMapTorrent(info, s.baseDir) - t = &mmapTorrentStorage{ + t := &mmapTorrentStorage{ infoHash: infoHash, span: span, pc: s.pc, } - return + return TorrentImpl{Piece: t.Piece, Close: t.Close}, err } func (s *mmapClientImpl) Close() error { diff --git a/storage/piece-resource.go b/storage/piece-resource.go index d56280bc..ec3848df 100644 --- a/storage/piece-resource.go +++ b/storage/piece-resource.go @@ -26,6 +26,7 @@ type ResourcePiecesOpts struct { // Sized puts require being able to stream from a statement executed on another connection. // Without them, we buffer the entire read and then put that. NoSizedPuts bool + Capacity *int64 } func NewResourcePieces(p PieceProvider) ClientImpl { @@ -49,10 +50,11 @@ func (piecePerResourceTorrentImpl) Close() error { } func (s piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) { - return piecePerResourceTorrentImpl{ + t := piecePerResourceTorrentImpl{ s, make([]sync.RWMutex, info.NumPieces()), - }, nil + } + return TorrentImpl{Piece: t.Piece, Close: t.Close}, nil } func (s piecePerResourceTorrentImpl) Piece(p metainfo.Piece) PieceImpl { diff --git a/storage/sqlite/direct.go b/storage/sqlite/direct.go index d51e1321..7748f921 100644 --- a/storage/sqlite/direct.go +++ b/storage/sqlite/direct.go @@ -61,9 +61,24 @@ func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, er if opts.BlobFlushInterval != 0 { cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc) } + cl.capacity = cl.getCapacity return cl, nil } +func (cl *client) getCapacity() (ret *int64) { + cl.l.Lock() + defer cl.l.Unlock() + err := sqlitex.Exec(cl.conn, "select value from setting where name='capacity'", func(stmt *sqlite.Stmt) error { + ret = new(int64) + *ret = stmt.ColumnInt64(0) + return nil + }) + if err != nil { + panic(err) + } + return +} + type client struct { l sync.Mutex conn conn @@ -71,6 +86,7 @@ type client struct { blobFlusher *time.Timer opts NewDirectStorageOpts closed bool + capacity func() *int64 } func (c *client) blobFlusherFunc() { @@ -91,7 +107,8 @@ func (c *client) flushBlobs() { } func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) { - return torrent{c}, nil + t := torrent{c} + return storage.TorrentImpl{Piece: t.Piece, Close: t.Close, Capacity: &c.capacity}, nil } func (c *client) Close() error { diff --git a/test/issue377_test.go b/test/issue377_test.go index bd8c4357..7456e9c4 100644 --- a/test/issue377_test.go +++ b/test/issue377_test.go @@ -124,7 +124,7 @@ func (me *diskFullStorage) Close() error { } func (d *diskFullStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) { - return d, nil + return storage.TorrentImpl{Piece: d.Piece, Close: d.Close}, nil } type pieceImpl struct { diff --git a/test/transfer_test.go b/test/transfer_test.go index e58bb53b..857cd44f 100644 --- a/test/transfer_test.go +++ b/test/transfer_test.go @@ -192,7 +192,6 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) { type fileCacheClientStorageFactoryParams struct { Capacity int64 SetCapacity bool - Wrapper func(*filecache.Cache) storage.ClientImplCloser } func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory { @@ -201,10 +200,22 @@ func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) st if err != nil { panic(err) } + var sharedCapacity *int64 if ps.SetCapacity { + sharedCapacity = &ps.Capacity fc.SetCapacity(ps.Capacity) } - return ps.Wrapper(fc) + return struct { + storage.ClientImpl + io.Closer + }{ + storage.NewResourcePiecesOpts( + fc.AsResourceProvider(), + storage.ResourcePiecesOpts{ + Capacity: sharedCapacity, + }), + ioutil.NopCloser(nil), + } } } @@ -212,17 +223,13 @@ type storageFactory func(string) storage.ClientImplCloser func TestClientTransferDefault(t *testing.T) { testClientTransfer(t, testClientTransferParams{ - LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{ - Wrapper: fileCachePieceResourceStorage, - }), + LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}), }) } func TestClientTransferDefaultNoMetadata(t *testing.T) { testClientTransfer(t, testClientTransferParams{ - LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{ - Wrapper: fileCachePieceResourceStorage, - }), + LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}), LeecherStartsWithoutMetadata: true, }) } @@ -244,16 +251,6 @@ func TestClientTransferRateLimitedDownload(t *testing.T) { }) } -func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImplCloser { - return struct { - storage.ClientImpl - io.Closer - }{ - storage.NewResourcePieces(fc.AsResourceProvider()), - ioutil.NopCloser(nil), - } -} - func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) { testClientTransfer(t, testClientTransferParams{ LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{ @@ -261,7 +258,6 @@ func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int // Going below the piece length means it can't complete a piece so // that it can be hashed. Capacity: 5, - Wrapper: fileCachePieceResourceStorage, }), SetReadahead: setReadahead, // Can't readahead too far or the cache will thrash and drop data we @@ -324,9 +320,7 @@ func sqliteLeecherStorageTestCase(numConns int) leecherStorageTestCase { func TestClientTransferVarious(t *testing.T) { // Leecher storage for _, ls := range []leecherStorageTestCase{ - {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{ - Wrapper: fileCachePieceResourceStorage, - }), 0}, + {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}), 0}, {"Boltdb", storage.NewBoltDB, 0}, {"SqliteDirect", func(s string) storage.ClientImplCloser { path := filepath.Join(s, "sqlite3.db") diff --git a/torrent.go b/torrent.go index e8cf36e4..b6a6f2b7 100644 --- a/torrent.go +++ b/torrent.go @@ -760,7 +760,9 @@ func (t *Torrent) close() (err error) { func() { t.storageLock.Lock() defer t.storageLock.Unlock() - t.storage.Close() + if f := t.storage.Close; f != nil { + f() + } }() } t.iterPeers(func(p *Peer) {