]> Sergey Matveev's repositories - btrtrc.git/commitdiff
sqlite storage: Add batched writes
authorMatt Joiner <anacrolix@gmail.com>
Fri, 30 Oct 2020 08:46:51 +0000 (19:46 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Fri, 30 Oct 2020 08:46:51 +0000 (19:46 +1100)
storage/sqlite/sqlite-storage.go
storage/sqlite/sqlite-storage_test.go

index 75520e7740220ac13f4db80c3c1120c2ccbfa4f8..4d0003533be956553ddf41ca2e37ce2372d497c0 100644 (file)
@@ -89,6 +89,7 @@ func NewPiecesStorage(opts NewPoolOpts) (_ storage.ClientImplCloser, err error)
        }
        prov, err := NewProvider(conns, provOpts)
        if err != nil {
+               conns.Close()
                return
        }
        store := storage.NewResourcePieces(prov)
@@ -97,7 +98,7 @@ func NewPiecesStorage(opts NewPoolOpts) (_ storage.ClientImplCloser, err error)
                io.Closer
        }{
                store,
-               conns,
+               prov,
        }, nil
 }
 
@@ -116,6 +117,7 @@ type ProviderOpts struct {
        NumConns int
        // Concurrent blob reads require WAL.
        ConcurrentBlobRead bool
+       BatchWrites        bool
 }
 
 func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
@@ -133,7 +135,7 @@ func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
        conns, err := func() (ConnPool, error) {
                switch opts.NumConns {
                case 1:
-                       conn, err := sqlite.OpenConn(fmt.Sprintf("file:%s", opts.Path), 0)
+                       conn, err := sqlite.OpenConn(path, 0)
                        return &poolFromConn{conn: conn}, err
                default:
                        return sqlitex.Open(path, 0, opts.NumConns)
@@ -145,6 +147,7 @@ func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
        return conns, ProviderOpts{
                NumConns:           opts.NumConns,
                ConcurrentBlobRead: opts.ConcurrentBlobReads,
+               BatchWrites:        true,
        }, nil
 }
 
@@ -170,16 +173,23 @@ func (me *poolFromConn) Close() error {
        return me.conn.Close()
 }
 
-// Needs the ConnPool size so it can initialize all the connections with pragmas.
+// Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of
+// the ConnPool (since it has to initialize all the connections anyway).
 func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
-       _, err = initPoolConns(context.TODO(), pool, opts.NumConns, opts.ConcurrentBlobRead)
+       _, err = initPoolConns(context.TODO(), pool, opts.NumConns, true)
        if err != nil {
                return
        }
        conn := pool.Get(context.TODO())
        defer pool.Put(conn)
        err = initSchema(conn)
-       return &provider{pool: pool}, err
+       if err != nil {
+               return
+       }
+       writes := make(chan writeRequest)
+       prov := &provider{pool: pool, writes: writes, opts: opts}
+       go prov.writer(writes)
+       return prov, nil
 }
 
 func initPoolConns(ctx context.Context, pool ConnPool, numConn int, wal bool) (numInited int, err error) {
@@ -212,7 +222,57 @@ type ConnPool interface {
 }
 
 type provider struct {
-       pool ConnPool
+       pool   ConnPool
+       writes chan<- writeRequest
+       opts   ProviderOpts
+}
+
+func (me *provider) Close() error {
+       close(me.writes)
+       return me.pool.Close()
+}
+
+type writeRequest struct {
+       query func(*sqlite.Conn)
+       done  chan<- struct{}
+}
+
+func (me *provider) writer(writes <-chan writeRequest) {
+       for {
+               first, ok := <-writes
+               if !ok {
+                       return
+               }
+               buf := []writeRequest{first}
+       buffer:
+               for {
+                       select {
+                       case wr, ok := <-writes:
+                               if !ok {
+                                       break buffer
+                               }
+                               buf = append(buf, wr)
+                       default:
+                               break buffer
+                       }
+               }
+               var cantFail error
+               func() {
+                       conn := me.pool.Get(context.TODO())
+                       defer me.pool.Put(conn)
+                       defer sqlitex.Save(conn)(&cantFail)
+                       for _, wr := range buf {
+                               wr.query(conn)
+                       }
+               }()
+               if cantFail != nil {
+                       panic(cantFail)
+               }
+               for _, wr := range buf {
+                       close(wr.done)
+               }
+               log.Printf("batched %v write queries", len(buf))
+       }
 }
 
 func (p *provider) NewInstance(s string) (resource.Instance, error) {
@@ -224,17 +284,19 @@ type instance struct {
        p        *provider
 }
 
-func (i instance) withConn(with func(conn conn)) {
-       conn := i.p.pool.Get(context.TODO())
-       //err := sqlitex.Exec(conn, "pragma synchronous", func(stmt *sqlite.Stmt) error {
-       //      log.Print(stmt.ColumnText(0))
-       //      return nil
-       //})
-       //if err != nil {
-       //      log.Print(err)
-       //}
-       defer i.p.pool.Put(conn)
-       with(conn)
+func (i instance) withConn(with func(conn conn), write bool) {
+       if write && i.p.opts.BatchWrites {
+               done := make(chan struct{})
+               i.p.writes <- writeRequest{
+                       query: with,
+                       done:  done,
+               }
+               <-done
+       } else {
+               conn := i.p.pool.Get(context.TODO())
+               defer i.p.pool.Put(conn)
+               with(conn)
+       }
 }
 
 func (i instance) getConn() *sqlite.Conn {
@@ -252,7 +314,7 @@ func (i instance) Readdirnames() (names []string, err error) {
                        names = append(names, stmt.ColumnText(0)[len(prefix):])
                        return nil
                }, prefix+"%")
-       })
+       }, false)
        //log.Printf("readdir %q gave %q", i.location, names)
        return
 }
@@ -340,7 +402,7 @@ func (i instance) Put(reader io.Reader) (err error) {
                        }
                        break
                }
-       })
+       }, true)
        return
 }
 
@@ -381,7 +443,7 @@ func (i instance) Stat() (ret os.FileInfo, err error) {
                }
                defer blob.Close()
                ret = fileInfo{blob.Size()}
-       })
+       }, false)
        return
 }
 
@@ -429,7 +491,7 @@ func (i instance) ReadAt(p []byte, off int64) (n int, err error) {
                                err = io.EOF
                        }
                }
-       })
+       }, false)
        return
 }
 
@@ -440,6 +502,6 @@ func (i instance) WriteAt(bytes []byte, i2 int64) (int, error) {
 func (i instance) Delete() (err error) {
        i.withConn(func(conn conn) {
                err = sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)
-       })
+       }, true)
        return
 }
index 2a41373989ed10268eed3a1ff07dfd2fca9d3f5d..bb71d7febdf0a9161993e065087490f462180a7c 100644 (file)
@@ -17,9 +17,11 @@ func newConnsAndProv(t *testing.T, opts NewPoolOpts) (ConnPool, *provider) {
        opts.Path = filepath.Join(t.TempDir(), "sqlite3.db")
        conns, provOpts, err := NewPool(opts)
        require.NoError(t, err)
-       t.Cleanup(func() { conns.Close() })
+       // sqlitex.Pool.Close doesn't like being called more than once. Let it slide for now.
+       //t.Cleanup(func() { conns.Close() })
        prov, err := NewProvider(conns, provOpts)
        require.NoError(t, err)
+       t.Cleanup(func() { prov.Close() })
        return conns, prov
 }