}
prov, err := NewProvider(conns, provOpts)
if err != nil {
+ conns.Close()
return
}
store := storage.NewResourcePieces(prov)
io.Closer
}{
store,
- conns,
+ prov,
}, nil
}
NumConns int
// Concurrent blob reads require WAL.
ConcurrentBlobRead bool
+ BatchWrites bool
}
func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
conns, err := func() (ConnPool, error) {
switch opts.NumConns {
case 1:
- conn, err := sqlite.OpenConn(fmt.Sprintf("file:%s", opts.Path), 0)
+ conn, err := sqlite.OpenConn(path, 0)
return &poolFromConn{conn: conn}, err
default:
return sqlitex.Open(path, 0, opts.NumConns)
return conns, ProviderOpts{
NumConns: opts.NumConns,
ConcurrentBlobRead: opts.ConcurrentBlobReads,
+ BatchWrites: true,
}, nil
}
return me.conn.Close()
}
-// Needs the ConnPool size so it can initialize all the connections with pragmas.
+// Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of
+// the ConnPool (since it has to initialize all the connections anyway).
func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
- _, err = initPoolConns(context.TODO(), pool, opts.NumConns, opts.ConcurrentBlobRead)
+ _, err = initPoolConns(context.TODO(), pool, opts.NumConns, true)
if err != nil {
return
}
conn := pool.Get(context.TODO())
defer pool.Put(conn)
err = initSchema(conn)
- return &provider{pool: pool}, err
+ if err != nil {
+ return
+ }
+ writes := make(chan writeRequest)
+ prov := &provider{pool: pool, writes: writes, opts: opts}
+ go prov.writer(writes)
+ return prov, nil
}
func initPoolConns(ctx context.Context, pool ConnPool, numConn int, wal bool) (numInited int, err error) {
}
type provider struct {
- pool ConnPool
+ pool ConnPool
+ writes chan<- writeRequest
+ opts ProviderOpts
+}
+
+func (me *provider) Close() error {
+ close(me.writes)
+ return me.pool.Close()
+}
+
+type writeRequest struct {
+ query func(*sqlite.Conn)
+ done chan<- struct{}
+}
+
+func (me *provider) writer(writes <-chan writeRequest) {
+ for {
+ first, ok := <-writes
+ if !ok {
+ return
+ }
+ buf := []writeRequest{first}
+ buffer:
+ for {
+ select {
+ case wr, ok := <-writes:
+ if !ok {
+ break buffer
+ }
+ buf = append(buf, wr)
+ default:
+ break buffer
+ }
+ }
+ var cantFail error
+ func() {
+ conn := me.pool.Get(context.TODO())
+ defer me.pool.Put(conn)
+ defer sqlitex.Save(conn)(&cantFail)
+ for _, wr := range buf {
+ wr.query(conn)
+ }
+ }()
+ if cantFail != nil {
+ panic(cantFail)
+ }
+ for _, wr := range buf {
+ close(wr.done)
+ }
+ log.Printf("batched %v write queries", len(buf))
+ }
}
func (p *provider) NewInstance(s string) (resource.Instance, error) {
p *provider
}
-func (i instance) withConn(with func(conn conn)) {
- conn := i.p.pool.Get(context.TODO())
- //err := sqlitex.Exec(conn, "pragma synchronous", func(stmt *sqlite.Stmt) error {
- // log.Print(stmt.ColumnText(0))
- // return nil
- //})
- //if err != nil {
- // log.Print(err)
- //}
- defer i.p.pool.Put(conn)
- with(conn)
+func (i instance) withConn(with func(conn conn), write bool) {
+ if write && i.p.opts.BatchWrites {
+ done := make(chan struct{})
+ i.p.writes <- writeRequest{
+ query: with,
+ done: done,
+ }
+ <-done
+ } else {
+ conn := i.p.pool.Get(context.TODO())
+ defer i.p.pool.Put(conn)
+ with(conn)
+ }
}
func (i instance) getConn() *sqlite.Conn {
names = append(names, stmt.ColumnText(0)[len(prefix):])
return nil
}, prefix+"%")
- })
+ }, false)
//log.Printf("readdir %q gave %q", i.location, names)
return
}
}
break
}
- })
+ }, true)
return
}
}
defer blob.Close()
ret = fileInfo{blob.Size()}
- })
+ }, false)
return
}
err = io.EOF
}
}
- })
+ }, false)
return
}
func (i instance) Delete() (err error) {
i.withConn(func(conn conn) {
err = sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)
- })
+ }, true)
return
}