import (
"bytes"
+ "context"
"errors"
"fmt"
"io"
)
}
+// Emulates a pool from a single Conn.
+type poolFromConn struct {
+ mu sync.Mutex
+ conn conn
+}
+
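+// Get returns the wrapped connection, holding the mutex until Put hands it back.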
+func (me *poolFromConn) Get(ctx context.Context) conn {
+ me.mu.Lock()
+ return me.conn
+}
+
+func (me *poolFromConn) Put(conn conn) {
+ if conn != me.conn {
+ panic("expected to same conn")
+ }
+ me.mu.Unlock()
+}
+
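+// NewProvider wraps a single Conn in a provider, serializing access to it via poolFromConn.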
func NewProvider(conn *sqlite.Conn) (*provider, error) {
err := initConn(conn)
- return &provider{conn: conn}, err
+ return &provider{&poolFromConn{conn: conn}}, err
+}
+
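+// NewProviderPool returns a provider backed by the given connection pool, initializing one
+// connection from it first.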
+func NewProviderPool(pool *sqlitex.Pool) (*provider, error) {
+ conn := pool.Get(context.TODO())
+ defer pool.Put(conn)
+ err := initConn(conn)
+ return &provider{pool: pool}, err
+}
+
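+// pool is the connection-pool behaviour the provider relies on; both *sqlitex.Pool and
+// poolFromConn provide it.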
+type pool interface {
+ Get(context.Context) conn
+ Put(conn)
}
type provider struct {
- mu sync.Mutex
- conn conn
+ pool pool
}
func (p *provider) NewInstance(s string) (resource.Instance, error) {
}
func (i instance) withConn(with func(conn conn)) {
- i.lockConn()
- defer i.unlockConn()
- with(i.p.conn)
+ conn := i.p.pool.Get(context.TODO())
+ defer i.p.pool.Put(conn)
+ with(conn)
}
-func (i instance) lockConn() {
- i.p.mu.Lock()
+func (i instance) getConn() *sqlite.Conn {
+ return i.p.pool.Get(context.TODO())
}
-func (i instance) unlockConn() {
- i.p.mu.Unlock()
+func (i instance) putConn(conn *sqlite.Conn) {
+ i.p.pool.Put(conn)
}
func (i instance) Readdirnames() (names []string, err error) {
}
func (i instance) Get() (ret io.ReadCloser, err error) {
- i.lockConn()
- blob, err := i.openBlob(i.p.conn, false, true)
+ conn := i.getConn()
+ blob, err := i.openBlob(conn, false, true)
if err != nil {
- i.unlockConn()
+ i.putConn(conn)
return
}
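+ // Return the connection to the pool only once, even if the connBlob is closed more than once.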
var once sync.Once
return connBlob{blob, func() {
- once.Do(i.unlockConn)
+ once.Do(func() { i.putConn(conn) })
}}, nil
}
if err != nil {
return nil, err
}
+ // This seems to cause locking issues with in-memory databases. Is it something to do with not
+ // having WAL?
if updateAccess {
err = sqlitex.Exec(conn, "update blob set last_used=datetime('now') where rowid=?", nil, rowid)
if err != nil {
--- /dev/null
+package sqliteStorage
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "path/filepath"
+ "sync"
+ "testing"
+
+ "crawshaw.io/sqlite/sqlitex"
+ _ "github.com/anacrolix/envpprof"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
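+// TestSimultaneousIncrementalBlob checks that two readers obtained from the same instance can
+// read the stored blob concurrently, each on its own connection from the pool.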
+func TestSimultaneousIncrementalBlob(t *testing.T) {
+ pool, err := sqlitex.Open(
+ // We don't use an in-memory database, because it seems to have some locking issues with
+ // updating last_used.
+ fmt.Sprintf("file:%s", filepath.Join(t.TempDir(), "sqlite3.db")),
+ 0,
+ 10)
+ require.NoError(t, err)
+ defer pool.Close()
+ p, err := NewProviderPool(pool)
+ require.NoError(t, err)
+ a, err := p.NewInstance("a")
+ require.NoError(t, err)
+ const contents = "hello, world"
+ require.NoError(t, a.Put(bytes.NewReader([]byte(contents))))
+ rc1, err := a.Get()
+ require.NoError(t, err)
+ rc2, err := a.Get()
+ require.NoError(t, err)
+ var b1, b2 []byte
+ var e1, e2 error
+ var wg sync.WaitGroup
+ doRead := func(b *[]byte, e *error, rc io.ReadCloser, n int) {
+ defer wg.Done()
+ defer rc.Close()
+ *b, *e = ioutil.ReadAll(rc)
+ // require must not be used off the test goroutine; report and bail out instead.
+ if !assert.NoError(t, *e, n) {
+ return
+ }
+ assert.EqualValues(t, contents, *b)
+ }
+ wg.Add(2)
+ go doRead(&b2, &e2, rc2, 2)
+ go doRead(&b1, &e1, rc1, 1)
+ wg.Wait()
+}