Sergey Matveev's repositories - btrtrc.git/commitdiff
Rework lots of option handling
authorMatt Joiner <anacrolix@gmail.com>
Wed, 5 May 2021 11:36:36 +0000 (21:36 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 5 May 2021 11:36:36 +0000 (21:36 +1000)
storage/mark-complete_test.go
storage/piece-resource.go
storage/sqlite/direct.go
storage/sqlite/sqlite-storage.go
storage/sqlite/sqlite-storage_test.go
storage/test/bench-piece-mark-complete.go
test/transfer_test.go

index 3f76f699644252e9cad771e84b9083b8ccd569b9..7e50832d2714dd51fdf2644a09c85ad6dfd3d861 100644 (file)
@@ -10,7 +10,7 @@ import (
 func BenchmarkMarkComplete(b *testing.B) {
        bench := func(b *testing.B, ci storage.ClientImpl) {
                test_storage.BenchmarkPieceMarkComplete(
-                       b, ci, test_storage.DefaultPieceSize, test_storage.DefaultNumPieces, test_storage.DefaultCapacity)
+                       b, ci, test_storage.DefaultPieceSize, test_storage.DefaultNumPieces, 0)
        }
        b.Run("File", func(b *testing.B) {
                ci := storage.NewFile(b.TempDir())
index 8df443896668e15459cb09a041cbcc89aed3f0e5..d56280bc5a2654806e6e6e4455d3bb584bda6ab9 100644 (file)
@@ -21,8 +21,11 @@ type piecePerResource struct {
 }
 
 type ResourcePiecesOpts struct {
+       // After marking a piece complete, don't bother deleting its incomplete blobs.
        LeaveIncompleteChunks bool
-       NoSizedPuts           bool
+       // Sized puts require being able to stream from a statement executed on another connection.
+       // Without them, we buffer the entire read and then put that.
+       NoSizedPuts bool
 }
 
 func NewResourcePieces(p PieceProvider) ClientImpl {
index f5d6179d6480157245f0876dacfafa430efa713e..9da0f701dbc1db3e3d98b67703c76b2fb2fcabde 100644 (file)
@@ -11,36 +11,41 @@ import (
 )
 
 type NewDirectStorageOpts struct {
-       NewPoolOpts
-       ProvOpts func(*ProviderOpts)
+       NewConnOpts
+       InitDbOpts
 }
 
 // A convenience function that creates a connection pool, resource provider, and a pieces storage
 // ClientImpl and returns them all with a Close attached.
 func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
-       conns, provOpts, err := NewPool(opts.NewPoolOpts)
+       conn, err := newConn(opts.NewConnOpts)
        if err != nil {
                return
        }
-       if f := opts.ProvOpts; f != nil {
-               f(&provOpts)
+       journalMode := "delete"
+       if opts.Memory {
+               journalMode = "off"
        }
-       provOpts.BatchWrites = false
-       prov, err := NewProvider(conns, provOpts)
+       err = initConn(conn, InitConnOpts{
+               SetJournalMode: journalMode,
+               MmapSizeOk:     true,
+               MmapSize:       1 << 25,
+       })
+       if err != nil {
+               return
+       }
+       err = initDatabase(conn, opts.InitDbOpts)
        if err != nil {
-               conns.Close()
                return
        }
        return &client{
-               prov:  prov,
-               conn:  prov.pool.Get(nil),
+               conn:  conn,
                blobs: make(map[string]*sqlite.Blob),
        }, nil
 }
 
 type client struct {
        l     sync.Mutex
-       prov  *provider
        conn  conn
        blobs map[string]*sqlite.Blob
 }
@@ -53,8 +58,7 @@ func (c *client) Close() error {
        for _, b := range c.blobs {
                b.Close()
        }
-       c.prov.pool.Put(c.conn)
-       return c.prov.Close()
+       return c.conn.Close()
 }
 
 type torrent struct {
@@ -106,6 +110,7 @@ func (p2 piece) doAtIoWithBlob(
 ) (n int, err error) {
        p2.l.Lock()
        defer p2.l.Unlock()
+       //defer p2.blobWouldExpire()
        n, err = atIo(p2.getBlob())(p, off)
        var se sqlite.Error
        if !errors.As(err, &se) || se.Code != sqlite.SQLITE_ABORT {
index 49ea4d7495c8289c19907690096dbccc24fcfbe5..8c569314e2f35180537de5f5252061821e322d34 100644 (file)
@@ -25,7 +25,20 @@ import (
 
 type conn = *sqlite.Conn
 
-func initConn(conn conn, opts ProviderOpts) error {
+type InitConnOpts struct {
+       SetJournalMode string
+       MmapSizeOk     bool  // If false, a package-specific default will be used.
+       MmapSize       int64 // If MmapSizeOk is set, use sqlite default if < 0, otherwise this value.
+}
+
+func (me InitConnOpts) JournalMode() string {
+       if me.SetJournalMode != "" {
+               return me.SetJournalMode
+       }
+       return "wal"
+}
+
+func initConn(conn conn, opts InitConnOpts) error {
        // Recursive triggers are required because we need to trim the blob_meta size after trimming to
        // capacity. Hopefully we don't hit the recursion limit, and if we do, there's an error thrown.
        err := sqlitex.ExecTransient(conn, "pragma recursive_triggers=on", nil)
@@ -36,13 +49,22 @@ func initConn(conn conn, opts ProviderOpts) error {
        if err != nil {
                return err
        }
-       if opts.NoConcurrentBlobReads {
-               err = sqlitex.ExecTransient(conn, `pragma journal_mode=off`, nil)
+       if opts.SetJournalMode != "" {
+               err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma journal_mode=%s`, opts.SetJournalMode), func(stmt *sqlite.Stmt) error {
+                       ret := stmt.ColumnText(0)
+                       if ret != opts.SetJournalMode {
+                               panic(ret)
+                       }
+                       return nil
+               })
                if err != nil {
                        return err
                }
        }
-       if opts.MmapSizeOk {
+       if !opts.MmapSizeOk {
+       opts.MmapSize = 1 << 24 // 16 MiB
+       }
+       if opts.MmapSize >= 0 {
                err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma mmap_size=%d`, opts.MmapSize), nil)
                if err != nil {
                        return err
@@ -152,6 +174,7 @@ func InitSchema(conn conn, pageSize int, triggers bool) error {
 
 type NewPiecesStorageOpts struct {
        NewPoolOpts
+       InitDbOpts
        ProvOpts    func(*ProviderOpts)
        StorageOpts func(*storage.ResourcePiecesOpts)
 }
@@ -159,10 +182,17 @@ type NewPiecesStorageOpts struct {
 // A convenience function that creates a connection pool, resource provider, and a pieces storage
 // ClientImpl and returns them all with a Close attached.
 func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, err error) {
-       conns, provOpts, err := NewPool(opts.NewPoolOpts)
+       conns, err := NewPool(opts.NewPoolOpts)
+       if err != nil {
+               return
+       }
+       err = initPoolDatabase(conns, opts.InitDbOpts)
        if err != nil {
                return
        }
+       provOpts := ProviderOpts{
+               BatchWrites: conns.NumConns() > 1,
+       }
        if f := opts.ProvOpts; f != nil {
                f(&provOpts)
        }
@@ -171,8 +201,27 @@ func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, er
                conns.Close()
                return
        }
+       var (
+               journalMode string
+       )
+       withPoolConn(conns, func(c conn) {
+               err = sqlitex.Exec(c, "pragma journal_mode", func(stmt *sqlite.Stmt) error {
+                       journalMode = stmt.ColumnText(0)
+                       return nil
+               })
+       })
+       if err != nil {
+               err = fmt.Errorf("getting journal mode: %w", err)
+               prov.Close()
+               return
+       }
+       if journalMode == "" {
+               err = errors.New("didn't get journal mode")
+               prov.Close()
+               return
+       }
        storageOpts := storage.ResourcePiecesOpts{
-               NoSizedPuts: provOpts.NoConcurrentBlobReads,
+               NoSizedPuts: journalMode != "wal" || conns.NumConns() == 1,
        }
        if f := opts.StorageOpts; f != nil {
                f(&storageOpts)
@@ -188,16 +237,14 @@ func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, er
 }
 
 type NewPoolOpts struct {
-       // See https://www.sqlite.org/c3ref/open.html. NB: "If the filename is an empty string, then a
-       // private, temporary on-disk database will be created. This private database will be
-       // automatically deleted as soon as the database connection is closed."
-       Path     string
-       Memory   bool
+       NewConnOpts
+       InitConnOpts
        NumConns int
-       // Forces WAL, disables shared caching.
-       NoConcurrentBlobReads bool
-       DontInitSchema        bool
-       PageSize              int
+}
+
+type InitDbOpts struct {
+       DontInitSchema bool
+       PageSize       int
        // If non-zero, overrides the existing setting.
        Capacity int64
 }
@@ -205,13 +252,9 @@ type NewPoolOpts struct {
 // There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
 // the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the
 // top-level option type.
-type ProviderOpts struct {
-       NumConns int
-       // Concurrent blob reads require WAL.
-       NoConcurrentBlobReads bool
-       BatchWrites           bool
-       MmapSize              int64
-       MmapSizeOk            bool
+type PoolConf struct {
+       NumConns    int
+       JournalMode string
 }
 
 // Remove any capacity limits.
@@ -224,10 +267,18 @@ func SetCapacity(conn conn, cap int64) error {
        return sqlitex.Exec(conn, "insert into setting values ('capacity', ?)", nil, cap)
 }
 
-func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
-       if opts.NumConns == 0 {
-               opts.NumConns = runtime.NumCPU()
-       }
+type NewConnOpts struct {
+       // See https://www.sqlite.org/c3ref/open.html. NB: "If the filename is an empty string, then a
+       // private, temporary on-disk database will be created. This private database will be
+       // automatically deleted as soon as the database connection is closed."
+       Path   string
+       Memory bool
+       // Whether multiple blobs will not be read simultaneously. Enables journal mode other than WAL,
+       // and NumConns < 2.
+       NoConcurrentBlobReads bool
+}
+
+func newOpenUri(opts NewConnOpts) string {
        path := url.PathEscape(opts.Path)
        if opts.Memory {
                path = ":memory:"
@@ -236,26 +287,10 @@ func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
        if opts.NoConcurrentBlobReads || opts.Memory {
                values.Add("cache", "shared")
        }
-       uri := fmt.Sprintf("file:%s?%s", path, values.Encode())
-       conns, err := func() (ConnPool, error) {
-               switch opts.NumConns {
-               case 1:
-                       conn, err := sqlite.OpenConn(uri, 0)
-                       return &poolFromConn{conn: conn}, err
-               default:
-                       return sqlitex.Open(uri, 0, opts.NumConns)
-               }
-       }()
-       if err != nil {
-               return
-       }
-       defer func() {
-               if err != nil {
-                       conns.Close()
-               }
-       }()
-       conn := conns.Get(context.TODO())
-       defer conns.Put(conn)
+       return fmt.Sprintf("file:%s?%s", path, values.Encode())
+}
+
+func initDatabase(conn conn, opts InitDbOpts) (err error) {
        if !opts.DontInitSchema {
                if opts.PageSize == 0 {
                        opts.PageSize = 1 << 14
@@ -271,13 +306,55 @@ func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
                        return
                }
        }
-       return conns, ProviderOpts{
-               NumConns:              opts.NumConns,
-               NoConcurrentBlobReads: opts.NoConcurrentBlobReads || opts.Memory || opts.NumConns == 1,
-               BatchWrites:           opts.NumConns > 1,
-               MmapSize:              1 << 23, // 8 MiB
-               MmapSizeOk:            true,
-       }, nil
+       return
+}
+
+func initPoolDatabase(pool ConnPool, opts InitDbOpts) (err error) {
+       withPoolConn(pool, func(c conn) {
+               err = initDatabase(c, opts)
+       })
+       return
+}
+
+func newConn(opts NewConnOpts) (conn, error) {
+       return sqlite.OpenConn(newOpenUri(opts), 0)
+}
+
+type poolWithNumConns struct {
+       *sqlitex.Pool
+       numConns int
+}
+
+func (me poolWithNumConns) NumConns() int {
+       return me.numConns
+}
+
+func NewPool(opts NewPoolOpts) (_ ConnPool, err error) {
+       if opts.NumConns == 0 {
+               opts.NumConns = runtime.NumCPU()
+       }
+       conns, err := func() (ConnPool, error) {
+               switch opts.NumConns {
+               case 1:
+                       conn, err := newConn(opts.NewConnOpts)
+                       return &poolFromConn{conn: conn}, err
+               default:
+                       _pool, err := sqlitex.Open(newOpenUri(opts.NewConnOpts), 0, opts.NumConns)
+                       return poolWithNumConns{_pool, opts.NumConns}, err
+               }
+       }()
+       if err != nil {
+               return
+       }
+       defer func() {
+               if err != nil {
+                       conns.Close()
+               }
+       }()
+       return conns, initPoolConns(nil, conns, InitPoolOpts{
+               NumConns:     opts.NumConns,
+               InitConnOpts: opts.InitConnOpts,
+       })
 }
 
 // Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool.
@@ -302,20 +379,17 @@ func (me *poolFromConn) Close() error {
        return me.conn.Close()
 }
 
+func (poolFromConn) NumConns() int { return 1 }
+
+type ProviderOpts struct {
+       BatchWrites bool
+}
+
 // Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of
 // the ConnPool (since it has to initialize all the connections anyway).
 func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
-       _, err = initPoolConns(context.TODO(), pool, opts)
-       if err != nil {
-               err = fmt.Errorf("initing pool conns: %w", err)
-               return
-       }
        prov := &provider{pool: pool, opts: opts}
        if opts.BatchWrites {
-               if opts.NumConns < 2 {
-                       err = errors.New("batch writes requires more than 1 conn")
-                       return
-               }
                writes := make(chan writeRequest)
                prov.writes = writes
                // This is retained for backwards compatibility. It may not be necessary.
@@ -327,7 +401,12 @@ func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
        return prov, nil
 }
 
-func initPoolConns(ctx context.Context, pool ConnPool, opts ProviderOpts) (numInited int, err error) {
+type InitPoolOpts struct {
+       NumConns int
+       InitConnOpts
+}
+
+func initPoolConns(ctx context.Context, pool ConnPool, opts InitPoolOpts) (err error) {
        var conns []conn
        defer func() {
                for _, c := range conns {
@@ -340,12 +419,11 @@ func initPoolConns(ctx context.Context, pool ConnPool, opts ProviderOpts) (numIn
                        break
                }
                conns = append(conns, conn)
-               err = initConn(conn, opts)
+               err = initConn(conn, opts.InitConnOpts)
                if err != nil {
                        err = fmt.Errorf("initing conn %v: %w", len(conns), err)
                        return
                }
-               numInited++
        }
        return
 }
@@ -354,6 +432,13 @@ type ConnPool interface {
        Get(context.Context) conn
        Put(conn)
        Close() error
+       NumConns() int
+}
+
+func withPoolConn(pool ConnPool, with func(conn)) {
+       c := pool.Get(nil)
+       defer pool.Put(c)
+       with(c)
 }
 
 type provider struct {
index d18369ed6316cdd6e564cfff511029f2c6f41045..f2e7ad290bbe3fd0184e104335d3b022bc1a0bf8 100644 (file)
@@ -19,14 +19,15 @@ import (
 
 func newConnsAndProv(t *testing.T, opts NewPoolOpts) (ConnPool, *provider) {
        opts.Path = filepath.Join(t.TempDir(), "sqlite3.db")
-       conns, provOpts, err := NewPool(opts)
-       require.NoError(t, err)
+       pool, err := NewPool(opts)
+       qt.Assert(t, err, qt.IsNil)
        // sqlitex.Pool.Close doesn't like being called more than once. Let it slide for now.
-       //t.Cleanup(func() { conns.Close() })
-       prov, err := NewProvider(conns, provOpts)
+       //t.Cleanup(func() { pool.Close() })
+       qt.Assert(t, initPoolDatabase(pool, InitDbOpts{}), qt.IsNil)
+       prov, err := NewProvider(pool, ProviderOpts{BatchWrites: pool.NumConns() > 1})
        require.NoError(t, err)
        t.Cleanup(func() { prov.Close() })
-       return conns, prov
+       return pool, prov
 }
 
 func TestTextBlobSize(t *testing.T) {
@@ -70,52 +71,40 @@ func TestSimultaneousIncrementalBlob(t *testing.T) {
 func BenchmarkMarkComplete(b *testing.B) {
        const pieceSize = test_storage.DefaultPieceSize
        const capacity = test_storage.DefaultNumPieces * pieceSize / 2
+       runBench := func(b *testing.B, ci storage.ClientImpl) {
+               test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity)
+       }
        c := qt.New(b)
-       for _, storage := range []struct {
-               name  string
-               maker func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser
-       }{
-               {"SqliteDirect", func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser {
-                       ci, err := NewDirectStorage(NewDirectStorageOpts{
-                               NewPoolOpts: newPoolOpts,
-                               ProvOpts:    provOpts,
+       for _, memory := range []bool{false, true} {
+               b.Run(fmt.Sprintf("Memory=%v", memory), func(b *testing.B) {
+                       b.Run("Direct", func(b *testing.B) {
+                               var opts NewDirectStorageOpts
+                               opts.Memory = memory
+                               opts.Path = filepath.Join(b.TempDir(), "storage.db")
+                               opts.Capacity = capacity
+                               ci, err := NewDirectStorage(opts)
+                               c.Assert(err, qt.IsNil)
+                               defer ci.Close()
+                               runBench(b, ci)
                        })
-                       c.Assert(err, qt.IsNil)
-                       return ci
-               }},
-               {"SqlitePieceStorage", func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser {
-                       ci, err := NewPiecesStorage(NewPiecesStorageOpts{
-                               NewPoolOpts: newPoolOpts,
-                               ProvOpts:    provOpts,
+                       b.Run("ResourcePieces", func(b *testing.B) {
+                               for _, batchWrites := range []bool{false, true} {
+                                       b.Run(fmt.Sprintf("BatchWrites=%v", batchWrites), func(b *testing.B) {
+                                               var opts NewPiecesStorageOpts
+                                               opts.Path = filepath.Join(b.TempDir(), "storage.db")
+                                               //b.Logf("storage db path: %q", dbPath)
+                                               opts.Capacity = capacity
+                                               opts.Memory = memory
+                                               opts.ProvOpts = func(opts *ProviderOpts) {
+                                                       opts.BatchWrites = batchWrites
+                                               }
+                                               ci, err := NewPiecesStorage(opts)
+                                               c.Assert(err, qt.IsNil)
+                                               defer ci.Close()
+                                               runBench(b, ci)
+                                       })
+                               }
                        })
-                       c.Assert(err, qt.IsNil)
-                       return ci
-               }},
-       } {
-               b.Run(storage.name, func(b *testing.B) {
-                       for _, memory := range []bool{false, true} {
-                               b.Run(fmt.Sprintf("Memory=%v", memory), func(b *testing.B) {
-                                       for _, batchWrites := range []bool{false, true} {
-                                               b.Run(fmt.Sprintf("BatchWrites=%v", batchWrites), func(b *testing.B) {
-                                                       dbPath := filepath.Join(b.TempDir(), "storage.db")
-                                                       //b.Logf("storage db path: %q", dbPath)
-                                                       newPoolOpts := NewPoolOpts{
-                                                               Path:                  dbPath,
-                                                               Capacity:              capacity,
-                                                               NoConcurrentBlobReads: false,
-                                                               PageSize:              1 << 14,
-                                                               Memory:                memory,
-                                                       }
-                                                       provOpts := func(opts *ProviderOpts) {
-                                                               opts.BatchWrites = batchWrites
-                                                       }
-                                                       ci := storage.maker(newPoolOpts, provOpts)
-                                                       defer ci.Close()
-                                                       test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity)
-                                               })
-                                       }
-                               })
-                       }
                })
        }
 }
index 9ebf6ef749edca49ac6b1c3722f6bef23cfd518d..6e04702bc2c448d07b1f8384094b3fb691b513c9 100644 (file)
@@ -15,7 +15,6 @@ import (
 const (
        ChunkSize        = 1 << 14
        DefaultPieceSize = 2 << 20
-       DefaultCapacity  = 0
        DefaultNumPieces = 16
 )
 
index 0ae10060d63c5ba7d9c815a22c4ad108286f8366..e58bb53bf056365071dc7e552c016dc2b549aa54 100644 (file)
@@ -4,7 +4,6 @@ import (
        "fmt"
        "io"
        "io/ioutil"
-       "log"
        "os"
        "path/filepath"
        "runtime"
@@ -111,7 +110,7 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
                cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
        }
        cfg.Seed = false
-       cfg.Debug = true
+       //cfg.Debug = true
        if ps.ConfigureLeecher.Config != nil {
                ps.ConfigureLeecher.Config(cfg)
        }
@@ -313,13 +312,10 @@ type leecherStorageTestCase struct {
 func sqliteLeecherStorageTestCase(numConns int) leecherStorageTestCase {
        return leecherStorageTestCase{
                fmt.Sprintf("SqliteFile,NumConns=%v", numConns),
-               sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPiecesStorageOpts {
-                       return sqliteStorage.NewPiecesStorageOpts{
-                               NewPoolOpts: sqliteStorage.NewPoolOpts{
-                                       Path:     filepath.Join(dataDir, "sqlite.db"),
-                                       NumConns: numConns,
-                               },
-                       }
+               sqliteClientStorageFactory(func(dataDir string) (opts sqliteStorage.NewPiecesStorageOpts) {
+                       opts.Path = filepath.Join(dataDir, "sqlite.db")
+                       opts.NumConns = numConns
+                       return
                }),
                numConns,
        }
@@ -334,13 +330,9 @@ func TestClientTransferVarious(t *testing.T) {
                {"Boltdb", storage.NewBoltDB, 0},
                {"SqliteDirect", func(s string) storage.ClientImplCloser {
                        path := filepath.Join(s, "sqlite3.db")
-                       log.Print(path)
-                       cl, err := sqliteStorage.NewDirectStorage(sqliteStorage.NewDirectStorageOpts{
-                               NewPoolOpts: sqliteStorage.NewPoolOpts{
-                                       Path: path,
-                               },
-                               ProvOpts: nil,
-                       })
+                       var opts sqliteStorage.NewDirectStorageOpts
+                       opts.Path = path
+                       cl, err := sqliteStorage.NewDirectStorage(opts)
                        if err != nil {
                                panic(err)
                        }
@@ -350,12 +342,9 @@ func TestClientTransferVarious(t *testing.T) {
                sqliteLeecherStorageTestCase(2),
                // This should use a number of connections equal to the number of CPUs
                sqliteLeecherStorageTestCase(0),
-               {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPiecesStorageOpts {
-                       return sqliteStorage.NewPiecesStorageOpts{
-                               NewPoolOpts: sqliteStorage.NewPoolOpts{
-                                       Memory: true,
-                               },
-                       }
+               {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) (opts sqliteStorage.NewPiecesStorageOpts) {
+                       opts.Memory = true
+                       return
                }), 0},
        } {
                t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {