package sqliteStorage
import (
- "bytes"
- "context"
_ "embed"
"errors"
- "expvar"
"fmt"
- "io"
- "log"
"net/url"
- "os"
- "runtime"
- "runtime/pprof"
- "strings"
- "sync"
- "time"
"crawshaw.io/sqlite"
"crawshaw.io/sqlite/sqlitex"
- "github.com/anacrolix/missinggo/iter"
- "github.com/anacrolix/missinggo/v2/resource"
-
- "github.com/anacrolix/torrent/storage"
)
type conn = *sqlite.Conn
return nil
}
-type NewPiecesStorageOpts struct {
- NewPoolOpts
- InitDbOpts
- ProvOpts func(*ProviderOpts)
- StorageOpts func(*storage.ResourcePiecesOpts)
-}
-
-type NewPoolOpts struct {
- NewConnOpts
- InitConnOpts
- NumConns int
-}
-
type InitDbOpts struct {
DontInitSchema bool
PageSize int
NoTriggers bool
}
-// There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
-// the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the
-// top-level option type.
-type PoolConf struct {
- NumConns int
- JournalMode string
-}
-
// Remove any capacity limits.
-func UnlimitCapacity(conn conn) error {
+func unlimitCapacity(conn conn) error {
return sqlitex.Exec(conn, "delete from setting where key='capacity'", nil)
}
// Set the capacity limit to exactly this value.
-func SetCapacity(conn conn, cap int64) error {
+func setCapacity(conn conn, cap int64) error {
return sqlitex.Exec(conn, "insert into setting values ('capacity', ?)", nil, cap)
}
return
}
}
- if opts.Capacity != 0 {
- err = SetCapacity(conn, opts.Capacity)
- if err != nil {
- return
- }
+ if opts.Capacity < 0 {
+ err = unlimitCapacity(conn)
+ } else if opts.Capacity > 0 {
+ err = setCapacity(conn, opts.Capacity)
}
return
}
-func initPoolDatabase(pool ConnPool, opts InitDbOpts) (err error) {
- withPoolConn(pool, func(c conn) {
- err = initDatabase(c, opts)
- })
- return
-}
-
// Go fmt, why you so shit?
const openConnFlags = 0 |
sqlite.SQLITE_OPEN_READWRITE |
func newConn(opts NewConnOpts) (conn, error) {
return sqlite.OpenConn(newOpenUri(opts), openConnFlags)
}
-
-type poolWithNumConns struct {
- *sqlitex.Pool
- numConns int
-}
-
-func (me poolWithNumConns) NumConns() int {
- return me.numConns
-}
-
-func NewPool(opts NewPoolOpts) (_ ConnPool, err error) {
- if opts.NumConns == 0 {
- opts.NumConns = runtime.NumCPU()
- }
- switch opts.NumConns {
- case 1:
- conn, err := newConn(opts.NewConnOpts)
- return &poolFromConn{conn: conn}, err
- default:
- _pool, err := sqlitex.Open(newOpenUri(opts.NewConnOpts), openConnFlags, opts.NumConns)
- return poolWithNumConns{_pool, opts.NumConns}, err
- }
-}
-
-// Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool.
-type poolFromConn struct {
- mu sync.Mutex
- conn conn
-}
-
-func (me *poolFromConn) Get(ctx context.Context) conn {
- me.mu.Lock()
- return me.conn
-}
-
-func (me *poolFromConn) Put(conn conn) {
- if conn != me.conn {
- panic("expected to same conn")
- }
- me.mu.Unlock()
-}
-
-func (me *poolFromConn) Close() error {
- return me.conn.Close()
-}
-
-func (me *poolFromConn) NumConns() int { return 1 }
-
-type ProviderOpts struct {
- BatchWrites bool
-}
-
-// Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of
-// the ConnPool (since it has to initialize all the connections anyway).
-func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
- prov := &provider{pool: pool, opts: opts}
- if opts.BatchWrites {
- writes := make(chan writeRequest)
- prov.writes = writes
- // This is retained for backwards compatibility. It may not be necessary.
- runtime.SetFinalizer(prov, func(p *provider) {
- p.Close()
- })
- go providerWriter(writes, prov.pool)
- }
- return prov, nil
-}
-
-type InitPoolOpts struct {
- NumConns int
- InitConnOpts
-}
-
-func initPoolConns(ctx context.Context, pool ConnPool, opts InitConnOpts) (err error) {
- var conns []conn
- defer func() {
- for _, c := range conns {
- pool.Put(c)
- }
- }()
- for range iter.N(pool.NumConns()) {
- conn := pool.Get(ctx)
- if conn == nil {
- break
- }
- conns = append(conns, conn)
- err = initConn(conn, opts)
- if err != nil {
- err = fmt.Errorf("initing conn %v: %w", len(conns), err)
- return
- }
- }
- return
-}
-
-type ConnPool interface {
- Get(context.Context) conn
- Put(conn)
- Close() error
- NumConns() int
-}
-
-func withPoolConn(pool ConnPool, with func(conn)) {
- c := pool.Get(context.TODO())
- defer pool.Put(c)
- with(c)
-}
-
-type provider struct {
- pool ConnPool
- writes chan<- writeRequest
- opts ProviderOpts
- closeMu sync.RWMutex
- closed bool
- closeErr error
-}
-
-var _ storage.ConsecutiveChunkReader = (*provider)(nil)
-
-func (p *provider) ReadConsecutiveChunks(prefix string) (io.ReadCloser, error) {
- p.closeMu.RLock()
- runner, err := p.getReadWithConnRunner()
- if err != nil {
- p.closeMu.RUnlock()
- return nil, err
- }
- r, w := io.Pipe()
- go func() {
- defer p.closeMu.RUnlock()
- err = runner(func(_ context.Context, conn conn) error {
- var written int64
- err = sqlitex.Exec(conn, `
- select
- data,
- cast(substr(name, ?+1) as integer) as offset
- from blob
- where name like ?||'%'
- order by offset`,
- func(stmt *sqlite.Stmt) error {
- offset := stmt.ColumnInt64(1)
- if offset != written {
- return fmt.Errorf("got chunk at offset %v, expected offset %v", offset, written)
- }
- // TODO: Avoid intermediate buffers here
- r := stmt.ColumnReader(0)
- w1, err := io.Copy(w, r)
- written += w1
- return err
- },
- len(prefix),
- prefix,
- )
- return err
- })
- w.CloseWithError(err)
- }()
- return r, nil
-}
-
-func (me *provider) Close() error {
- me.closeMu.Lock()
- defer me.closeMu.Unlock()
- if me.closed {
- return me.closeErr
- }
- if me.writes != nil {
- close(me.writes)
- }
- me.closeErr = me.pool.Close()
- me.closed = true
- return me.closeErr
-}
-
-type writeRequest struct {
- query withConn
- done chan<- error
- labels pprof.LabelSet
-}
-
-var expvars = expvar.NewMap("sqliteStorage")
-
-func runQueryWithLabels(query withConn, labels pprof.LabelSet, conn conn) (err error) {
- pprof.Do(context.Background(), labels, func(ctx context.Context) {
- // We pass in the context in the hope that the CPU profiler might incorporate sqlite
- // activity the action that triggered it. It doesn't seem that way, since those calls don't
- // take a context.Context themselves. It may come in useful in the goroutine profiles
- // though, and doesn't hurt to expose it here for other purposes should things change.
- err = query(ctx, conn)
- })
- return
-}
-
-// Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
-// stronger typing on the writes channel.
-func providerWriter(writes <-chan writeRequest, pool ConnPool) {
- conn := pool.Get(context.TODO())
- if conn == nil {
- return
- }
- defer pool.Put(conn)
- for {
- first, ok := <-writes
- if !ok {
- return
- }
- var buf []func()
- var cantFail error
- func() {
- defer sqlitex.Save(conn)(&cantFail)
- firstErr := runQueryWithLabels(first.query, first.labels, conn)
- buf = append(buf, func() { first.done <- firstErr })
- for {
- select {
- case wr, ok := <-writes:
- if ok {
- err := runQueryWithLabels(wr.query, wr.labels, conn)
- buf = append(buf, func() { wr.done <- err })
- continue
- }
- default:
- }
- break
- }
- }()
- // Not sure what to do if this failed.
- if cantFail != nil {
- expvars.Add("batchTransactionErrors", 1)
- }
- // Signal done after we know the transaction succeeded.
- for _, done := range buf {
- done()
- }
- expvars.Add("batchTransactions", 1)
- expvars.Add("batchedQueries", int64(len(buf)))
- //log.Printf("batched %v write queries", len(buf))
- }
-}
-
-func (p *provider) NewInstance(s string) (resource.Instance, error) {
- return instance{s, p}, nil
-}
-
-type instance struct {
- location string
- p *provider
-}
-
-func getLabels(skip int) pprof.LabelSet {
- return pprof.Labels("sqlite-storage-action", func() string {
- var pcs [8]uintptr
- runtime.Callers(skip+3, pcs[:])
- fs := runtime.CallersFrames(pcs[:])
- f, _ := fs.Next()
- funcName := f.Func.Name()
- funcName = funcName[strings.LastIndexByte(funcName, '.')+1:]
- //log.Printf("func name: %q", funcName)
- return funcName
- }())
-}
-
-func (p *provider) withConn(with withConn, write bool, skip int) error {
- p.closeMu.RLock()
- // I think we need to check this here because it may not be valid to send to the writes channel
- // if we're already closed. So don't try to move this check into getReadWithConnRunner.
- if p.closed {
- p.closeMu.RUnlock()
- return errors.New("closed")
- }
- if write && p.opts.BatchWrites {
- done := make(chan error)
- p.writes <- writeRequest{
- query: with,
- done: done,
- labels: getLabels(skip + 1),
- }
- p.closeMu.RUnlock()
- return <-done
- } else {
- defer p.closeMu.RUnlock()
- runner, err := p.getReadWithConnRunner()
- if err != nil {
- return err
- }
- return runner(with)
- }
-}
-
-// Obtains a DB conn and returns a withConn for executing with it. If no error is returned from this
-// function, the runner *must* be used or the conn is leaked. You should check the provider isn't
-// closed before using this.
-func (p *provider) getReadWithConnRunner() (with func(withConn) error, err error) {
- conn := p.pool.Get(context.TODO())
- if conn == nil {
- err = errors.New("couldn't get pool conn")
- return
- }
- with = func(with withConn) error {
- defer p.pool.Put(conn)
- return runQueryWithLabels(with, getLabels(1), conn)
- }
- return
-}
-
-type withConn func(context.Context, conn) error
-
-func (i instance) withConn(with withConn, write bool) error {
- return i.p.withConn(with, write, 1)
-}
-
-func (i instance) getConn() *sqlite.Conn {
- return i.p.pool.Get(context.TODO())
-}
-
-func (i instance) putConn(conn *sqlite.Conn) {
- i.p.pool.Put(conn)
-}
-
-func (i instance) Readdirnames() (names []string, err error) {
- prefix := i.location + "/"
- err = i.withConn(func(_ context.Context, conn conn) error {
- return sqlitex.Exec(conn, "select name from blob where name like ?", func(stmt *sqlite.Stmt) error {
- names = append(names, stmt.ColumnText(0)[len(prefix):])
- return nil
- }, prefix+"%")
- }, false)
- //log.Printf("readdir %q gave %q", i.location, names)
- return
-}
-
-func (i instance) getBlobRowid(conn conn) (rowid int64, err error) {
- rows := 0
- err = sqlitex.Exec(conn, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
- rowid = stmt.ColumnInt64(0)
- rows++
- return nil
- }, i.location)
- if err != nil {
- return
- }
- if rows == 1 {
- return
- }
- if rows == 0 {
- err = errors.New("blob not found")
- return
- }
- panic(rows)
-}
-
-type connBlob struct {
- *sqlite.Blob
- onClose func()
-}
-
-func (me connBlob) Close() error {
- err := me.Blob.Close()
- me.onClose()
- return err
-}
-
-func (i instance) Get() (ret io.ReadCloser, err error) {
- conn := i.getConn()
- if conn == nil {
- panic("nil sqlite conn")
- }
- blob, err := i.openBlob(conn, false, true)
- if err != nil {
- i.putConn(conn)
- return
- }
- var once sync.Once
- return connBlob{blob, func() {
- once.Do(func() { i.putConn(conn) })
- }}, nil
-}
-
-func (i instance) openBlob(conn conn, write, updateAccess bool) (*sqlite.Blob, error) {
- rowid, err := i.getBlobRowid(conn)
- if err != nil {
- return nil, err
- }
- // This seems to cause locking issues with in-memory databases. Is it something to do with not
- // having WAL?
- if updateAccess {
- err = sqlitex.Exec(conn, "update blob set last_used=datetime('now') where rowid=?", nil, rowid)
- if err != nil {
- err = fmt.Errorf("updating last_used: %w", err)
- return nil, err
- }
- if conn.Changes() != 1 {
- panic(conn.Changes())
- }
- }
- return conn.OpenBlob("main", "blob", "data", rowid, write)
-}
-
-func (i instance) PutSized(reader io.Reader, size int64) (err error) {
- err = i.withConn(func(_ context.Context, conn conn) error {
- err := sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, zeroblob(?))",
- nil,
- i.location, size)
- if err != nil {
- return err
- }
- blob, err := i.openBlob(conn, true, false)
- if err != nil {
- return err
- }
- defer blob.Close()
- _, err = io.Copy(blob, reader)
- return err
- }, true)
- return
-}
-
-func (i instance) Put(reader io.Reader) (err error) {
- var buf bytes.Buffer
- _, err = io.Copy(&buf, reader)
- if err != nil {
- return err
- }
- if false {
- return i.PutSized(&buf, int64(buf.Len()))
- } else {
- return i.withConn(func(_ context.Context, conn conn) error {
- for range iter.N(10) {
- err = sqlitex.Exec(conn,
- "insert or replace into blob(name, data) values(?, cast(? as blob))",
- nil,
- i.location, buf.Bytes())
- if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
- log.Print("sqlite busy")
- time.Sleep(time.Second)
- continue
- }
- break
- }
- return err
- }, true)
- }
-}
-
-type fileInfo struct {
- size int64
-}
-
-func (f fileInfo) Name() string {
- panic("implement me")
-}
-
-func (f fileInfo) Size() int64 {
- return f.size
-}
-
-func (f fileInfo) Mode() os.FileMode {
- panic("implement me")
-}
-
-func (f fileInfo) ModTime() time.Time {
- panic("implement me")
-}
-
-func (f fileInfo) IsDir() bool {
- panic("implement me")
-}
-
-func (f fileInfo) Sys() interface{} {
- panic("implement me")
-}
-
-func (i instance) Stat() (ret os.FileInfo, err error) {
- err = i.withConn(func(_ context.Context, conn conn) error {
- var blob *sqlite.Blob
- blob, err = i.openBlob(conn, false, false)
- if err != nil {
- return err
- }
- defer blob.Close()
- ret = fileInfo{blob.Size()}
- return nil
- }, false)
- return
-}
-
-func (i instance) ReadAt(p []byte, off int64) (n int, err error) {
- err = i.withConn(func(_ context.Context, conn conn) error {
- if false {
- var blob *sqlite.Blob
- blob, err = i.openBlob(conn, false, true)
- if err != nil {
- return err
- }
- defer blob.Close()
- if off >= blob.Size() {
- err = io.EOF
- return err
- }
- if off+int64(len(p)) > blob.Size() {
- p = p[:blob.Size()-off]
- }
- n, err = blob.ReadAt(p, off)
- } else {
- gotRow := false
- err = sqlitex.Exec(
- conn,
- "select substr(data, ?, ?) from blob where name=?",
- func(stmt *sqlite.Stmt) error {
- if gotRow {
- panic("found multiple matching blobs")
- } else {
- gotRow = true
- }
- n = stmt.ColumnBytes(0, p)
- return nil
- },
- off+1, len(p), i.location,
- )
- if err != nil {
- return err
- }
- if !gotRow {
- err = errors.New("blob not found")
- return err
- }
- if n < len(p) {
- err = io.EOF
- }
- }
- return nil
- }, false)
- return
-}
-
-func (i instance) WriteAt(bytes []byte, i2 int64) (int, error) {
- panic("implement me")
-}
-
-func (i instance) Delete() error {
- return i.withConn(func(_ context.Context, conn conn) error {
- return sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)
- }, true)
-}
package sqliteStorage
import (
- "bytes"
- "context"
"errors"
"fmt"
- "io"
- "io/ioutil"
"path/filepath"
- "sync"
"testing"
"time"
_ "github.com/anacrolix/envpprof"
- "github.com/dustin/go-humanize"
- qt "github.com/frankban/quicktest"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
-
"github.com/anacrolix/torrent/storage"
test_storage "github.com/anacrolix/torrent/storage/test"
+ "github.com/dustin/go-humanize"
+ qt "github.com/frankban/quicktest"
)
-func newConnsAndProv(t *testing.T, opts NewPoolOpts) (ConnPool, *provider) {
- opts.Path = filepath.Join(t.TempDir(), "sqlite3.db")
- pool, err := NewPool(opts)
- qt.Assert(t, err, qt.IsNil)
- // sqlitex.Pool.Close doesn't like being called more than once. Let it slide for now.
- //t.Cleanup(func() { pool.Close() })
- qt.Assert(t, initPoolDatabase(pool, InitDbOpts{}), qt.IsNil)
- if !opts.Memory && opts.SetJournalMode == "" {
- opts.SetJournalMode = "wal"
- }
- qt.Assert(t, initPoolConns(context.TODO(), pool, opts.InitConnOpts), qt.IsNil)
- prov, err := NewProvider(pool, ProviderOpts{BatchWrites: pool.NumConns() > 1})
- require.NoError(t, err)
- t.Cleanup(func() { prov.Close() })
- return pool, prov
-}
-
-func TestTextBlobSize(t *testing.T) {
- _, prov := newConnsAndProv(t, NewPoolOpts{})
- a, _ := prov.NewInstance("a")
- err := a.Put(bytes.NewBufferString("\x00hello"))
- qt.Assert(t, err, qt.IsNil)
- fi, err := a.Stat()
- qt.Assert(t, err, qt.IsNil)
- assert.EqualValues(t, 6, fi.Size())
-}
-
-func TestSimultaneousIncrementalBlob(t *testing.T) {
- _, p := newConnsAndProv(t, NewPoolOpts{
- NumConns: 3,
- })
- a, err := p.NewInstance("a")
- require.NoError(t, err)
- const contents = "hello, world"
- require.NoError(t, a.Put(bytes.NewReader([]byte("hello, world"))))
- rc1, err := a.Get()
- require.NoError(t, err)
- rc2, err := a.Get()
- require.NoError(t, err)
- var b1, b2 []byte
- var e1, e2 error
- var wg sync.WaitGroup
- doRead := func(b *[]byte, e *error, rc io.ReadCloser, n int) {
- defer wg.Done()
- defer rc.Close()
- *b, *e = ioutil.ReadAll(rc)
- require.NoError(t, *e, n)
- assert.EqualValues(t, contents, *b)
- }
- wg.Add(2)
- go doRead(&b2, &e2, rc2, 2)
- go doRead(&b1, &e1, rc1, 1)
- wg.Wait()
-}
-
func BenchmarkMarkComplete(b *testing.B) {
const pieceSize = test_storage.DefaultPieceSize
const noTriggers = false