]> Sergey Matveev's repositories - btrtrc.git/commitdiff
sqlite storage: Provide helpers and reasonable defaults
authorMatt Joiner <anacrolix@gmail.com>
Fri, 30 Oct 2020 01:20:54 +0000 (12:20 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Fri, 30 Oct 2020 01:20:54 +0000 (12:20 +1100)
storage/sqlite/sqlite-storage.go
storage/sqlite/sqlite-storage_test.go
test/transfer_test.go

index fc419c290b11b84fe7aba6aabc22f3f4a0ef6a72..771c3452cf23ae2faedf833c67472f17b625a600 100644 (file)
@@ -7,7 +7,9 @@ import (
        "fmt"
        "io"
        "log"
+       "net/url"
        "os"
+       "runtime"
        "sync"
        "time"
 
@@ -15,6 +17,7 @@ import (
        "crawshaw.io/sqlite/sqlitex"
        "github.com/anacrolix/missinggo/iter"
        "github.com/anacrolix/missinggo/v2/resource"
+       "github.com/anacrolix/torrent/storage"
 )
 
 type conn = *sqlite.Conn
@@ -77,7 +80,75 @@ end;
 `)
 }
 
-// Emulates a pool from a single Conn.
+// A convenience function that creates a connection pool, resource provider, and a pieces storage
+// ClientImpl and returns them all with a Close attached.
+func NewPiecesStorage(opts NewPoolOpts) (_ storage.ClientImplCloser, err error) {
+       conns, provOpts, err := NewPool(opts)
+       if err != nil {
+               return
+       }
+       prov, err := NewProvider(conns, provOpts)
+       if err != nil {
+               return
+       }
+       store := storage.NewResourcePieces(prov)
+       return struct {
+               storage.ClientImpl
+               io.Closer
+       }{
+               store,
+               conns,
+       }, nil
+}
+
+type NewPoolOpts struct {
+       Path     string
+       Memory   bool
+       NumConns int
+       // Forces WAL, disables shared caching.
+       ConcurrentBlobReads bool
+}
+
+// There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
+// the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the
+// top-level option type.
+type ProviderOpts struct {
+       NumConns int
+       // Concurrent blob reads require WAL.
+       ConcurrentBlobRead bool
+}
+
+func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
+       if opts.NumConns == 0 {
+               opts.NumConns = runtime.NumCPU()
+       }
+       if opts.Memory {
+               opts.Path = ":memory:"
+       }
+       values := make(url.Values)
+       if !opts.ConcurrentBlobReads {
+               values.Add("cache", "shared")
+       }
+       path := fmt.Sprintf("file:%s?%s", opts.Path, values.Encode())
+       conns, err := func() (ConnPool, error) {
+               switch opts.NumConns {
+               case 1:
+                       conn, err := sqlite.OpenConn(fmt.Sprintf("file:%s", opts.Path), 0)
+                       return &poolFromConn{conn: conn}, err
+               default:
+                       return sqlitex.Open(path, 0, opts.NumConns)
+               }
+       }()
+       if err != nil {
+               return
+       }
+       return conns, ProviderOpts{
+               NumConns:           opts.NumConns,
+               ConcurrentBlobRead: opts.ConcurrentBlobReads,
+       }, nil
+}
+
+// Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool.
 type poolFromConn struct {
        mu   sync.Mutex
        conn conn
@@ -95,18 +166,13 @@ func (me *poolFromConn) Put(conn conn) {
        me.mu.Unlock()
 }
 
-func NewProvider(conn *sqlite.Conn) (_ *provider, err error) {
-       err = initConn(conn, false)
-       if err != nil {
-               return
-       }
-       err = initSchema(conn)
-       return &provider{&poolFromConn{conn: conn}}, err
+func (me *poolFromConn) Close() error {
+       return me.conn.Close()
 }
 
-// Needs the pool size so it can initialize all the connections with pragmas.
-func NewProviderPool(pool *sqlitex.Pool, numConns int, wal bool) (_ *provider, err error) {
-       _, err = initPoolConns(context.TODO(), pool, numConns, wal)
+// Needs the ConnPool size so it can initialize all the connections with pragmas.
+func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
+       _, err = initPoolConns(context.TODO(), pool, opts.NumConns, opts.ConcurrentBlobRead)
        if err != nil {
                return
        }
@@ -116,7 +182,7 @@ func NewProviderPool(pool *sqlitex.Pool, numConns int, wal bool) (_ *provider, e
        return &provider{pool: pool}, err
 }
 
-func initPoolConns(ctx context.Context, pool *sqlitex.Pool, numConn int, wal bool) (numInited int, err error) {
+func initPoolConns(ctx context.Context, pool ConnPool, numConn int, wal bool) (numInited int, err error) {
        var conns []conn
        defer func() {
                for _, c := range conns {
@@ -139,13 +205,14 @@ func initPoolConns(ctx context.Context, pool *sqlitex.Pool, numConn int, wal boo
        return
 }
 
-type pool interface {
+type ConnPool interface {
        Get(context.Context) conn
        Put(conn)
+       Close() error
 }
 
 type provider struct {
-       pool pool
+       pool ConnPool
 }
 
 func (p *provider) NewInstance(s string) (resource.Instance, error) {
index 71d5540a437e08514dbf4f97065eab78633d0454..2a41373989ed10268eed3a1ff07dfd2fca9d3f5d 100644 (file)
@@ -2,33 +2,40 @@ package sqliteStorage
 
 import (
        "bytes"
-       "fmt"
        "io"
        "io/ioutil"
        "path/filepath"
        "sync"
        "testing"
 
-       "crawshaw.io/sqlite"
-       "crawshaw.io/sqlite/sqlitex"
        _ "github.com/anacrolix/envpprof"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
 )
 
-func TestSimultaneousIncrementalBlob(t *testing.T) {
-       const poolSize = 10
-       pool, err := sqlitex.Open(
-               // We don't do this in memory, because it seems to have some locking issues with updating
-               // last_used.
-               fmt.Sprintf("file:%s", filepath.Join(t.TempDir(), "sqlite3.db")),
-               // We can't disable WAL in this test because then we can't open 2 blobs simultaneously for read.
-               sqlite.OpenFlagsDefault, /* &^sqlite.SQLITE_OPEN_WAL */
-               poolSize)
+func newConnsAndProv(t *testing.T, opts NewPoolOpts) (ConnPool, *provider) {
+       opts.Path = filepath.Join(t.TempDir(), "sqlite3.db")
+       conns, provOpts, err := NewPool(opts)
        require.NoError(t, err)
-       defer pool.Close()
-       p, err := NewProviderPool(pool, poolSize, true)
+       t.Cleanup(func() { conns.Close() })
+       prov, err := NewProvider(conns, provOpts)
        require.NoError(t, err)
+       return conns, prov
+}
+
+func TestTextBlobSize(t *testing.T) {
+       _, prov := newConnsAndProv(t, NewPoolOpts{})
+       a, _ := prov.NewInstance("a")
+       a.Put(bytes.NewBufferString("\x00hello"))
+       fi, _ := a.Stat()
+       assert.EqualValues(t, 6, fi.Size())
+}
+
+func TestSimultaneousIncrementalBlob(t *testing.T) {
+       _, p := newConnsAndProv(t, NewPoolOpts{
+               NumConns:            2,
+               ConcurrentBlobReads: true,
+       })
        a, err := p.NewInstance("a")
        require.NoError(t, err)
        const contents = "hello, world"
index 25d03b4986ccf989be4b0845c24a7789a830155f..6c955b363af8274e4348cbb21422b36bb62baa7a 100644 (file)
@@ -11,8 +11,6 @@ import (
        "testing"
        "time"
 
-       "crawshaw.io/sqlite"
-       "crawshaw.io/sqlite/sqlitex"
        "github.com/anacrolix/missinggo/v2/filecache"
        "github.com/anacrolix/torrent"
        "github.com/anacrolix/torrent/internal/testutil"
@@ -272,46 +270,15 @@ func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
        testClientTransferSmallCache(t, false, -1)
 }
 
-func sqliteClientStorageFactory(connPathMaker func(dataDir string) string) storageFactory {
+func sqliteClientStorageFactory(connOptsMaker func(dataDir string) sqliteStorage.NewPoolOpts) storageFactory {
        return func(dataDir string) storage.ClientImplCloser {
-               path := connPathMaker(dataDir)
-               log.Printf("opening sqlite db at %q", path)
-               if true {
-                       conn, err := sqlite.OpenConn(path, 0)
-                       if err != nil {
-                               panic(err)
-                       }
-                       prov, err := sqliteStorage.NewProvider(conn)
-                       if err != nil {
-                               panic(err)
-                       }
-                       return struct {
-                               storage.ClientImpl
-                               io.Closer
-                       }{
-                               storage.NewResourcePieces(prov),
-                               conn,
-                       }
-               } else {
-                       // Test pool implementation for SQLITE_BUSY when we want SQLITE_LOCKED (so the
-                       // crawshaw.io/sqlite unlock notify handler kicks in for us).
-                       const poolSize = 1
-                       pool, err := sqlitex.Open(path, 0, poolSize)
-                       if err != nil {
-                               panic(err)
-                       }
-                       prov, err := sqliteStorage.NewProviderPool(pool, poolSize, false)
-                       if err != nil {
-                               panic(err)
-                       }
-                       return struct {
-                               storage.ClientImpl
-                               io.Closer
-                       }{
-                               storage.NewResourcePieces(prov),
-                               pool,
-                       }
+               connOpts := connOptsMaker(dataDir)
+               log.Printf("opening sqlite db: %#v", connOpts)
+               ret, err := sqliteStorage.NewPiecesStorage(connOpts)
+               if err != nil {
+                       panic(err)
                }
+               return ret
        }
 }
 
@@ -325,11 +292,15 @@ func TestClientTransferVarious(t *testing.T) {
                        Wrapper: fileCachePieceResourceStorage,
                })},
                {"Boltdb", storage.NewBoltDB},
-               {"SqliteFile", sqliteClientStorageFactory(func(dataDir string) string {
-                       return "file:" + filepath.Join(dataDir, "sqlite.db")
+               {"SqliteFile", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPoolOpts {
+                       return sqliteStorage.NewPoolOpts{
+                               Path: filepath.Join(dataDir, "sqlite.db"),
+                       }
                })},
-               {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) string {
-                       return "file:memory:?mode=memory&cache=shared"
+               {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPoolOpts {
+                       return sqliteStorage.NewPoolOpts{
+                               Memory: true,
+                       }
                })},
        } {
                t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {