"fmt"
"io"
"log"
+ "net/url"
"os"
+ "runtime"
"sync"
"time"
"crawshaw.io/sqlite/sqlitex"
"github.com/anacrolix/missinggo/iter"
"github.com/anacrolix/missinggo/v2/resource"
+ "github.com/anacrolix/torrent/storage"
)
type conn = *sqlite.Conn
`)
}
-// Emulates a pool from a single Conn.
+// A convenience function that creates a connection pool, resource provider, and a pieces storage
+// ClientImpl and returns them all with a Close attached.
+func NewPiecesStorage(opts NewPoolOpts) (_ storage.ClientImplCloser, err error) {
+ conns, provOpts, err := NewPool(opts)
+ if err != nil {
+ return
+ }
+ prov, err := NewProvider(conns, provOpts)
+ if err != nil {
+ return
+ }
+ store := storage.NewResourcePieces(prov)
+ return struct {
+ storage.ClientImpl
+ io.Closer
+ }{
+ store,
+ conns,
+ }, nil
+}
+
+type NewPoolOpts struct {
+ Path string
+ Memory bool
+ NumConns int
+ // Forces WAL, disables shared caching.
+ ConcurrentBlobReads bool
+}
+
+// There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
+// the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the
+// top-level option type.
+type ProviderOpts struct {
+ NumConns int
+ // Concurrent blob reads require WAL.
+ ConcurrentBlobRead bool
+}
+
+func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
+ if opts.NumConns == 0 {
+ opts.NumConns = runtime.NumCPU()
+ }
+ if opts.Memory {
+ opts.Path = ":memory:"
+ }
+ values := make(url.Values)
+ if !opts.ConcurrentBlobReads {
+ values.Add("cache", "shared")
+ }
+ path := fmt.Sprintf("file:%s?%s", opts.Path, values.Encode())
+ conns, err := func() (ConnPool, error) {
+ switch opts.NumConns {
+ case 1:
+ conn, err := sqlite.OpenConn(fmt.Sprintf("file:%s", opts.Path), 0)
+ return &poolFromConn{conn: conn}, err
+ default:
+ return sqlitex.Open(path, 0, opts.NumConns)
+ }
+ }()
+ if err != nil {
+ return
+ }
+ return conns, ProviderOpts{
+ NumConns: opts.NumConns,
+ ConcurrentBlobRead: opts.ConcurrentBlobReads,
+ }, nil
+}
+
+// Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool.
type poolFromConn struct {
mu sync.Mutex
conn conn
me.mu.Unlock()
}
-func NewProvider(conn *sqlite.Conn) (_ *provider, err error) {
- err = initConn(conn, false)
- if err != nil {
- return
- }
- err = initSchema(conn)
- return &provider{&poolFromConn{conn: conn}}, err
+func (me *poolFromConn) Close() error {
+ return me.conn.Close()
}
-// Needs the pool size so it can initialize all the connections with pragmas.
-func NewProviderPool(pool *sqlitex.Pool, numConns int, wal bool) (_ *provider, err error) {
- _, err = initPoolConns(context.TODO(), pool, numConns, wal)
+// Needs the ConnPool size so it can initialize all the connections with pragmas.
+func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
+ _, err = initPoolConns(context.TODO(), pool, opts.NumConns, opts.ConcurrentBlobRead)
if err != nil {
return
}
return &provider{pool: pool}, err
}
-func initPoolConns(ctx context.Context, pool *sqlitex.Pool, numConn int, wal bool) (numInited int, err error) {
+func initPoolConns(ctx context.Context, pool ConnPool, numConn int, wal bool) (numInited int, err error) {
var conns []conn
defer func() {
for _, c := range conns {
return
}
-type pool interface {
+type ConnPool interface {
Get(context.Context) conn
Put(conn)
+ Close() error
}
type provider struct {
- pool pool
+ pool ConnPool
}
func (p *provider) NewInstance(s string) (resource.Instance, error) {
import (
"bytes"
- "fmt"
"io"
"io/ioutil"
"path/filepath"
"sync"
"testing"
- "crawshaw.io/sqlite"
- "crawshaw.io/sqlite/sqlitex"
_ "github.com/anacrolix/envpprof"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
-func TestSimultaneousIncrementalBlob(t *testing.T) {
- const poolSize = 10
- pool, err := sqlitex.Open(
- // We don't do this in memory, because it seems to have some locking issues with updating
- // last_used.
- fmt.Sprintf("file:%s", filepath.Join(t.TempDir(), "sqlite3.db")),
- // We can't disable WAL in this test because then we can't open 2 blobs simultaneously for read.
- sqlite.OpenFlagsDefault, /* &^sqlite.SQLITE_OPEN_WAL */
- poolSize)
+func newConnsAndProv(t *testing.T, opts NewPoolOpts) (ConnPool, *provider) {
+ opts.Path = filepath.Join(t.TempDir(), "sqlite3.db")
+ conns, provOpts, err := NewPool(opts)
require.NoError(t, err)
- defer pool.Close()
- p, err := NewProviderPool(pool, poolSize, true)
+ t.Cleanup(func() { conns.Close() })
+ prov, err := NewProvider(conns, provOpts)
require.NoError(t, err)
+ return conns, prov
+}
+
+func TestTextBlobSize(t *testing.T) {
+ _, prov := newConnsAndProv(t, NewPoolOpts{})
+ a, _ := prov.NewInstance("a")
+ a.Put(bytes.NewBufferString("\x00hello"))
+ fi, _ := a.Stat()
+ assert.EqualValues(t, 6, fi.Size())
+}
+
+func TestSimultaneousIncrementalBlob(t *testing.T) {
+ _, p := newConnsAndProv(t, NewPoolOpts{
+ NumConns: 2,
+ ConcurrentBlobReads: true,
+ })
a, err := p.NewInstance("a")
require.NoError(t, err)
const contents = "hello, world"
"testing"
"time"
- "crawshaw.io/sqlite"
- "crawshaw.io/sqlite/sqlitex"
"github.com/anacrolix/missinggo/v2/filecache"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/internal/testutil"
testClientTransferSmallCache(t, false, -1)
}
-func sqliteClientStorageFactory(connPathMaker func(dataDir string) string) storageFactory {
+func sqliteClientStorageFactory(connOptsMaker func(dataDir string) sqliteStorage.NewPoolOpts) storageFactory {
return func(dataDir string) storage.ClientImplCloser {
- path := connPathMaker(dataDir)
- log.Printf("opening sqlite db at %q", path)
- if true {
- conn, err := sqlite.OpenConn(path, 0)
- if err != nil {
- panic(err)
- }
- prov, err := sqliteStorage.NewProvider(conn)
- if err != nil {
- panic(err)
- }
- return struct {
- storage.ClientImpl
- io.Closer
- }{
- storage.NewResourcePieces(prov),
- conn,
- }
- } else {
- // Test pool implementation for SQLITE_BUSY when we want SQLITE_LOCKED (so the
- // crawshaw.io/sqlite unlock notify handler kicks in for us).
- const poolSize = 1
- pool, err := sqlitex.Open(path, 0, poolSize)
- if err != nil {
- panic(err)
- }
- prov, err := sqliteStorage.NewProviderPool(pool, poolSize, false)
- if err != nil {
- panic(err)
- }
- return struct {
- storage.ClientImpl
- io.Closer
- }{
- storage.NewResourcePieces(prov),
- pool,
- }
+ connOpts := connOptsMaker(dataDir)
+ log.Printf("opening sqlite db: %#v", connOpts)
+ ret, err := sqliteStorage.NewPiecesStorage(connOpts)
+ if err != nil {
+ panic(err)
}
+ return ret
}
}
Wrapper: fileCachePieceResourceStorage,
})},
{"Boltdb", storage.NewBoltDB},
- {"SqliteFile", sqliteClientStorageFactory(func(dataDir string) string {
- return "file:" + filepath.Join(dataDir, "sqlite.db")
+ {"SqliteFile", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPoolOpts {
+ return sqliteStorage.NewPoolOpts{
+ Path: filepath.Join(dataDir, "sqlite.db"),
+ }
})},
- {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) string {
- return "file:memory:?mode=memory&cache=shared"
+ {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPoolOpts {
+ return sqliteStorage.NewPoolOpts{
+ Memory: true,
+ }
})},
} {
t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {