]> Sergey Matveev's repositories - btrtrc.git/blobdiff - storage/sqlite/sqlite-storage.go
Move storage piece benchmarks to storage/test and add a lot more dials
[btrtrc.git] / storage / sqlite / sqlite-storage.go
index 0f8dc13670b8c720ee581d387e98ac3bcf4c71a5..2700204a6266dd3557888fe2da503898ca3451ae 100644 (file)
@@ -11,6 +11,8 @@ import (
        "net/url"
        "os"
        "runtime"
+       "runtime/pprof"
+       "strings"
        "sync"
        "time"
 
@@ -47,103 +49,124 @@ func initConn(conn conn, wal bool) error {
        return nil
 }
 
-func initSchema(conn conn) error {
-       return sqlitex.ExecScript(conn, `
--- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
--- can trim the database file size with partial vacuums without having to do a full vacuum, which 
--- locks everything.
-pragma page_size=16384;
-pragma auto_vacuum=incremental;
-
-create table if not exists blob (
-       name text,
-       last_used timestamp default (datetime('now')),
-       data blob,
-       primary key (name)
-);
-
-create table if not exists blob_meta (
-       key text primary key,
-       value
-);
-
--- While sqlite *seems* to be faster to get sum(length(data)) instead of 
--- sum(length(cast(data as blob))), it may still require a large table scan at start-up or with a 
--- cold-cache. With this we can be assured that it doesn't.
-insert or ignore into blob_meta values ('size', 0);
-
-create table if not exists setting (
-       name primary key on conflict replace,
-       value
-);
-
-create view if not exists deletable_blob as
-with recursive excess (
-       usage_with,
-       last_used,
-       blob_rowid,
-       data_length
-) as (
-       select * 
-       from (
-               select 
-                       (select value from blob_meta where key='size') as usage_with,
+func initSchema(conn conn, pageSize int, triggers bool) error {
+       if pageSize != 0 {
+               err := sqlitex.ExecScript(conn, fmt.Sprintf("pragma page_size=%d", pageSize))
+               if err != nil {
+                       return err
+               }
+       }
+       err := sqlitex.ExecScript(conn, `
+               -- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
+               -- can trim the database file size with partial vacuums without having to do a full vacuum, which 
+               -- locks everything.
+               pragma auto_vacuum=incremental;
+               
+               create table if not exists blob (
+                       name text,
+                       last_used timestamp default (datetime('now')),
+                       data blob,
+                       primary key (name)
+               );
+               
+               create table if not exists blob_meta (
+                       key text primary key,
+                       value
+               );
+               
+               -- While sqlite *seems* to be faster to get sum(length(data)) instead of 
+               -- sum(length(cast(data as blob))), it may still require a large table scan at start-up or with a 
+               -- cold-cache. With this we can be assured that it doesn't.
+               insert or ignore into blob_meta values ('size', 0);
+               
+               create table if not exists setting (
+                       name primary key on conflict replace,
+                       value
+               );
+       
+               create view if not exists deletable_blob as
+               with recursive excess (
+                       usage_with,
                        last_used,
-                       rowid,
-                       length(cast(data as blob))
-               from blob order by last_used, rowid limit 1
-       )
-       where usage_with > (select value from setting where name='capacity')
-       union all
-       select 
-               usage_with-data_length as new_usage_with,
-               blob.last_used,
-               blob.rowid,
-               length(cast(data as blob))
-       from excess join blob
-       on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
-       where new_usage_with > (select value from setting where name='capacity')
-)
-select * from excess;
-
-create trigger if not exists after_insert_blob
-after insert on blob
-begin
-       update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
-       delete from blob where rowid in (select blob_rowid from deletable_blob);
-end;
-
-create trigger if not exists after_update_blob
-after update of data on blob
-begin
-       update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
-       delete from blob where rowid in (select blob_rowid from deletable_blob);
-end;
-
-create trigger if not exists after_delete_blob
-after delete on blob
-begin
-       update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
-end;
-`)
+                       blob_rowid,
+                       data_length
+               ) as (
+                       select * 
+                       from (
+                               select 
+                                       (select value from blob_meta where key='size') as usage_with,
+                                       last_used,
+                                       rowid,
+                                       length(cast(data as blob))
+                               from blob order by last_used, rowid limit 1
+                       )
+                       where usage_with > (select value from setting where name='capacity')
+                       union all
+                       select 
+                               usage_with-data_length as new_usage_with,
+                               blob.last_used,
+                               blob.rowid,
+                               length(cast(data as blob))
+                       from excess join blob
+                       on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
+                       where new_usage_with > (select value from setting where name='capacity')
+               )
+               select * from excess;
+       `)
+       if err != nil {
+               return err
+       }
+       if triggers {
+               err := sqlitex.ExecScript(conn, `
+                       create trigger if not exists after_insert_blob
+                       after insert on blob
+                       begin
+                               update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
+                               delete from blob where rowid in (select blob_rowid from deletable_blob);
+                       end;
+                       
+                       create trigger if not exists after_update_blob
+                       after update of data on blob
+                       begin
+                               update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
+                               delete from blob where rowid in (select blob_rowid from deletable_blob);
+                       end;
+                       
+                       create trigger if not exists after_delete_blob
+                       after delete on blob
+                       begin
+                               update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
+                       end;
+               `)
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+type NewPiecesStorageOpts struct {
+       NewPoolOpts
+       ProvOpts func(*ProviderOpts)
+       storage.ResourcePiecesOpts
 }
 
 // A convenience function that creates a connection pool, resource provider, and a pieces storage
 // ClientImpl and returns them all with a Close attached.
-func NewPiecesStorage(opts NewPoolOpts) (_ storage.ClientImplCloser, err error) {
-       conns, provOpts, err := NewPool(opts)
+func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, err error) {
+       conns, provOpts, err := NewPool(opts.NewPoolOpts)
        if err != nil {
                return
        }
+       if f := opts.ProvOpts; f != nil {
+               f(&provOpts)
+       }
        prov, err := NewProvider(conns, provOpts)
        if err != nil {
                conns.Close()
                return
        }
-       store := storage.NewResourcePiecesOpts(prov, storage.ResourcePiecesOpts{
-               LeaveIncompleteChunks: true,
-               AllowSizedPuts:        true,
-       })
+       store := storage.NewResourcePiecesOpts(prov, opts.ResourcePiecesOpts)
        return struct {
                storage.ClientImpl
                io.Closer
@@ -160,6 +183,7 @@ type NewPoolOpts struct {
        // Forces WAL, disables shared caching.
        ConcurrentBlobReads bool
        DontInitSchema      bool
+       PageSize            int
        // If non-zero, overrides the existing setting.
        Capacity int64
 }
@@ -216,7 +240,10 @@ func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
        conn := conns.Get(context.TODO())
        defer conns.Put(conn)
        if !opts.DontInitSchema {
-               err = initSchema(conn)
+               if opts.PageSize == 0 {
+                       opts.PageSize = 1 << 14
+               }
+               err = initSchema(conn, opts.PageSize, true)
                if err != nil {
                        return
                }
@@ -343,7 +370,7 @@ func (p *provider) WriteConsecutiveChunks(prefix string, w io.Writer) (written i
                        prefix,
                )
                return err
-       }, false)
+       }, false, 0)
        return
 }
 
@@ -358,12 +385,20 @@ func (me *provider) Close() error {
 }
 
 type writeRequest struct {
-       query withConn
-       done  chan<- error
+       query  withConn
+       done   chan<- error
+       labels pprof.LabelSet
 }
 
 var expvars = expvar.NewMap("sqliteStorage")
 
+func runQueryWithLabels(query withConn, labels pprof.LabelSet, conn conn) (err error) {
+       pprof.Do(context.Background(), labels, func(context.Context) {
+               err = query(conn)
+       })
+       return
+}
+
 // Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
 // stronger typing on the writes channel.
 func providerWriter(writes <-chan writeRequest, pool ConnPool) {
@@ -381,13 +416,13 @@ func providerWriter(writes <-chan writeRequest, pool ConnPool) {
                var cantFail error
                func() {
                        defer sqlitex.Save(conn)(&cantFail)
-                       firstErr := first.query(conn)
+                       firstErr := runQueryWithLabels(first.query, first.labels, conn)
                        buf = append(buf, func() { first.done <- firstErr })
                        for {
                                select {
                                case wr, ok := <-writes:
                                        if ok {
-                                               err := wr.query(conn)
+                                               err := runQueryWithLabels(wr.query, wr.labels, conn)
                                                buf = append(buf, func() { wr.done <- err })
                                                continue
                                        }
@@ -419,12 +454,26 @@ type instance struct {
        p        *provider
 }
 
-func (p *provider) withConn(with withConn, write bool) error {
+func getLabels(skip int) pprof.LabelSet {
+       return pprof.Labels("f", func() string {
+               var pcs [8]uintptr
+               runtime.Callers(skip+3, pcs[:])
+               fs := runtime.CallersFrames(pcs[:])
+               f, _ := fs.Next()
+               funcName := f.Func.Name()
+               funcName = funcName[strings.LastIndexByte(funcName, '.')+1:]
+               //log.Printf("func name: %q", funcName)
+               return funcName
+       }())
+}
+
+func (p *provider) withConn(with withConn, write bool, skip int) error {
        if write && p.opts.BatchWrites {
                done := make(chan error)
                p.writes <- writeRequest{
-                       query: with,
-                       done:  done,
+                       query:  with,
+                       done:   done,
+                       labels: getLabels(skip + 1),
                }
                return <-done
        } else {
@@ -433,14 +482,14 @@ func (p *provider) withConn(with withConn, write bool) error {
                        return errors.New("couldn't get pool conn")
                }
                defer p.pool.Put(conn)
-               return with(conn)
+               return runQueryWithLabels(with, getLabels(skip+1), conn)
        }
 }
 
 type withConn func(conn) error
 
 func (i instance) withConn(with withConn, write bool) error {
-       return i.p.withConn(with, write)
+       return i.p.withConn(with, write, 1)
 }
 
 func (i instance) getConn() *sqlite.Conn {