type conn = *sqlite.Conn
-func initConn(conn conn, opts ProviderOpts) error {
+// InitConnOpts are per-connection pragma settings applied by initConn.
+type InitConnOpts struct {
+ SetJournalMode string
+ MmapSizeOk bool // If false, a package-specific default will be used.
+ MmapSize int64 // If MmapSizeOk is set, use sqlite default if < 0, otherwise this value.
+}
+
+// JournalMode returns the journal mode to apply, defaulting to "wal" when
+// SetJournalMode is empty.
+func (me InitConnOpts) JournalMode() string {
+ if me.SetJournalMode != "" {
+ return me.SetJournalMode
+ }
+ return "wal"
+}
+
+// initConn applies the InitConnOpts pragmas (recursive triggers, journal mode,
+// mmap size) to a single connection.
+func initConn(conn conn, opts InitConnOpts) error {
// Recursive triggers are required because we need to trim the blob_meta size after trimming to
// capacity. Hopefully we don't hit the recursion limit, and if we do, there's an error thrown.
err := sqlitex.ExecTransient(conn, "pragma recursive_triggers=on", nil)
if err != nil {
return err
}
- if opts.NoConcurrentBlobReads {
- err = sqlitex.ExecTransient(conn, `pragma journal_mode=off`, nil)
+ if opts.SetJournalMode != "" {
+ // Apply the requested journal mode; sqlite echoes the resulting mode, and a
+ // mismatch panics since an unexpected mode would silently change concurrency
+ // behavior. NOTE(review): sqlite reports the mode lowercased, so a mixed-case
+ // SetJournalMode would panic here — confirm callers pass lowercase.
+ err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma journal_mode=%s`, opts.SetJournalMode), func(stmt *sqlite.Stmt) error {
+ ret := stmt.ColumnText(0)
+ if ret != opts.SetJournalMode {
+ panic(ret)
+ }
+ return nil
+ })
if err != nil {
return err
}
}
- if opts.MmapSizeOk {
+ if !opts.MmapSizeOk {
+ // Caller didn't choose: fall back to the package default.
+ opts.MmapSize = 1 << 24 // 16 MiB (NOTE(review): comment said "8 MiB" but 1<<24 is 16 MiB; the old default was 1<<23 — confirm intended size)
+ }
+ if opts.MmapSize >= 0 {
err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma mmap_size=%d`, opts.MmapSize), nil)
if err != nil {
return err
type NewPiecesStorageOpts struct {
NewPoolOpts
+ // One-time database initialization options (schema, page size, capacity).
+ InitDbOpts
ProvOpts func(*ProviderOpts)
StorageOpts func(*storage.ResourcePiecesOpts)
}
// A convenience function that creates a connection pool, resource provider, and a pieces storage
// ClientImpl and returns them all with a Close attached.
func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, err error) {
- conns, provOpts, err := NewPool(opts.NewPoolOpts)
+ conns, err := NewPool(opts.NewPoolOpts)
+ if err != nil {
+ return
+ }
+ err = initPoolDatabase(conns, opts.InitDbOpts)
if err != nil {
return
}
+ // Batch writes by default whenever more than one connection is available.
+ provOpts := ProviderOpts{
+ BatchWrites: conns.NumConns() > 1,
+ }
if f := opts.ProvOpts; f != nil {
f(&provOpts)
}
conns.Close()
return
}
+ // Probe the journal mode actually in effect; it determines the storage options below.
+ var (
+ journalMode string
+ )
+ withPoolConn(conns, func(c conn) {
+ err = sqlitex.Exec(c, "pragma journal_mode", func(stmt *sqlite.Stmt) error {
+ journalMode = stmt.ColumnText(0)
+ return nil
+ })
+ })
+ if err != nil {
+ err = fmt.Errorf("getting journal mode: %w", err)
+ prov.Close()
+ return
+ }
+ if journalMode == "" {
+ err = errors.New("didn't get journal mode")
+ prov.Close()
+ return
+ }
storageOpts := storage.ResourcePiecesOpts{
- NoSizedPuts: provOpts.NoConcurrentBlobReads,
+ NoSizedPuts: journalMode != "wal" || conns.NumConns() == 1, // Sized puts need concurrent blob reads: WAL and >1 conn.
}
if f := opts.StorageOpts; f != nil {
f(&storageOpts)
}
type NewPoolOpts struct {
- // See https://www.sqlite.org/c3ref/open.html. NB: "If the filename is an empty string, then a
- // private, temporary on-disk database will be created. This private database will be
- // automatically deleted as soon as the database connection is closed."
- Path string
- Memory bool
+ NewConnOpts
+ InitConnOpts
NumConns int
- // Forces WAL, disables shared caching.
- NoConcurrentBlobReads bool
- DontInitSchema bool
- PageSize int
+}
+
+// InitDbOpts are one-time database initialization options (schema, page size, capacity).
+type InitDbOpts struct {
+ DontInitSchema bool
+ PageSize int
// If non-zero, overrides the existing setting.
Capacity int64
}
// There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
// the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the
// top-level option type.
-type ProviderOpts struct {
- NumConns int
- // Concurrent blob reads require WAL.
- NoConcurrentBlobReads bool
- BatchWrites bool
- MmapSize int64
- MmapSizeOk bool
+// PoolConf describes a pool's connection count and journal mode.
+// NOTE(review): PoolConf isn't referenced anywhere in this diff — confirm it's used elsewhere.
+type PoolConf struct {
+ NumConns int
+ JournalMode string
+}
// Remove any capacity limits.
return sqlitex.Exec(conn, "insert into setting values ('capacity', ?)", nil, cap)
}
-func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
- if opts.NumConns == 0 {
- opts.NumConns = runtime.NumCPU()
- }
+type NewConnOpts struct {
+ // See https://www.sqlite.org/c3ref/open.html. NB: "If the filename is an empty string, then a
+ // private, temporary on-disk database will be created. This private database will be
+ // automatically deleted as soon as the database connection is closed."
+ Path string
+ Memory bool
+ // Whether multiple blobs will not be read simultaneously. Enables journal mode other than WAL,
+ // and NumConns < 2.
+ NoConcurrentBlobReads bool
+}
+
+// newOpenUri builds the sqlite "file:" open URI for the given connection options.
+func newOpenUri(opts NewConnOpts) string {
path := url.PathEscape(opts.Path)
if opts.Memory {
path = ":memory:"
if opts.NoConcurrentBlobReads || opts.Memory {
values.Add("cache", "shared")
}
- uri := fmt.Sprintf("file:%s?%s", path, values.Encode())
- conns, err := func() (ConnPool, error) {
- switch opts.NumConns {
- case 1:
- conn, err := sqlite.OpenConn(uri, 0)
- return &poolFromConn{conn: conn}, err
- default:
- return sqlitex.Open(uri, 0, opts.NumConns)
- }
- }()
- if err != nil {
- return
- }
- defer func() {
- if err != nil {
- conns.Close()
- }
- }()
- conn := conns.Get(context.TODO())
- defer conns.Put(conn)
+ return fmt.Sprintf("file:%s?%s", path, values.Encode())
+}
+
+// initDatabase performs one-time database setup (schema init etc.) on a single conn.
+func initDatabase(conn conn, opts InitDbOpts) (err error) {
if !opts.DontInitSchema {
if opts.PageSize == 0 {
opts.PageSize = 1 << 14
return
}
}
- return conns, ProviderOpts{
- NumConns: opts.NumConns,
- NoConcurrentBlobReads: opts.NoConcurrentBlobReads || opts.Memory || opts.NumConns == 1,
- BatchWrites: opts.NumConns > 1,
- MmapSize: 1 << 23, // 8 MiB
- MmapSizeOk: true,
- }, nil
+ return
+}
+
+// initPoolDatabase runs initDatabase on one connection borrowed from pool.
+func initPoolDatabase(pool ConnPool, opts InitDbOpts) (err error) {
+ withPoolConn(pool, func(c conn) {
+ err = initDatabase(c, opts)
+ })
+ return
+}
+
+// newConn opens a single sqlite connection for the given options.
+func newConn(opts NewConnOpts) (conn, error) {
+ return sqlite.OpenConn(newOpenUri(opts), 0)
+}
+
+// poolWithNumConns decorates sqlitex.Pool with the NumConns method needed by ConnPool.
+type poolWithNumConns struct {
+ *sqlitex.Pool
+ numConns int
+}
+
+func (me poolWithNumConns) NumConns() int {
+ return me.numConns
+}
+
+// NewPool creates a ConnPool of opts.NumConns connections (defaulting to the
+// CPU count) and applies the per-connection init pragmas to each of them.
+func NewPool(opts NewPoolOpts) (_ ConnPool, err error) {
+ if opts.NumConns == 0 {
+ opts.NumConns = runtime.NumCPU()
+ }
+ conns, err := func() (ConnPool, error) {
+ switch opts.NumConns {
+ case 1:
+ // Emulate a pool from a single conn; see poolFromConn.
+ conn, err := newConn(opts.NewConnOpts)
+ return &poolFromConn{conn: conn}, err
+ default:
+ _pool, err := sqlitex.Open(newOpenUri(opts.NewConnOpts), 0, opts.NumConns)
+ return poolWithNumConns{_pool, opts.NumConns}, err
+ }
+ }()
+ if err != nil {
+ return
+ }
+ // Release the pool if connection init below fails.
+ defer func() {
+ if err != nil {
+ conns.Close()
+ }
+ }()
+ // NOTE(review): a nil Context is passed to initPoolConns — confirm every ConnPool
+ // implementation's Get tolerates nil (crawshaw.io/sqlite's Pool.Get documents it).
+ return conns, initPoolConns(nil, conns, InitPoolOpts{
+ NumConns: opts.NumConns,
+ InitConnOpts: opts.InitConnOpts,
+ })
}
// Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool.
return me.conn.Close()
}
+// NumConns implements ConnPool for the single-connection emulation.
+func (poolFromConn) NumConns() int { return 1 }
+
+// ProviderOpts configures provider write behavior.
+type ProviderOpts struct {
+ BatchWrites bool
+}
+
// Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of
// the ConnPool (since it has to initialize all the connections anyway).
func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
- _, err = initPoolConns(context.TODO(), pool, opts)
- if err != nil {
- err = fmt.Errorf("initing pool conns: %w", err)
- return
- }
prov := &provider{pool: pool, opts: opts}
if opts.BatchWrites {
- if opts.NumConns < 2 {
- err = errors.New("batch writes requires more than 1 conn")
- return
- }
writes := make(chan writeRequest)
prov.writes = writes
// This is retained for backwards compatibility. It may not be necessary.
return prov, nil
}
-func initPoolConns(ctx context.Context, pool ConnPool, opts ProviderOpts) (numInited int, err error) {
+// InitPoolOpts bundles the pool size with the per-connection init options.
+type InitPoolOpts struct {
+ NumConns int
+ InitConnOpts
+}
+
+// initPoolConns applies the InitConnOpts pragmas to every connection in pool.
+func initPoolConns(ctx context.Context, pool ConnPool, opts InitPoolOpts) (err error) {
var conns []conn
defer func() {
for _, c := range conns {
break
}
conns = append(conns, conn)
- err = initConn(conn, opts)
+ err = initConn(conn, opts.InitConnOpts)
if err != nil {
err = fmt.Errorf("initing conn %v: %w", len(conns), err)
return
}
- numInited++
}
return
}
Get(context.Context) conn
Put(conn)
Close() error
+ NumConns() int
+}
+
+// withPoolConn checks a conn out of pool, calls with on it, and returns it to the pool.
+func withPoolConn(pool ConnPool, with func(conn)) {
+ c := pool.Get(nil) // NOTE(review): nil Context — confirm all ConnPool impls accept it.
+ defer pool.Put(c)
+ with(c)
}
type provider struct {
func newConnsAndProv(t *testing.T, opts NewPoolOpts) (ConnPool, *provider) {
opts.Path = filepath.Join(t.TempDir(), "sqlite3.db")
- conns, provOpts, err := NewPool(opts)
- require.NoError(t, err)
+ pool, err := NewPool(opts)
+ qt.Assert(t, err, qt.IsNil)
// sqlitex.Pool.Close doesn't like being called more than once. Let it slide for now.
- //t.Cleanup(func() { conns.Close() })
+ //t.Cleanup(func() { pool.Close() })
+ qt.Assert(t, initPoolDatabase(pool, InitDbOpts{}), qt.IsNil)
+ prov, err := NewProvider(pool, ProviderOpts{BatchWrites: pool.NumConns() > 1})
+ // NOTE(review): mixes quicktest (qt.Assert) above with testify (require.NoError)
+ // below — consider settling on one assertion style.
require.NoError(t, err)
t.Cleanup(func() { prov.Close() })
- return conns, prov
+ return pool, prov
}
func TestTextBlobSize(t *testing.T) {
func BenchmarkMarkComplete(b *testing.B) {
const pieceSize = test_storage.DefaultPieceSize
const capacity = test_storage.DefaultNumPieces * pieceSize / 2
+ // runBench exercises the standard piece mark-complete benchmark against ci.
+ runBench := func(b *testing.B, ci storage.ClientImpl) {
+ test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity)
+ }
c := qt.New(b)
- for _, storage := range []struct {
- name string
- maker func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser
- }{
- {"SqliteDirect", func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser {
- ci, err := NewDirectStorage(NewDirectStorageOpts{
- NewPoolOpts: newPoolOpts,
- ProvOpts: provOpts,
+ // Benchmark both file-backed and in-memory databases.
+ for _, memory := range []bool{false, true} {
+ b.Run(fmt.Sprintf("Memory=%v", memory), func(b *testing.B) {
+ b.Run("Direct", func(b *testing.B) {
+ var opts NewDirectStorageOpts
+ opts.Memory = memory
+ opts.Path = filepath.Join(b.TempDir(), "storage.db")
+ opts.Capacity = capacity
+ ci, err := NewDirectStorage(opts)
+ c.Assert(err, qt.IsNil)
+ defer ci.Close()
+ runBench(b, ci)
})
- c.Assert(err, qt.IsNil)
- return ci
- }},
- {"SqlitePieceStorage", func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser {
- ci, err := NewPiecesStorage(NewPiecesStorageOpts{
- NewPoolOpts: newPoolOpts,
- ProvOpts: provOpts,
+ b.Run("ResourcePieces", func(b *testing.B) {
+ for _, batchWrites := range []bool{false, true} {
+ b.Run(fmt.Sprintf("BatchWrites=%v", batchWrites), func(b *testing.B) {
+ var opts NewPiecesStorageOpts
+ opts.Path = filepath.Join(b.TempDir(), "storage.db")
+ //b.Logf("storage db path: %q", dbPath)
+ opts.Capacity = capacity
+ opts.Memory = memory
+ opts.ProvOpts = func(opts *ProviderOpts) {
+ opts.BatchWrites = batchWrites
+ }
+ ci, err := NewPiecesStorage(opts)
+ c.Assert(err, qt.IsNil)
+ defer ci.Close()
+ runBench(b, ci)
+ })
+ }
})
- c.Assert(err, qt.IsNil)
- return ci
- }},
- } {
- b.Run(storage.name, func(b *testing.B) {
- for _, memory := range []bool{false, true} {
- b.Run(fmt.Sprintf("Memory=%v", memory), func(b *testing.B) {
- for _, batchWrites := range []bool{false, true} {
- b.Run(fmt.Sprintf("BatchWrites=%v", batchWrites), func(b *testing.B) {
- dbPath := filepath.Join(b.TempDir(), "storage.db")
- //b.Logf("storage db path: %q", dbPath)
- newPoolOpts := NewPoolOpts{
- Path: dbPath,
- Capacity: capacity,
- NoConcurrentBlobReads: false,
- PageSize: 1 << 14,
- Memory: memory,
- }
- provOpts := func(opts *ProviderOpts) {
- opts.BatchWrites = batchWrites
- }
- ci := storage.maker(newPoolOpts, provOpts)
- defer ci.Close()
- test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity)
- })
- }
- })
- }
})
}
}
"fmt"
"io"
"io/ioutil"
- "log"
"os"
"path/filepath"
"runtime"
cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
}
cfg.Seed = false
- cfg.Debug = true
+ //cfg.Debug = true
if ps.ConfigureLeecher.Config != nil {
ps.ConfigureLeecher.Config(cfg)
}
func sqliteLeecherStorageTestCase(numConns int) leecherStorageTestCase {
return leecherStorageTestCase{
fmt.Sprintf("SqliteFile,NumConns=%v", numConns),
- sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPiecesStorageOpts {
- return sqliteStorage.NewPiecesStorageOpts{
- NewPoolOpts: sqliteStorage.NewPoolOpts{
- Path: filepath.Join(dataDir, "sqlite.db"),
- NumConns: numConns,
- },
- }
+ sqliteClientStorageFactory(func(dataDir string) (opts sqliteStorage.NewPiecesStorageOpts) {
+ // Fields are set flat via the embedded option structs (NewConnOpts etc.).
+ opts.Path = filepath.Join(dataDir, "sqlite.db")
+ opts.NumConns = numConns
+ return
}),
numConns,
}
{"Boltdb", storage.NewBoltDB, 0},
{"SqliteDirect", func(s string) storage.ClientImplCloser {
path := filepath.Join(s, "sqlite3.db")
- log.Print(path)
- cl, err := sqliteStorage.NewDirectStorage(sqliteStorage.NewDirectStorageOpts{
- NewPoolOpts: sqliteStorage.NewPoolOpts{
- Path: path,
- },
- ProvOpts: nil,
- })
+ var opts sqliteStorage.NewDirectStorageOpts
+ opts.Path = path
+ cl, err := sqliteStorage.NewDirectStorage(opts)
if err != nil {
panic(err)
}
sqliteLeecherStorageTestCase(2),
// This should use a number of connections equal to the number of CPUs
sqliteLeecherStorageTestCase(0),
- {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPiecesStorageOpts {
- return sqliteStorage.NewPiecesStorageOpts{
- NewPoolOpts: sqliteStorage.NewPoolOpts{
- Memory: true,
- },
- }
+ {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) (opts sqliteStorage.NewPiecesStorageOpts) {
+ opts.Memory = true
+ return
}), 0},
} {
t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {