]> Sergey Matveev's repositories - btrtrc.git/commitdiff
sqlite storage: Add NewProviderPool
authorMatt Joiner <anacrolix@gmail.com>
Thu, 22 Oct 2020 22:03:44 +0000 (09:03 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 22 Oct 2020 22:03:44 +0000 (09:03 +1100)
storage/sqlite/sqlite-storage.go
storage/sqlite/sqlite-storage_test.go [new file with mode: 0644]

index 0ec59070c1bb4e0b80ba2a759dda0a75840dfe35..e4eba4beb7a2b01107a5d1dbac03b52c917160d2 100644 (file)
@@ -2,6 +2,7 @@ package sqliteStorage
 
 import (
        "bytes"
+       "context"
        "errors"
        "fmt"
        "io"
@@ -27,14 +28,43 @@ create table if not exists blob(
 `)
 }
 
+// Emulates a pool from a single Conn.
+type poolFromConn struct {
+       mu   sync.Mutex
+       conn conn
+}
+
+func (me *poolFromConn) Get(ctx context.Context) conn {
+       me.mu.Lock()
+       return me.conn
+}
+
+func (me *poolFromConn) Put(conn conn) {
+       if conn != me.conn {
+               panic("expected to same conn")
+       }
+       me.mu.Unlock()
+}
+
 func NewProvider(conn *sqlite.Conn) (*provider, error) {
        err := initConn(conn)
-       return &provider{conn: conn}, err
+       return &provider{&poolFromConn{conn: conn}}, err
+}
+
+func NewProviderPool(pool *sqlitex.Pool) (*provider, error) {
+       conn := pool.Get(context.TODO())
+       defer pool.Put(conn)
+       err := initConn(conn)
+       return &provider{pool: pool}, err
+}
+
+type pool interface {
+       Get(context.Context) conn
+       Put(conn)
 }
 
 type provider struct {
-       mu   sync.Mutex
-       conn conn
+       pool pool
 }
 
 func (p *provider) NewInstance(s string) (resource.Instance, error) {
@@ -47,17 +77,17 @@ type instance struct {
 }
 
 func (i instance) withConn(with func(conn conn)) {
-       i.lockConn()
-       defer i.unlockConn()
-       with(i.p.conn)
+       conn := i.p.pool.Get(context.TODO())
+       defer i.p.pool.Put(conn)
+       with(conn)
 }
 
-func (i instance) lockConn() {
-       i.p.mu.Lock()
+func (i instance) getConn() *sqlite.Conn {
+       return i.p.pool.Get(context.TODO())
 }
 
-func (i instance) unlockConn() {
-       i.p.mu.Unlock()
+func (i instance) putConn(conn *sqlite.Conn) {
+       i.p.pool.Put(conn)
 }
 
 func (i instance) Readdirnames() (names []string, err error) {
@@ -104,15 +134,15 @@ func (me connBlob) Close() error {
 }
 
 func (i instance) Get() (ret io.ReadCloser, err error) {
-       i.lockConn()
-       blob, err := i.openBlob(i.p.conn, false, true)
+       conn := i.getConn()
+       blob, err := i.openBlob(conn, false, true)
        if err != nil {
-               i.unlockConn()
+               i.putConn(conn)
                return
        }
        var once sync.Once
        return connBlob{blob, func() {
-               once.Do(i.unlockConn)
+               once.Do(func() { i.putConn(conn) })
        }}, nil
 }
 
@@ -121,6 +151,8 @@ func (i instance) openBlob(conn conn, write, updateAccess bool) (*sqlite.Blob, e
        if err != nil {
                return nil, err
        }
+       // This seems to cause locking issues with in-memory databases. Is it something to do with not
+       // having WAL?
        if updateAccess {
                err = sqlitex.Exec(conn, "update blob set last_used=datetime('now') where rowid=?", nil, rowid)
                if err != nil {
diff --git a/storage/sqlite/sqlite-storage_test.go b/storage/sqlite/sqlite-storage_test.go
new file mode 100644 (file)
index 0000000..c18c4c3
--- /dev/null
@@ -0,0 +1,51 @@
+package sqliteStorage
+
+import (
+       "bytes"
+       "fmt"
+       "io"
+       "io/ioutil"
+       "path/filepath"
+       "sync"
+       "testing"
+
+       "crawshaw.io/sqlite/sqlitex"
+       _ "github.com/anacrolix/envpprof"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func TestSimultaneousIncrementalBlob(t *testing.T) {
+       pool, err := sqlitex.Open(
+               // We don't do this in memory, because it seems to have some locking issues with updating
+               // last_used.
+               fmt.Sprintf("file:%s", filepath.Join(t.TempDir(), "sqlite3.db")),
+               0,
+               10)
+       require.NoError(t, err)
+       defer pool.Close()
+       p, err := NewProviderPool(pool)
+       require.NoError(t, err)
+       a, err := p.NewInstance("a")
+       require.NoError(t, err)
+       const contents = "hello, world"
+       require.NoError(t, a.Put(bytes.NewReader([]byte("hello, world"))))
+       rc1, err := a.Get()
+       require.NoError(t, err)
+       rc2, err := a.Get()
+       require.NoError(t, err)
+       var b1, b2 []byte
+       var e1, e2 error
+       var wg sync.WaitGroup
+       doRead := func(b *[]byte, e *error, rc io.ReadCloser, n int) {
+               defer wg.Done()
+               defer rc.Close()
+               *b, *e = ioutil.ReadAll(rc)
+               require.NoError(t, *e, n)
+               assert.EqualValues(t, contents, *b)
+       }
+       wg.Add(2)
+       go doRead(&b2, &e2, rc2, 2)
+       go doRead(&b1, &e1, rc1, 1)
+       wg.Wait()
+}