"errors"
"fmt"
"io"
+ "log"
"os"
"sync"
"time"
type conn = *sqlite.Conn
-func initConn(conn conn) error {
- return sqlitex.ExecTransient(conn, `pragma synchronous=off`, nil)
+func initConn(conn conn, wal bool) error {
+ err := sqlitex.ExecTransient(conn, `pragma synchronous=off`, nil)
+ if err != nil {
+ return err
+ }
+ if !wal {
+ err = sqlitex.ExecTransient(conn, `pragma journal_mode=off`, nil)
+ if err != nil {
+ return err
+ }
+ }
+ err = sqlitex.ExecTransient(conn, `pragma mmap_size=1000000000`, nil)
+ if err != nil {
+ return err
+ }
+ return nil
}
func initSchema(conn conn) error {
}
func NewProvider(conn *sqlite.Conn) (_ *provider, err error) {
- err = initConn(conn)
+ err = initConn(conn, false)
if err != nil {
return
}
return &provider{&poolFromConn{conn: conn}}, err
}
-func NewProviderPool(pool *sqlitex.Pool, numConns int) (_ *provider, err error) {
- _, err = initPoolConns(context.TODO(), pool, numConns)
+// Needs the pool size so it can initialize all the connections with pragmas.
+func NewProviderPool(pool *sqlitex.Pool, numConns int, wal bool) (_ *provider, err error) {
+ _, err = initPoolConns(context.TODO(), pool, numConns, wal)
if err != nil {
return
}
return &provider{pool: pool}, err
}
-func initPoolConns(ctx context.Context, pool *sqlitex.Pool, numConn int) (numInited int, err error) {
+func initPoolConns(ctx context.Context, pool *sqlitex.Pool, numConn int, wal bool) (numInited int, err error) {
var conns []conn
defer func() {
for _, c := range conns {
break
}
conns = append(conns, conn)
- err = initConn(conn)
+ err = initConn(conn, wal)
if err != nil {
err = fmt.Errorf("initing conn %v: %w", len(conns), err)
return
func (i instance) withConn(with func(conn conn)) {
conn := i.p.pool.Get(context.TODO())
+ //err := sqlitex.Exec(conn, "pragma synchronous", func(stmt *sqlite.Stmt) error {
+ // log.Print(stmt.ColumnText(0))
+ // return nil
+ //})
+ //if err != nil {
+ // log.Print(err)
+ //}
defer i.p.pool.Put(conn)
with(conn)
}
return err
}
i.withConn(func(conn conn) {
- err = sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, ?)", nil, i.location, buf.Bytes())
+ for range iter.N(10) {
+ err = sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, ?)", nil, i.location, buf.Bytes())
+ if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
+ log.Print("sqlite busy")
+ time.Sleep(time.Second)
+ continue
+ }
+ break
+ }
})
return
}
"sync"
"testing"
+ "crawshaw.io/sqlite"
"crawshaw.io/sqlite/sqlitex"
_ "github.com/anacrolix/envpprof"
"github.com/stretchr/testify/assert"
)
func TestSimultaneousIncrementalBlob(t *testing.T) {
+ const poolSize = 10
pool, err := sqlitex.Open(
// We don't do this in memory, because it seems to have some locking issues with updating
// last_used.
fmt.Sprintf("file:%s", filepath.Join(t.TempDir(), "sqlite3.db")),
- 0,
- 10)
+ // We can't disable WAL in this test because then we can't open 2 blobs simultaneously for read.
+ sqlite.OpenFlagsDefault, /* &^sqlite.SQLITE_OPEN_WAL */
+ poolSize)
require.NoError(t, err)
defer pool.Close()
- p, err := NewProviderPool(pool, 10)
+ p, err := NewProviderPool(pool, poolSize, true)
require.NoError(t, err)
a, err := p.NewInstance("a")
require.NoError(t, err)
"time"
"crawshaw.io/sqlite"
+ "crawshaw.io/sqlite/sqlitex"
"github.com/anacrolix/missinggo/v2/filecache"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/internal/testutil"
return func(dataDir string) storage.ClientImplCloser {
path := connPathMaker(dataDir)
log.Printf("opening sqlite db at %q", path)
- conn, err := sqlite.OpenConn(path, 0)
- if err != nil {
- panic(err)
- }
- prov, err := sqliteStorage.NewProvider(conn)
- if err != nil {
- panic(err)
- }
- return struct {
- storage.ClientImpl
- io.Closer
- }{
- storage.NewResourcePieces(prov),
- conn,
+ if true {
+ conn, err := sqlite.OpenConn(path, 0)
+ if err != nil {
+ panic(err)
+ }
+ prov, err := sqliteStorage.NewProvider(conn)
+ if err != nil {
+ panic(err)
+ }
+ return struct {
+ storage.ClientImpl
+ io.Closer
+ }{
+ storage.NewResourcePieces(prov),
+ conn,
+ }
+ } else {
+ // Test pool implementation for SQLITE_BUSY when we want SQLITE_LOCKED (so the
+ // crawshaw.io/sqlite unlock notify handler kicks in for us).
+ const poolSize = 1
+ pool, err := sqlitex.Open(path, 0, poolSize)
+ if err != nil {
+ panic(err)
+ }
+ prov, err := sqliteStorage.NewProviderPool(pool, poolSize, false)
+ if err != nil {
+ panic(err)
+ }
+ return struct {
+ storage.ClientImpl
+ io.Closer
+ }{
+ storage.NewResourcePieces(prov),
+ pool,
+ }
}
-
}
}
return "file:" + filepath.Join(dataDir, "sqlite.db")
})},
{"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) string {
- return "file:memory:?mode=memory"
+ return "file:memory:?mode=memory&cache=shared"
})},
} {
t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {