// Compile-time check: badStorage must satisfy storage.ClientImpl.
var _ storage.ClientImpl = badStorage{}
// Change: OpenTorrent now returns a storage.TorrentImpl value (a struct of
// optional function hooks) instead of returning bs itself as an interface.
// Only the Piece hook is populated (bs.Piece is a method value binding the
// receiver); Close is left nil, so the dedicated Close method is deleted and
// callers must nil-check the hook.
func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
- return bs, nil
-}
-
-func (bs badStorage) Close() error {
- return nil
+ return storage.TorrentImpl{
+ Piece: bs.Piece,
+ }, nil
}
func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {
// NOTE(review): hunk boundary — the lines below are from a different file
// (benchmark/test setup), not the body of Piece above; diff context is elided.
ts := &torrentStorage{}
t := &Torrent{
cl: cl,
// Change: storage.Torrent now holds the TorrentImpl struct, so the test wraps
// ts's methods as method values instead of storing ts directly.
- storage: &storage.Torrent{TorrentImpl: ts},
+ storage: &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}},
pieceStateChanges: pubsub.NewPubSub(),
}
require.NoError(b, t.setInfo(&metainfo.Info{
}
// Change: construct the *boltTorrent first, then expose its Piece and Close
// methods as hooks on the returned TorrentImpl struct.
func (me *boltClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
- return &boltTorrent{me, infoHash}, nil
+ t := &boltTorrent{me, infoHash}
+ return TorrentImpl{
+ Piece: t.Piece,
+ Close: t.Close,
+ }, nil
}
func (me *boltTorrent) Piece(p metainfo.Piece) PieceImpl {
// NOTE(review): elided context — the return below belongs to a later method
// (presumably boltTorrent.Close), not to Piece above; confirm in full file.
return me.pc.Close()
}
// Change: with TorrentImpl now a struct, error paths can no longer return nil;
// the signature switches to named results so failures can `return` with the
// zero TorrentImpl and the wrapped err.
-func (fs *fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
+func (fs *fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (_ TorrentImpl, err error) {
dir := fs.pathMaker(fs.baseDir, info, infoHash)
upvertedFiles := info.UpvertedFiles()
files := make([]file, 0, len(upvertedFiles))
for i, fileInfo := range upvertedFiles {
// Predeclare s and assign with `=` so `:=` does not shadow the named err.
- s, err := ToSafeFilePath(append([]string{info.Name}, fileInfo.Path...)...)
+ var s string
+ s, err = ToSafeFilePath(append([]string{info.Name}, fileInfo.Path...)...)
if err != nil {
- return nil, fmt.Errorf("file %v has unsafe path %q: %w", i, fileInfo.Path, err)
+ err = fmt.Errorf("file %v has unsafe path %q: %w", i, fileInfo.Path, err)
+ return
}
f := file{
path: filepath.Join(dir, s),
if f.length == 0 {
err = CreateNativeZeroLengthFile(f.path)
if err != nil {
- return nil, fmt.Errorf("creating zero length file: %w", err)
+ err = fmt.Errorf("creating zero length file: %w", err)
+ return
}
}
files = append(files, f)
}
// Build the concrete implementation, then surface only its Piece/Close
// methods through the TorrentImpl hook struct.
- return &fileTorrentImpl{
+ t := &fileTorrentImpl{
files,
segments.NewIndex(common.LengthIterFromUpvertedFiles(upvertedFiles)),
infoHash,
fs.pc,
+ }
+ return TorrentImpl{
+ Piece: t.Piece,
+ Close: t.Close,
}, nil
}
}
// Data storage bound to a torrent.
// Change: TorrentImpl becomes a struct of optional function hooks instead of
// an interface. Implementations populate only the hooks they support, and
// callers must nil-check each field before invoking it (see the storage close
// guard elsewhere in this patch).
-type TorrentImpl interface {
- Piece(metainfo.Piece) PieceImpl
- Close() error
+type TorrentImpl struct {
+ Piece func(metainfo.Piece) PieceImpl
+ Close func() error
+ // Storages that share the same value, will provide a pointer to the same function.
+ Capacity *func() *int64
}
// Interacts with torrent piece data. Optional interfaces to implement include io.WriterTo, such as
}
}
// Change: the named TorrentImpl result is replaced with the blank identifier
// so a local t (the concrete *mmapTorrentStorage) can be built and wrapped.
-func (s *mmapClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (t TorrentImpl, err error) {
+func (s *mmapClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (_ TorrentImpl, err error) {
span, err := mMapTorrent(info, s.baseDir)
// NOTE(review): err from mMapTorrent is not checked before constructing the
// storage; a TorrentImpl is returned alongside the error. This mirrors the
// original behavior — confirm callers check err before using the result.
- t = &mmapTorrentStorage{
+ t := &mmapTorrentStorage{
infoHash: infoHash,
span: span,
pc: s.pc,
}
- return
+ return TorrentImpl{Piece: t.Piece, Close: t.Close}, err
}
func (s *mmapClientImpl) Close() error {
// Sized puts require being able to stream from a statement executed on another connection.
// Without them, we buffer the entire read and then put that.
NoSizedPuts bool
// Change: optional capacity for the resource-pieces storage. Callers that
// know the provider's byte limit pass a pointer to it so storages sharing the
// provider also share (and observe updates to) the same capacity value.
+ Capacity *int64
}
func NewResourcePieces(p PieceProvider) ClientImpl {
}
func (s piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
// Change: name the concrete value, then expose its Piece/Close methods via
// the TorrentImpl hook struct. One RWMutex per piece, as before.
- return piecePerResourceTorrentImpl{
+ t := piecePerResourceTorrentImpl{
s,
make([]sync.RWMutex, info.NumPieces()),
- }, nil
+ }
+ return TorrentImpl{Piece: t.Piece, Close: t.Close}, nil
}
func (s piecePerResourceTorrentImpl) Piece(p metainfo.Piece) PieceImpl {
if opts.BlobFlushInterval != 0 {
cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
}
// Change: store the capacity query as a func field so every TorrentImpl this
// client hands out shares the same function pointer (pointer identity is what
// TorrentImpl.Capacity consumers can compare).
+ cl.capacity = cl.getCapacity
return cl, nil
}
// getCapacity reads the 'capacity' setting from the database under the client
// lock. It returns nil when no such row exists (i.e. unbounded), otherwise a
// pointer to the stored value; the callback only fires on a matching row.
// NOTE(review): panics on query error — acceptable only if DB failure is
// considered fatal for this storage; confirm.
+func (cl *client) getCapacity() (ret *int64) {
+ cl.l.Lock()
+ defer cl.l.Unlock()
+ err := sqlitex.Exec(cl.conn, "select value from setting where name='capacity'", func(stmt *sqlite.Stmt) error {
+ ret = new(int64)
+ *ret = stmt.ColumnInt64(0)
+ return nil
+ })
+ if err != nil {
+ panic(err)
+ }
+ return
+}
+
type client struct {
l sync.Mutex
conn conn
blobFlusher *time.Timer
opts NewDirectStorageOpts
closed bool
// Change: holds the capacity query function; assigned once at construction
// and shared by pointer (&cl.capacity) across this client's torrents.
+ capacity func() *int64
}
func (c *client) blobFlusherFunc() {
}
func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
// Change: wrap the torrent's methods in the TorrentImpl hook struct.
// &c.capacity gives all torrents from this client the same *func identity,
// matching the "shared value → same pointer" contract on TorrentImpl.Capacity.
- return torrent{c}, nil
+ t := torrent{c}
+ return storage.TorrentImpl{Piece: t.Piece, Close: t.Close, Capacity: &c.capacity}, nil
}
func (c *client) Close() error {
}
func (d *diskFullStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
// Change: expose d's Piece/Close methods directly; no Capacity hook here.
- return d, nil
+ return storage.TorrentImpl{Piece: d.Piece, Close: d.Close}, nil
}
type pieceImpl struct {
type fileCacheClientStorageFactoryParams struct {
Capacity int64
SetCapacity bool
// Change: the Wrapper indirection is removed; the factory below now builds
// the resource-pieces storage itself.
- Wrapper func(*filecache.Cache) storage.ClientImplCloser
}
func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
if err != nil {
panic(err)
}
// When a capacity is requested, the same *int64 is handed to both the file
// cache and the resource-pieces storage so they agree on the limit.
+ var sharedCapacity *int64
if ps.SetCapacity {
+ sharedCapacity = &ps.Capacity
fc.SetCapacity(ps.Capacity)
}
// Change: inlines the old fileCachePieceResourceStorage helper, switching to
// NewResourcePiecesOpts so the shared capacity can be threaded through. The
// NopCloser satisfies io.Closer since the file cache needs no shutdown.
- return ps.Wrapper(fc)
+ return struct {
+ storage.ClientImpl
+ io.Closer
+ }{
+ storage.NewResourcePiecesOpts(
+ fc.AsResourceProvider(),
+ storage.ResourcePiecesOpts{
+ Capacity: sharedCapacity,
+ }),
+ ioutil.NopCloser(nil),
+ }
}
}
func TestClientTransferDefault(t *testing.T) {
testClientTransfer(t, testClientTransferParams{
// Change: Wrapper was removed from the params struct; zero-value params now
// yield the default resource-pieces storage.
- LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
- Wrapper: fileCachePieceResourceStorage,
- }),
+ LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}),
})
}
func TestClientTransferDefaultNoMetadata(t *testing.T) {
testClientTransfer(t, testClientTransferParams{
// Same Wrapper removal as above.
- LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
- Wrapper: fileCachePieceResourceStorage,
- }),
+ LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}),
LeecherStartsWithoutMetadata: true,
})
}
// Change: helper deleted — its body was inlined into
// newFileCacheClientStorageFactory, which now also threads the shared
// capacity through storage.NewResourcePiecesOpts.
-func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImplCloser {
- return struct {
- storage.ClientImpl
- io.Closer
- }{
- storage.NewResourcePieces(fc.AsResourceProvider()),
- ioutil.NopCloser(nil),
- }
-}
-
func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
testClientTransfer(t, testClientTransferParams{
LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
// Going below the piece length means it can't complete a piece so
// that it can be hashed.
Capacity: 5,
// Change: Wrapper dropped; Capacity/SetCapacity alone configure the storage.
- Wrapper: fileCachePieceResourceStorage,
}),
SetReadahead: setReadahead,
// Can't readahead too far or the cache will thrash and drop data we
// NOTE(review): hunk boundary — the lines below are a separate test function.
func TestClientTransferVarious(t *testing.T) {
// Leecher storage
for _, ls := range []leecherStorageTestCase{
// Change: same Wrapper removal in the table-driven case.
- {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
- Wrapper: fileCachePieceResourceStorage,
- }), 0},
+ {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}), 0},
{"Boltdb", storage.NewBoltDB, 0},
{"SqliteDirect", func(s string) storage.ClientImplCloser {
path := filepath.Join(s, "sqlite3.db")
func() {
t.storageLock.Lock()
defer t.storageLock.Unlock()
// Change: Close is now an optional hook on the TorrentImpl struct, so it
// must be nil-checked before calling — storages may omit it.
- t.storage.Close()
+ if f := t.storage.Close; f != nil {
+ f()
+ }
}()
}
t.iterPeers(func(p *Peer) {