"net/url"
"os"
"runtime"
+ "runtime/pprof"
+ "strings"
"sync"
"time"
return nil
}
-func initSchema(conn conn) error {
- return sqlitex.ExecScript(conn, `
--- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
--- can trim the database file size with partial vacuums without having to do a full vacuum, which
--- locks everything.
-pragma page_size=16384;
-pragma auto_vacuum=incremental;
-
-create table if not exists blob (
- name text,
- last_used timestamp default (datetime('now')),
- data blob,
- primary key (name)
-);
-
-create table if not exists blob_meta (
- key text primary key,
- value
-);
-
--- While sqlite *seems* to be faster to get sum(length(data)) instead of
--- sum(length(cast(data as blob))), it may still require a large table scan at start-up or with a
--- cold-cache. With this we can be assured that it doesn't.
-insert or ignore into blob_meta values ('size', 0);
-
-create table if not exists setting (
- name primary key on conflict replace,
- value
-);
-
-create view if not exists deletable_blob as
-with recursive excess (
- usage_with,
- last_used,
- blob_rowid,
- data_length
-) as (
- select *
- from (
- select
- (select value from blob_meta where key='size') as usage_with,
+// initSchema creates the cache tables, views and (optionally) the
+// size-tracking triggers. pageSize of 0 leaves the database page size
+// untouched; triggers can be disabled by callers that maintain blob_meta
+// themselves.
+func initSchema(conn conn, pageSize int, triggers bool) error {
+	if pageSize != 0 {
+		// NOTE(review): sqlitex.ExecScript runs the script inside a
+		// savepoint, and sqlite silently ignores `pragma page_size`
+		// inside a transaction — confirm this pragma actually takes
+		// effect here.
+		err := sqlitex.ExecScript(conn, fmt.Sprintf("pragma page_size=%d", pageSize))
+		if err != nil {
+			return err
+		}
+	}
+ err := sqlitex.ExecScript(conn, `
+ -- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
+ -- can trim the database file size with partial vacuums without having to do a full vacuum, which
+ -- locks everything.
+ pragma auto_vacuum=incremental;
+
+ create table if not exists blob (
+ name text,
+ last_used timestamp default (datetime('now')),
+ data blob,
+ primary key (name)
+ );
+
+ create table if not exists blob_meta (
+ key text primary key,
+ value
+ );
+
+ -- While sqlite *seems* to be faster to get sum(length(data)) instead of
+ -- sum(length(cast(data as blob))), it may still require a large table scan at start-up or with a
+ -- cold-cache. With this we can be assured that it doesn't.
+ insert or ignore into blob_meta values ('size', 0);
+
+ create table if not exists setting (
+ name primary key on conflict replace,
+ value
+ );
+
+ create view if not exists deletable_blob as
+ with recursive excess (
+ usage_with,
last_used,
- rowid,
- length(cast(data as blob))
- from blob order by last_used, rowid limit 1
- )
- where usage_with > (select value from setting where name='capacity')
- union all
- select
- usage_with-data_length as new_usage_with,
- blob.last_used,
- blob.rowid,
- length(cast(data as blob))
- from excess join blob
- on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
- where new_usage_with > (select value from setting where name='capacity')
-)
-select * from excess;
-
-create trigger if not exists after_insert_blob
-after insert on blob
-begin
- update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
- delete from blob where rowid in (select blob_rowid from deletable_blob);
-end;
-
-create trigger if not exists after_update_blob
-after update of data on blob
-begin
- update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
- delete from blob where rowid in (select blob_rowid from deletable_blob);
-end;
-
-create trigger if not exists after_delete_blob
-after delete on blob
-begin
- update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
-end;
-`)
+ blob_rowid,
+ data_length
+ ) as (
+ select *
+ from (
+ select
+ (select value from blob_meta where key='size') as usage_with,
+ last_used,
+ rowid,
+ length(cast(data as blob))
+ from blob order by last_used, rowid limit 1
+ )
+ where usage_with > (select value from setting where name='capacity')
+ union all
+ select
+ usage_with-data_length as new_usage_with,
+ blob.last_used,
+ blob.rowid,
+ length(cast(data as blob))
+ from excess join blob
+ on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
+ where new_usage_with > (select value from setting where name='capacity')
+ )
+ select * from excess;
+ `)
+ if err != nil {
+ return err
+ }
+ if triggers {
+ err := sqlitex.ExecScript(conn, `
+ create trigger if not exists after_insert_blob
+ after insert on blob
+ begin
+ update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
+ delete from blob where rowid in (select blob_rowid from deletable_blob);
+ end;
+
+ create trigger if not exists after_update_blob
+ after update of data on blob
+ begin
+ update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
+ delete from blob where rowid in (select blob_rowid from deletable_blob);
+ end;
+
+ create trigger if not exists after_delete_blob
+ after delete on blob
+ begin
+ update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
+ end;
+ `)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// NewPiecesStorageOpts aggregates the options for NewPiecesStorage: the pool
+// creation options, an optional hook to adjust the derived provider options,
+// and the resource-pieces behaviour to pass through to the storage layer.
+type NewPiecesStorageOpts struct {
+	NewPoolOpts
+	// ProvOpts, if non-nil, is called to mutate the ProviderOpts derived
+	// from NewPoolOpts before the provider is created.
+	ProvOpts func(*ProviderOpts)
+	storage.ResourcePiecesOpts
}
// A convenience function that creates a connection pool, resource provider, and a pieces storage
// ClientImpl and returns them all with a Close attached.
-func NewPiecesStorage(opts NewPoolOpts) (_ storage.ClientImplCloser, err error) {
- conns, provOpts, err := NewPool(opts)
+func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, err error) {
+ conns, provOpts, err := NewPool(opts.NewPoolOpts)
if err != nil {
return
}
+ if f := opts.ProvOpts; f != nil {
+ f(&provOpts)
+ }
prov, err := NewProvider(conns, provOpts)
if err != nil {
conns.Close()
return
}
- store := storage.NewResourcePiecesOpts(prov, storage.ResourcePiecesOpts{
- LeaveIncompleteChunks: true,
- AllowSizedPuts: true,
- })
+ store := storage.NewResourcePiecesOpts(prov, opts.ResourcePiecesOpts)
return struct {
storage.ClientImpl
io.Closer
	// Forces WAL, disables shared caching.
	ConcurrentBlobReads bool
	DontInitSchema      bool
+	// If non-zero, the sqlite page size to apply before the schema is
+	// created. Zero selects the default (1<<14 at schema init time).
+	PageSize int
	// If non-zero, overrides the existing setting.
	Capacity int64
}
conn := conns.Get(context.TODO())
defer conns.Put(conn)
	if !opts.DontInitSchema {
-		err = initSchema(conn)
+		if opts.PageSize == 0 {
+			// 16 KiB pages, matching the page_size the previous
+			// embedded schema script hard-coded.
+			opts.PageSize = 1 << 14
+		}
+		err = initSchema(conn, opts.PageSize, true)
if err != nil {
return
}
prefix,
)
return err
- }, false)
+ }, false, 0)
return
}
}
type writeRequest struct {
-	query withConn
-	done  chan<- error
+	query  withConn
+	done   chan<- error
+	// labels are applied via pprof.Do while the query runs, so profile
+	// samples taken inside the batching writer goroutine are attributed
+	// to the originating caller rather than to providerWriter.
+	labels pprof.LabelSet
}
var expvars = expvar.NewMap("sqliteStorage")
+// runQueryWithLabels executes query against conn with the given pprof labels
+// applied for the duration of the call, so CPU profiles attribute the work
+// to the originating storage operation.
+func runQueryWithLabels(query withConn, labels pprof.LabelSet, conn conn) (err error) {
+	pprof.Do(context.Background(), labels, func(context.Context) {
+		err = query(conn)
+	})
+	return
+}
+
// Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
// stronger typing on the writes channel.
func providerWriter(writes <-chan writeRequest, pool ConnPool) {
var cantFail error
func() {
defer sqlitex.Save(conn)(&cantFail)
- firstErr := first.query(conn)
+ firstErr := runQueryWithLabels(first.query, first.labels, conn)
buf = append(buf, func() { first.done <- firstErr })
for {
select {
case wr, ok := <-writes:
if ok {
- err := wr.query(conn)
+ err := runQueryWithLabels(wr.query, wr.labels, conn)
buf = append(buf, func() { wr.done <- err })
continue
}
p *provider
}
-func (p *provider) withConn(with withConn, write bool) error {
+// getLabels returns a pprof label set ("f" -> function name) identifying the
+// function `skip` frames above our caller, so storage queries show up in
+// profiles attributed to their public entry point.
+func getLabels(skip int) pprof.LabelSet {
+	return pprof.Labels("f", func() string {
+		var pcs [8]uintptr
+		// +3 additionally skips runtime.Callers, this closure, and
+		// getLabels itself.
+		runtime.Callers(skip+3, pcs[:])
+		fs := runtime.CallersFrames(pcs[:])
+		f, _ := fs.Next()
+		// Use Frame.Function rather than Frame.Func.Name(): Func is nil
+		// for inlined frames, which would yield an empty label.
+		funcName := f.Function
+		// Trim the package (and receiver) qualifier, keeping the bare name.
+		funcName = funcName[strings.LastIndexByte(funcName, '.')+1:]
+		return funcName
+	}())
+}
+
+// withConn runs `with` against a pool connection. skip counts the stack
+// frames between the public entry point and this call, and is used to build
+// the pprof label attached to the query. Writes are forwarded to the
+// batching writer goroutine when BatchWrites is enabled.
+func (p *provider) withConn(with withConn, write bool, skip int) error {
	if write && p.opts.BatchWrites {
		done := make(chan error)
		p.writes <- writeRequest{
-			query: with,
-			done:  done,
+			query:  with,
+			done:   done,
+			labels: getLabels(skip + 1),
		}
return <-done
} else {
return errors.New("couldn't get pool conn")
}
defer p.pool.Put(conn)
- return with(conn)
+ return runQueryWithLabels(with, getLabels(skip+1), conn)
}
}
type withConn func(conn) error
func (i instance) withConn(with withConn, write bool) error {
-	return i.p.withConn(with, write)
+	// skip=1 accounts for this wrapper frame when deriving the pprof label.
+	return i.p.withConn(with, write, 1)
}
func (i instance) getConn() *sqlite.Conn {