From e30084223d73716560d05c20b216fba7eddbbdac Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 27 Oct 2020 11:08:37 +1100 Subject: [PATCH] sqlite storage: Include capacity management --- storage/sqlite/sql | 26 ++++++++-- storage/sqlite/sqlite-storage.go | 73 +++++++++++++++++++++++++-- storage/sqlite/sqlite-storage_test.go | 2 +- 3 files changed, 92 insertions(+), 9 deletions(-) diff --git a/storage/sqlite/sql b/storage/sqlite/sql index a6f4bdce..31a76686 100644 --- a/storage/sqlite/sql +++ b/storage/sqlite/sql @@ -1,11 +1,29 @@ -with recursive excess( +pragma auto_vacuum=incremental; +create table if not exists blob( + name text, + last_used timestamp default (datetime('now')), + data blob, + primary key (name) +); + +create view if not exists deletable_blob as +with recursive excess_blob( usage_with, last_used, blob_rowid, data_length ) as ( - select * from (select (select sum(length(data)) from blob), last_used, rowid, length(data) from blob order by last_used, rowid limit 1) + select * from (select (select sum(length(data)) from blob) as usage_with, last_used, rowid, length(data) from blob order by last_used, rowid limit 1) + where usage_with >= (select value from setting where name='capacity') union all - select usage_with-data_length, blob.last_used, blob.rowid, length(data) from excess join blob + select usage_with-data_length, blob.last_used, blob.rowid, length(data) from excess_blob join blob on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid)) -) select * from excess limit 10; + where usage_with >= (select value from setting where name='capacity') +) select * from excess; + +CREATE TRIGGER if not exists trim_blobs_to_capacity_after_update after update on blob begin + delete from blob where rowid in (select blob_rowid from deletable_blob); +end; +CREATE TRIGGER if not exists trim_blobs_to_capacity_after_insert after insert on blob begin + delete from blob where rowid in (select blob_rowid from deletable_blob); +end; diff --git a/storage/sqlite/sqlite-storage.go b/storage/sqlite/sqlite-storage.go index 2674b4d9..c601ec0f 100644 --- a/storage/sqlite/sqlite-storage.go +++ b/storage/sqlite/sqlite-storage.go @@ -12,19 +12,53 @@ import ( "crawshaw.io/sqlite" "crawshaw.io/sqlite/sqlitex" + "github.com/anacrolix/missinggo/iter" "github.com/anacrolix/missinggo/v2/resource" ) type conn = *sqlite.Conn func initConn(conn conn) error { + return sqlitex.ExecTransient(conn, `pragma synchronous=off`, nil) +} + +func initSchema(conn conn) error { return sqlitex.ExecScript(conn, ` +pragma auto_vacuum=incremental; + create table if not exists blob( name text, last_used timestamp default (datetime('now')), data blob, primary key (name) ); + +create table if not exists setting( + name primary key on conflict replace, + value +); + +create view if not exists deletable_blob as +with recursive excess( + usage_with, + last_used, + blob_rowid, + data_length +) as ( + select * from (select (select sum(length(data)) from blob) as usage_with, last_used, rowid, length(data) from blob order by last_used, rowid limit 1) + where usage_with >= (select value from setting where name='capacity') + union all + select usage_with-data_length, blob.last_used, blob.rowid, length(data) from excess join blob + on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid)) + where usage_with >= (select value from setting where name='capacity') +) select * from excess; + +CREATE TRIGGER if not exists trim_blobs_to_capacity_after_update after update on blob begin + delete from blob where rowid in (select blob_rowid from deletable_blob); +end; +CREATE TRIGGER if not exists trim_blobs_to_capacity_after_insert after insert on blob begin + delete from blob where rowid in (select blob_rowid from deletable_blob); +end; `) } @@ -46,18 +80,49 @@ func (me *poolFromConn) Put(conn conn) { me.mu.Unlock() } -func NewProvider(conn *sqlite.Conn) (*provider, error) { - err := initConn(conn) +func NewProvider(conn *sqlite.Conn) (_ *provider, err error) { + err = initConn(conn) + if err != nil { + return + } + err = initSchema(conn) return &provider{&poolFromConn{conn: conn}}, err } -func NewProviderPool(pool *sqlitex.Pool) (*provider, error) { +func NewProviderPool(pool *sqlitex.Pool, numConns int) (_ *provider, err error) { + _, err = initPoolConns(context.TODO(), pool, numConns) + if err != nil { + return + } conn := pool.Get(context.TODO()) defer pool.Put(conn) - err := initConn(conn) + err = initSchema(conn) return &provider{pool: pool}, err } +func initPoolConns(ctx context.Context, pool *sqlitex.Pool, numConn int) (numInited int, err error) { + var conns []conn + defer func() { + for _, c := range conns { + pool.Put(c) + } + }() + for range iter.N(numConn) { + conn := pool.Get(ctx) + if conn == nil { + break + } + conns = append(conns, conn) + err = initConn(conn) + if err != nil { + err = fmt.Errorf("initing conn %v: %w", len(conns), err) + return + } + numInited++ + } + return +} + type pool interface { Get(context.Context) conn Put(conn) diff --git a/storage/sqlite/sqlite-storage_test.go b/storage/sqlite/sqlite-storage_test.go index c18c4c3a..6945069d 100644 --- a/storage/sqlite/sqlite-storage_test.go +++ b/storage/sqlite/sqlite-storage_test.go @@ -24,7 +24,7 @@ func TestSimultaneousIncrementalBlob(t *testing.T) { 10) require.NoError(t, err) defer pool.Close() - p, err := NewProviderPool(pool) + p, err := NewProviderPool(pool, 10) require.NoError(t, err) a, err := p.NewInstance("a") require.NoError(t, err) -- 2.48.1