]> Sergey Matveev's repositories - btrtrc.git/commitdiff
sqlite storage: Include capacity management
authorMatt Joiner <anacrolix@gmail.com>
Tue, 27 Oct 2020 00:08:37 +0000 (11:08 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 27 Oct 2020 00:08:37 +0000 (11:08 +1100)
storage/sqlite/sql
storage/sqlite/sqlite-storage.go
storage/sqlite/sqlite-storage_test.go

index a6f4bdce613805179504051fe9b57e5314e20281..31a766868117f1d190f69edd4a8aaf20e511d39c 100644 (file)
@@ -1,11 +1,29 @@
-with recursive excess(
+pragma auto_vacuum=incremental;
+create table if not exists blob(
+       name text,
+       last_used timestamp default (datetime('now')),
+       data blob,
+       primary key (name)
+);
+
+create view if not exists deletable_blob as
+with recursive excess_blob(
        usage_with,
        last_used,
        blob_rowid,
        data_length
 ) as (
-       select * from (select (select sum(length(data)) from blob), last_used, rowid, length(data) from blob order by last_used, rowid limit 1)
+       select * from (select (select sum(length(data)) from blob) as usage_with, last_used, rowid, length(data) from blob order by last_used, rowid limit 1)
+               where usage_with >= (select value from setting where name='capacity')
        union all
-       select usage_with-data_length, blob.last_used, blob.rowid, length(data) from excess join blob
+       select usage_with-data_length, blob.last_used, blob.rowid, length(data) from excess_blob join blob
                on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
-) select * from excess limit 10;
+       where usage_with >= (select value from setting where name='capacity')
+) select * from excess;
+
+CREATE TRIGGER if not exists trim_blobs_to_capacity_after_update after update on blob begin
+       delete from blob where rowid in (select blob_rowid from deletable_blob);
+end;
+CREATE TRIGGER if not exists trim_blobs_to_capacity_after_insert after insert on blob begin
+       delete from blob where rowid in (select blob_rowid from deletable_blob);
+end;
index 2674b4d9af2a326d70d93c92b7fcce21635f6fd5..c601ec0f7167808b70d8843f512ed06af623d463 100644 (file)
@@ -12,19 +12,53 @@ import (
 
        "crawshaw.io/sqlite"
        "crawshaw.io/sqlite/sqlitex"
+       "github.com/anacrolix/missinggo/iter"
        "github.com/anacrolix/missinggo/v2/resource"
 )
 
 type conn = *sqlite.Conn
 
 func initConn(conn conn) error {
+       return sqlitex.ExecTransient(conn, `pragma synchronous=off`, nil)
+}
+
+func initSchema(conn conn) error {
        return sqlitex.ExecScript(conn, `
+pragma auto_vacuum=incremental;
+
 create table if not exists blob(
        name text,
        last_used timestamp default (datetime('now')),
        data blob,
        primary key (name)
 );
+
+create table if not exists setting(
+       name primary key on conflict replace,
+       value
+);
+
+create view if not exists deletable_blob as
+with recursive excess(
+       usage_with,
+       last_used,
+       blob_rowid,
+       data_length
+) as (
+       select * from (select (select sum(length(data)) from blob) as usage_with, last_used, rowid, length(data) from blob order by last_used, rowid limit 1)
+               where usage_with >= (select value from setting where name='capacity')
+       union all
+       select usage_with-data_length, blob.last_used, blob.rowid, length(data) from excess join blob
+               on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
+       where usage_with >= (select value from setting where name='capacity')
+) select * from excess;
+
+CREATE TRIGGER if not exists trim_blobs_to_capacity_after_update after update on blob begin 
+       delete from blob where rowid in (select blob_rowid from deletable_blob);
+end;
+CREATE TRIGGER if not exists trim_blobs_to_capacity_after_insert after insert on blob begin
+       delete from blob where rowid in (select blob_rowid from deletable_blob);
+end;
 `)
 }
 
@@ -46,18 +80,49 @@ func (me *poolFromConn) Put(conn conn) {
        me.mu.Unlock()
 }
 
-func NewProvider(conn *sqlite.Conn) (*provider, error) {
-       err := initConn(conn)
+func NewProvider(conn *sqlite.Conn) (_ *provider, err error) {
+       err = initConn(conn)
+       if err != nil {
+               return
+       }
+       err = initSchema(conn)
        return &provider{&poolFromConn{conn: conn}}, err
 }
 
-func NewProviderPool(pool *sqlitex.Pool) (*provider, error) {
+func NewProviderPool(pool *sqlitex.Pool, numConns int) (_ *provider, err error) {
+       _, err = initPoolConns(context.TODO(), pool, numConns)
+       if err != nil {
+               return
+       }
        conn := pool.Get(context.TODO())
        defer pool.Put(conn)
-       err := initConn(conn)
+       err = initSchema(conn)
        return &provider{pool: pool}, err
 }
 
+func initPoolConns(ctx context.Context, pool *sqlitex.Pool, numConn int) (numInited int, err error) {
+       var conns []conn
+       defer func() {
+               for _, c := range conns {
+                       pool.Put(c)
+               }
+       }()
+       for range iter.N(numConn) {
+               conn := pool.Get(ctx)
+               if conn == nil {
+                       break
+               }
+               conns = append(conns, conn)
+               err = initConn(conn)
+               if err != nil {
+                       err = fmt.Errorf("initing conn %v: %w", len(conns), err)
+                       return
+               }
+               numInited++
+       }
+       return
+}
+
 type pool interface {
        Get(context.Context) conn
        Put(conn)
index c18c4c3a48002e0c19f939b7f21bd962d545376d..6945069d06b8a9ef106c7f717dea1af47fe0aa3b 100644 (file)
@@ -24,7 +24,7 @@ func TestSimultaneousIncrementalBlob(t *testing.T) {
                10)
        require.NoError(t, err)
        defer pool.Close()
-       p, err := NewProviderPool(pool)
+       p, err := NewProviderPool(pool, 10)
        require.NoError(t, err)
        a, err := p.NewInstance("a")
        require.NoError(t, err)