]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Performance fiddling on sqlite storage
authorMatt Joiner <anacrolix@gmail.com>
Tue, 27 Oct 2020 06:07:49 +0000 (17:07 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 27 Oct 2020 06:07:49 +0000 (17:07 +1100)
storage/sqlite/sqlite-storage.go
storage/sqlite/sqlite-storage_test.go
test/transfer_test.go

index c601ec0f7167808b70d8843f512ed06af623d463..00c56362aae244299e8001a1164a3a7f50cbfe3c 100644 (file)
@@ -6,6 +6,7 @@ import (
        "errors"
        "fmt"
        "io"
+       "log"
        "os"
        "sync"
        "time"
@@ -18,8 +19,22 @@ import (
 
 type conn = *sqlite.Conn
 
-func initConn(conn conn) error {
-       return sqlitex.ExecTransient(conn, `pragma synchronous=off`, nil)
+func initConn(conn conn, wal bool) error {
+       err := sqlitex.ExecTransient(conn, `pragma synchronous=off`, nil)
+       if err != nil {
+               return err
+       }
+       if !wal {
+               err = sqlitex.ExecTransient(conn, `pragma journal_mode=off`, nil)
+               if err != nil {
+                       return err
+               }
+       }
+       err = sqlitex.ExecTransient(conn, `pragma mmap_size=1000000000`, nil)
+       if err != nil {
+               return err
+       }
+       return nil
 }
 
 func initSchema(conn conn) error {
@@ -81,7 +96,7 @@ func (me *poolFromConn) Put(conn conn) {
 }
 
 func NewProvider(conn *sqlite.Conn) (_ *provider, err error) {
-       err = initConn(conn)
+       err = initConn(conn, false)
        if err != nil {
                return
        }
@@ -89,8 +104,9 @@ func NewProvider(conn *sqlite.Conn) (_ *provider, err error) {
        return &provider{&poolFromConn{conn: conn}}, err
 }
 
-func NewProviderPool(pool *sqlitex.Pool, numConns int) (_ *provider, err error) {
-       _, err = initPoolConns(context.TODO(), pool, numConns)
+// Needs the pool size so it can initialize all the connections with pragmas.
+func NewProviderPool(pool *sqlitex.Pool, numConns int, wal bool) (_ *provider, err error) {
+       _, err = initPoolConns(context.TODO(), pool, numConns, wal)
        if err != nil {
                return
        }
@@ -100,7 +116,7 @@ func NewProviderPool(pool *sqlitex.Pool, numConns int) (_ *provider, err error)
        return &provider{pool: pool}, err
 }
 
-func initPoolConns(ctx context.Context, pool *sqlitex.Pool, numConn int) (numInited int, err error) {
+func initPoolConns(ctx context.Context, pool *sqlitex.Pool, numConn int, wal bool) (numInited int, err error) {
        var conns []conn
        defer func() {
                for _, c := range conns {
@@ -113,7 +129,7 @@ func initPoolConns(ctx context.Context, pool *sqlitex.Pool, numConn int) (numIni
                        break
                }
                conns = append(conns, conn)
-               err = initConn(conn)
+               err = initConn(conn, wal)
                if err != nil {
                        err = fmt.Errorf("initing conn %v: %w", len(conns), err)
                        return
@@ -143,6 +159,13 @@ type instance struct {
 
 func (i instance) withConn(with func(conn conn)) {
        conn := i.p.pool.Get(context.TODO())
+       //err := sqlitex.Exec(conn, "pragma synchronous", func(stmt *sqlite.Stmt) error {
+       //      log.Print(stmt.ColumnText(0))
+       //      return nil
+       //})
+       //if err != nil {
+       //      log.Print(err)
+       //}
        defer i.p.pool.Put(conn)
        with(conn)
 }
@@ -238,7 +261,15 @@ func (i instance) Put(reader io.Reader) (err error) {
                return err
        }
        i.withConn(func(conn conn) {
-               err = sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, ?)", nil, i.location, buf.Bytes())
+               for range iter.N(10) {
+                       err = sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, ?)", nil, i.location, buf.Bytes())
+                       if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
+                               log.Print("sqlite busy")
+                               time.Sleep(time.Second)
+                               continue
+                       }
+                       break
+               }
        })
        return
 }
index 6945069d06b8a9ef106c7f717dea1af47fe0aa3b..71d5540a437e08514dbf4f97065eab78633d0454 100644 (file)
@@ -9,6 +9,7 @@ import (
        "sync"
        "testing"
 
+       "crawshaw.io/sqlite"
        "crawshaw.io/sqlite/sqlitex"
        _ "github.com/anacrolix/envpprof"
        "github.com/stretchr/testify/assert"
@@ -16,15 +17,17 @@ import (
 )
 
 func TestSimultaneousIncrementalBlob(t *testing.T) {
+       const poolSize = 10
        pool, err := sqlitex.Open(
                // We don't do this in memory, because it seems to have some locking issues with updating
                // last_used.
                fmt.Sprintf("file:%s", filepath.Join(t.TempDir(), "sqlite3.db")),
-               0,
-               10)
+               // We can't disable WAL in this test because then we can't open 2 blobs simultaneously for read.
+               sqlite.OpenFlagsDefault, /* &^sqlite.SQLITE_OPEN_WAL */
+               poolSize)
        require.NoError(t, err)
        defer pool.Close()
-       p, err := NewProviderPool(pool, 10)
+       p, err := NewProviderPool(pool, poolSize, true)
        require.NoError(t, err)
        a, err := p.NewInstance("a")
        require.NoError(t, err)
index 7462a547c53b53fa3af380373c4da99447c17750..25d03b4986ccf989be4b0845c24a7789a830155f 100644 (file)
@@ -12,6 +12,7 @@ import (
        "time"
 
        "crawshaw.io/sqlite"
+       "crawshaw.io/sqlite/sqlitex"
        "github.com/anacrolix/missinggo/v2/filecache"
        "github.com/anacrolix/torrent"
        "github.com/anacrolix/torrent/internal/testutil"
@@ -275,22 +276,42 @@ func sqliteClientStorageFactory(connPathMaker func(dataDir string) string) stora
        return func(dataDir string) storage.ClientImplCloser {
                path := connPathMaker(dataDir)
                log.Printf("opening sqlite db at %q", path)
-               conn, err := sqlite.OpenConn(path, 0)
-               if err != nil {
-                       panic(err)
-               }
-               prov, err := sqliteStorage.NewProvider(conn)
-               if err != nil {
-                       panic(err)
-               }
-               return struct {
-                       storage.ClientImpl
-                       io.Closer
-               }{
-                       storage.NewResourcePieces(prov),
-                       conn,
+               if true {
+                       conn, err := sqlite.OpenConn(path, 0)
+                       if err != nil {
+                               panic(err)
+                       }
+                       prov, err := sqliteStorage.NewProvider(conn)
+                       if err != nil {
+                               panic(err)
+                       }
+                       return struct {
+                               storage.ClientImpl
+                               io.Closer
+                       }{
+                               storage.NewResourcePieces(prov),
+                               conn,
+                       }
+               } else {
+                       // Test pool implementation for SQLITE_BUSY when we want SQLITE_LOCKED (so the
+                       // crawshaw.io/sqlite unlock notify handler kicks in for us).
+                       const poolSize = 1
+                       pool, err := sqlitex.Open(path, 0, poolSize)
+                       if err != nil {
+                               panic(err)
+                       }
+                       prov, err := sqliteStorage.NewProviderPool(pool, poolSize, false)
+                       if err != nil {
+                               panic(err)
+                       }
+                       return struct {
+                               storage.ClientImpl
+                               io.Closer
+                       }{
+                               storage.NewResourcePieces(prov),
+                               pool,
+                       }
                }
-
        }
 }
 
@@ -308,7 +329,7 @@ func TestClientTransferVarious(t *testing.T) {
                        return "file:" + filepath.Join(dataDir, "sqlite.db")
                })},
                {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) string {
-                       return "file:memory:?mode=memory"
+                       return "file:memory:?mode=memory&cache=shared"
                })},
        } {
                t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {