From 8e1a8440bfaa80f2eecae677c2986dc5e79f511f Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 27 Oct 2020 17:07:49 +1100 Subject: [PATCH] Performance fiddling on sqlite storage --- storage/sqlite/sqlite-storage.go | 47 ++++++++++++++++++++---- storage/sqlite/sqlite-storage_test.go | 9 +++-- test/transfer_test.go | 53 +++++++++++++++++++-------- 3 files changed, 82 insertions(+), 27 deletions(-) diff --git a/storage/sqlite/sqlite-storage.go b/storage/sqlite/sqlite-storage.go index c601ec0f..00c56362 100644 --- a/storage/sqlite/sqlite-storage.go +++ b/storage/sqlite/sqlite-storage.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "log" "os" "sync" "time" @@ -18,8 +19,22 @@ import ( type conn = *sqlite.Conn -func initConn(conn conn) error { - return sqlitex.ExecTransient(conn, `pragma synchronous=off`, nil) +func initConn(conn conn, wal bool) error { + err := sqlitex.ExecTransient(conn, `pragma synchronous=off`, nil) + if err != nil { + return err + } + if !wal { + err = sqlitex.ExecTransient(conn, `pragma journal_mode=off`, nil) + if err != nil { + return err + } + } + err = sqlitex.ExecTransient(conn, `pragma mmap_size=1000000000`, nil) + if err != nil { + return err + } + return nil } func initSchema(conn conn) error { @@ -81,7 +96,7 @@ func (me *poolFromConn) Put(conn conn) { } func NewProvider(conn *sqlite.Conn) (_ *provider, err error) { - err = initConn(conn) + err = initConn(conn, false) if err != nil { return } @@ -89,8 +104,9 @@ func NewProvider(conn *sqlite.Conn) (_ *provider, err error) { return &provider{&poolFromConn{conn: conn}}, err } -func NewProviderPool(pool *sqlitex.Pool, numConns int) (_ *provider, err error) { - _, err = initPoolConns(context.TODO(), pool, numConns) +// Needs the pool size so it can initialize all the connections with pragmas. +func NewProviderPool(pool *sqlitex.Pool, numConns int, wal bool) (_ *provider, err error) { + _, err = initPoolConns(context.TODO(), pool, numConns, wal) if err != nil { return } @@ -100,7 +116,7 @@ func NewProviderPool(pool *sqlitex.Pool, numConns int) (_ *provider, err error) return &provider{pool: pool}, err } -func initPoolConns(ctx context.Context, pool *sqlitex.Pool, numConn int) (numInited int, err error) { +func initPoolConns(ctx context.Context, pool *sqlitex.Pool, numConn int, wal bool) (numInited int, err error) { var conns []conn defer func() { for _, c := range conns { @@ -113,7 +129,7 @@ func initPoolConns(ctx context.Context, pool *sqlitex.Pool, numConn int) (numIni break } conns = append(conns, conn) - err = initConn(conn) + err = initConn(conn, wal) if err != nil { err = fmt.Errorf("initing conn %v: %w", len(conns), err) return @@ -143,6 +159,13 @@ type instance struct { func (i instance) withConn(with func(conn conn)) { conn := i.p.pool.Get(context.TODO()) + //err := sqlitex.Exec(conn, "pragma synchronous", func(stmt *sqlite.Stmt) error { + // log.Print(stmt.ColumnText(0)) + // return nil + //}) + //if err != nil { + // log.Print(err) + //} defer i.p.pool.Put(conn) with(conn) } @@ -238,7 +261,15 @@ func (i instance) Put(reader io.Reader) (err error) { return err } i.withConn(func(conn conn) { - err = sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, ?)", nil, i.location, buf.Bytes()) + for range iter.N(10) { + err = sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, ?)", nil, i.location, buf.Bytes()) + if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY { + log.Print("sqlite busy") + time.Sleep(time.Second) + continue + } + break + } }) return } diff --git a/storage/sqlite/sqlite-storage_test.go b/storage/sqlite/sqlite-storage_test.go index 6945069d..71d5540a 100644 --- a/storage/sqlite/sqlite-storage_test.go +++ b/storage/sqlite/sqlite-storage_test.go @@ -9,6 +9,7 @@ import ( "sync" "testing" + "crawshaw.io/sqlite" "crawshaw.io/sqlite/sqlitex" _ "github.com/anacrolix/envpprof" "github.com/stretchr/testify/assert" @@ -16,15 +17,17 @@ import ( ) func TestSimultaneousIncrementalBlob(t *testing.T) { + const poolSize = 10 pool, err := sqlitex.Open( // We don't do this in memory, because it seems to have some locking issues with updating // last_used. fmt.Sprintf("file:%s", filepath.Join(t.TempDir(), "sqlite3.db")), - 0, - 10) + // We can't disable WAL in this test because then we can't open 2 blobs simultaneously for read. + sqlite.OpenFlagsDefault, /* &^sqlite.SQLITE_OPEN_WAL */ + poolSize) require.NoError(t, err) defer pool.Close() - p, err := NewProviderPool(pool, 10) + p, err := NewProviderPool(pool, poolSize, true) require.NoError(t, err) a, err := p.NewInstance("a") require.NoError(t, err) diff --git a/test/transfer_test.go b/test/transfer_test.go index 7462a547..25d03b49 100644 --- a/test/transfer_test.go +++ b/test/transfer_test.go @@ -12,6 +12,7 @@ import ( "time" "crawshaw.io/sqlite" + "crawshaw.io/sqlite/sqlitex" "github.com/anacrolix/missinggo/v2/filecache" "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/internal/testutil" @@ -275,22 +276,42 @@ func sqliteClientStorageFactory(connPathMaker func(dataDir string) string) stora return func(dataDir string) storage.ClientImplCloser { path := connPathMaker(dataDir) log.Printf("opening sqlite db at %q", path) - conn, err := sqlite.OpenConn(path, 0) - if err != nil { - panic(err) - } - prov, err := sqliteStorage.NewProvider(conn) - if err != nil { - panic(err) - } - return struct { - storage.ClientImpl - io.Closer - }{ - storage.NewResourcePieces(prov), - conn, + if true { + conn, err := sqlite.OpenConn(path, 0) + if err != nil { + panic(err) + } + prov, err := sqliteStorage.NewProvider(conn) + if err != nil { + panic(err) + } + return struct { + storage.ClientImpl + io.Closer + }{ + storage.NewResourcePieces(prov), + conn, + } + } else { + // Test pool implementation for SQLITE_BUSY when we want SQLITE_LOCKED (so the + // crawshaw.io/sqlite unlock notify handler kicks in for us). + const poolSize = 1 + pool, err := sqlitex.Open(path, 0, poolSize) + if err != nil { + panic(err) + } + prov, err := sqliteStorage.NewProviderPool(pool, poolSize, false) + if err != nil { + panic(err) + } + return struct { + storage.ClientImpl + io.Closer + }{ + storage.NewResourcePieces(prov), + pool, + } } - } } @@ -308,7 +329,7 @@ func TestClientTransferVarious(t *testing.T) { return "file:" + filepath.Join(dataDir, "sqlite.db") })}, {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) string { - return "file:memory:?mode=memory" + return "file:memory:?mode=memory&cache=shared" })}, } { t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) { -- 2.48.1