From 19d5905b6c3092214ad711ca51b2d678b8bdd65b Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 25 Aug 2021 12:35:07 +1000 Subject: [PATCH] Removed unused sqlite "provider" storage --- storage/sqlite/direct.go | 11 +- storage/sqlite/sqlite-storage.go | 598 +------------------------- storage/sqlite/sqlite-storage_test.go | 67 +-- 3 files changed, 14 insertions(+), 662 deletions(-) diff --git a/storage/sqlite/direct.go b/storage/sqlite/direct.go index dd3f524d..05d6dd8c 100644 --- a/storage/sqlite/direct.go +++ b/storage/sqlite/direct.go @@ -176,11 +176,12 @@ func rowidForBlob(c conn, name string, length int64, create bool) (rowid int64, } func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl { - ret := piece{sb: SquirrelBlob{ - p.Hash().HexString(), - p.Length(), - t.c, - }, + ret := piece{ + sb: SquirrelBlob{ + p.Hash().HexString(), + p.Length(), + t.c, + }, } ret.ReaderAt = &ret.sb ret.WriterAt = &ret.sb diff --git a/storage/sqlite/sqlite-storage.go b/storage/sqlite/sqlite-storage.go index 38a8afcc..f8381649 100644 --- a/storage/sqlite/sqlite-storage.go +++ b/storage/sqlite/sqlite-storage.go @@ -4,28 +4,13 @@ package sqliteStorage import ( - "bytes" - "context" _ "embed" "errors" - "expvar" "fmt" - "io" - "log" "net/url" - "os" - "runtime" - "runtime/pprof" - "strings" - "sync" - "time" "crawshaw.io/sqlite" "crawshaw.io/sqlite/sqlitex" - "github.com/anacrolix/missinggo/iter" - "github.com/anacrolix/missinggo/v2/resource" - - "github.com/anacrolix/torrent/storage" ) type conn = *sqlite.Conn @@ -160,19 +145,6 @@ func InitSchema(conn conn, pageSize int, triggers bool) error { return nil } -type NewPiecesStorageOpts struct { - NewPoolOpts - InitDbOpts - ProvOpts func(*ProviderOpts) - StorageOpts func(*storage.ResourcePiecesOpts) -} - -type NewPoolOpts struct { - NewConnOpts - InitConnOpts - NumConns int -} - type InitDbOpts struct { DontInitSchema bool PageSize int @@ -181,21 +153,13 @@ type InitDbOpts struct { NoTriggers bool } -// There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now, -// the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the -// top-level option type. -type PoolConf struct { - NumConns int - JournalMode string -} - // Remove any capacity limits. -func UnlimitCapacity(conn conn) error { +func unlimitCapacity(conn conn) error { return sqlitex.Exec(conn, "delete from setting where key='capacity'", nil) } // Set the capacity limit to exactly this value. -func SetCapacity(conn conn, cap int64) error { +func setCapacity(conn conn, cap int64) error { return sqlitex.Exec(conn, "insert into setting values ('capacity', ?)", nil, cap) } @@ -229,22 +193,14 @@ func initDatabase(conn conn, opts InitDbOpts) (err error) { return } } - if opts.Capacity != 0 { - err = SetCapacity(conn, opts.Capacity) - if err != nil { - return - } + if opts.Capacity < 0 { + err = unlimitCapacity(conn) + } else if opts.Capacity > 0 { + err = setCapacity(conn, opts.Capacity) } return } -func initPoolDatabase(pool ConnPool, opts InitDbOpts) (err error) { - withPoolConn(pool, func(c conn) { - err = initDatabase(c, opts) - }) - return -} - // Go fmt, why you so shit? 
const openConnFlags = 0 | sqlite.SQLITE_OPEN_READWRITE | @@ -255,545 +211,3 @@ const openConnFlags = 0 | func newConn(opts NewConnOpts) (conn, error) { return sqlite.OpenConn(newOpenUri(opts), openConnFlags) } - -type poolWithNumConns struct { - *sqlitex.Pool - numConns int -} - -func (me poolWithNumConns) NumConns() int { - return me.numConns -} - -func NewPool(opts NewPoolOpts) (_ ConnPool, err error) { - if opts.NumConns == 0 { - opts.NumConns = runtime.NumCPU() - } - switch opts.NumConns { - case 1: - conn, err := newConn(opts.NewConnOpts) - return &poolFromConn{conn: conn}, err - default: - _pool, err := sqlitex.Open(newOpenUri(opts.NewConnOpts), openConnFlags, opts.NumConns) - return poolWithNumConns{_pool, opts.NumConns}, err - } -} - -// Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool. -type poolFromConn struct { - mu sync.Mutex - conn conn -} - -func (me *poolFromConn) Get(ctx context.Context) conn { - me.mu.Lock() - return me.conn -} - -func (me *poolFromConn) Put(conn conn) { - if conn != me.conn { - panic("expected to same conn") - } - me.mu.Unlock() -} - -func (me *poolFromConn) Close() error { - return me.conn.Close() -} - -func (me *poolFromConn) NumConns() int { return 1 } - -type ProviderOpts struct { - BatchWrites bool -} - -// Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of -// the ConnPool (since it has to initialize all the connections anyway). -func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) { - prov := &provider{pool: pool, opts: opts} - if opts.BatchWrites { - writes := make(chan writeRequest) - prov.writes = writes - // This is retained for backwards compatibility. It may not be necessary. - runtime.SetFinalizer(prov, func(p *provider) { - p.Close() - }) - go providerWriter(writes, prov.pool) - } - return prov, nil -} - -type InitPoolOpts struct { - NumConns int - InitConnOpts -} - -func initPoolConns(ctx context.Context, pool ConnPool, opts InitConnOpts) (err error) { - var conns []conn - defer func() { - for _, c := range conns { - pool.Put(c) - } - }() - for range iter.N(pool.NumConns()) { - conn := pool.Get(ctx) - if conn == nil { - break - } - conns = append(conns, conn) - err = initConn(conn, opts) - if err != nil { - err = fmt.Errorf("initing conn %v: %w", len(conns), err) - return - } - } - return -} - -type ConnPool interface { - Get(context.Context) conn - Put(conn) - Close() error - NumConns() int -} - -func withPoolConn(pool ConnPool, with func(conn)) { - c := pool.Get(context.TODO()) - defer pool.Put(c) - with(c) -} - -type provider struct { - pool ConnPool - writes chan<- writeRequest - opts ProviderOpts - closeMu sync.RWMutex - closed bool - closeErr error -} - -var _ storage.ConsecutiveChunkReader = (*provider)(nil) - -func (p *provider) ReadConsecutiveChunks(prefix string) (io.ReadCloser, error) { - p.closeMu.RLock() - runner, err := p.getReadWithConnRunner() - if err != nil { - p.closeMu.RUnlock() - return nil, err - } - r, w := io.Pipe() - go func() { - defer p.closeMu.RUnlock() - err = runner(func(_ context.Context, conn conn) error { - var written int64 - err = sqlitex.Exec(conn, ` - select - data, - cast(substr(name, ?+1) as integer) as offset - from blob - where name like ?||'%' - order by offset`, - func(stmt *sqlite.Stmt) error { - offset := stmt.ColumnInt64(1) - if offset != written { - return fmt.Errorf("got chunk at offset %v, expected offset %v", offset, written) - } - // TODO: Avoid intermediate buffers here - r := 
stmt.ColumnReader(0) - w1, err := io.Copy(w, r) - written += w1 - return err - }, - len(prefix), - prefix, - ) - return err - }) - w.CloseWithError(err) - }() - return r, nil -} - -func (me *provider) Close() error { - me.closeMu.Lock() - defer me.closeMu.Unlock() - if me.closed { - return me.closeErr - } - if me.writes != nil { - close(me.writes) - } - me.closeErr = me.pool.Close() - me.closed = true - return me.closeErr -} - -type writeRequest struct { - query withConn - done chan<- error - labels pprof.LabelSet -} - -var expvars = expvar.NewMap("sqliteStorage") - -func runQueryWithLabels(query withConn, labels pprof.LabelSet, conn conn) (err error) { - pprof.Do(context.Background(), labels, func(ctx context.Context) { - // We pass in the context in the hope that the CPU profiler might incorporate sqlite - // activity the action that triggered it. It doesn't seem that way, since those calls don't - // take a context.Context themselves. It may come in useful in the goroutine profiles - // though, and doesn't hurt to expose it here for other purposes should things change. - err = query(ctx, conn) - }) - return -} - -// Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have -// stronger typing on the writes channel. -func providerWriter(writes <-chan writeRequest, pool ConnPool) { - conn := pool.Get(context.TODO()) - if conn == nil { - return - } - defer pool.Put(conn) - for { - first, ok := <-writes - if !ok { - return - } - var buf []func() - var cantFail error - func() { - defer sqlitex.Save(conn)(&cantFail) - firstErr := runQueryWithLabels(first.query, first.labels, conn) - buf = append(buf, func() { first.done <- firstErr }) - for { - select { - case wr, ok := <-writes: - if ok { - err := runQueryWithLabels(wr.query, wr.labels, conn) - buf = append(buf, func() { wr.done <- err }) - continue - } - default: - } - break - } - }() - // Not sure what to do if this failed. - if cantFail != nil { - expvars.Add("batchTransactionErrors", 1) - } - // Signal done after we know the transaction succeeded. - for _, done := range buf { - done() - } - expvars.Add("batchTransactions", 1) - expvars.Add("batchedQueries", int64(len(buf))) - //log.Printf("batched %v write queries", len(buf)) - } -} - -func (p *provider) NewInstance(s string) (resource.Instance, error) { - return instance{s, p}, nil -} - -type instance struct { - location string - p *provider -} - -func getLabels(skip int) pprof.LabelSet { - return pprof.Labels("sqlite-storage-action", func() string { - var pcs [8]uintptr - runtime.Callers(skip+3, pcs[:]) - fs := runtime.CallersFrames(pcs[:]) - f, _ := fs.Next() - funcName := f.Func.Name() - funcName = funcName[strings.LastIndexByte(funcName, '.')+1:] - //log.Printf("func name: %q", funcName) - return funcName - }()) -} - -func (p *provider) withConn(with withConn, write bool, skip int) error { - p.closeMu.RLock() - // I think we need to check this here because it may not be valid to send to the writes channel - // if we're already closed. So don't try to move this check into getReadWithConnRunner. 
- if p.closed { - p.closeMu.RUnlock() - return errors.New("closed") - } - if write && p.opts.BatchWrites { - done := make(chan error) - p.writes <- writeRequest{ - query: with, - done: done, - labels: getLabels(skip + 1), - } - p.closeMu.RUnlock() - return <-done - } else { - defer p.closeMu.RUnlock() - runner, err := p.getReadWithConnRunner() - if err != nil { - return err - } - return runner(with) - } -} - -// Obtains a DB conn and returns a withConn for executing with it. If no error is returned from this -// function, the runner *must* be used or the conn is leaked. You should check the provider isn't -// closed before using this. -func (p *provider) getReadWithConnRunner() (with func(withConn) error, err error) { - conn := p.pool.Get(context.TODO()) - if conn == nil { - err = errors.New("couldn't get pool conn") - return - } - with = func(with withConn) error { - defer p.pool.Put(conn) - return runQueryWithLabels(with, getLabels(1), conn) - } - return -} - -type withConn func(context.Context, conn) error - -func (i instance) withConn(with withConn, write bool) error { - return i.p.withConn(with, write, 1) -} - -func (i instance) getConn() *sqlite.Conn { - return i.p.pool.Get(context.TODO()) -} - -func (i instance) putConn(conn *sqlite.Conn) { - i.p.pool.Put(conn) -} - -func (i instance) Readdirnames() (names []string, err error) { - prefix := i.location + "/" - err = i.withConn(func(_ context.Context, conn conn) error { - return sqlitex.Exec(conn, "select name from blob where name like ?", func(stmt *sqlite.Stmt) error { - names = append(names, stmt.ColumnText(0)[len(prefix):]) - return nil - }, prefix+"%") - }, false) - //log.Printf("readdir %q gave %q", i.location, names) - return -} - -func (i instance) getBlobRowid(conn conn) (rowid int64, err error) { - rows := 0 - err = sqlitex.Exec(conn, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error { - rowid = stmt.ColumnInt64(0) - rows++ - return nil - }, i.location) - if err != nil { - return - } - if rows == 1 { - return - } - if rows == 0 { - err = errors.New("blob not found") - return - } - panic(rows) -} - -type connBlob struct { - *sqlite.Blob - onClose func() -} - -func (me connBlob) Close() error { - err := me.Blob.Close() - me.onClose() - return err -} - -func (i instance) Get() (ret io.ReadCloser, err error) { - conn := i.getConn() - if conn == nil { - panic("nil sqlite conn") - } - blob, err := i.openBlob(conn, false, true) - if err != nil { - i.putConn(conn) - return - } - var once sync.Once - return connBlob{blob, func() { - once.Do(func() { i.putConn(conn) }) - }}, nil -} - -func (i instance) openBlob(conn conn, write, updateAccess bool) (*sqlite.Blob, error) { - rowid, err := i.getBlobRowid(conn) - if err != nil { - return nil, err - } - // This seems to cause locking issues with in-memory databases. Is it something to do with not - // having WAL? 
- if updateAccess { - err = sqlitex.Exec(conn, "update blob set last_used=datetime('now') where rowid=?", nil, rowid) - if err != nil { - err = fmt.Errorf("updating last_used: %w", err) - return nil, err - } - if conn.Changes() != 1 { - panic(conn.Changes()) - } - } - return conn.OpenBlob("main", "blob", "data", rowid, write) -} - -func (i instance) PutSized(reader io.Reader, size int64) (err error) { - err = i.withConn(func(_ context.Context, conn conn) error { - err := sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, zeroblob(?))", - nil, - i.location, size) - if err != nil { - return err - } - blob, err := i.openBlob(conn, true, false) - if err != nil { - return err - } - defer blob.Close() - _, err = io.Copy(blob, reader) - return err - }, true) - return -} - -func (i instance) Put(reader io.Reader) (err error) { - var buf bytes.Buffer - _, err = io.Copy(&buf, reader) - if err != nil { - return err - } - if false { - return i.PutSized(&buf, int64(buf.Len())) - } else { - return i.withConn(func(_ context.Context, conn conn) error { - for range iter.N(10) { - err = sqlitex.Exec(conn, - "insert or replace into blob(name, data) values(?, cast(? as blob))", - nil, - i.location, buf.Bytes()) - if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY { - log.Print("sqlite busy") - time.Sleep(time.Second) - continue - } - break - } - return err - }, true) - } -} - -type fileInfo struct { - size int64 -} - -func (f fileInfo) Name() string { - panic("implement me") -} - -func (f fileInfo) Size() int64 { - return f.size -} - -func (f fileInfo) Mode() os.FileMode { - panic("implement me") -} - -func (f fileInfo) ModTime() time.Time { - panic("implement me") -} - -func (f fileInfo) IsDir() bool { - panic("implement me") -} - -func (f fileInfo) Sys() interface{} { - panic("implement me") -} - -func (i instance) Stat() (ret os.FileInfo, err error) { - err = i.withConn(func(_ context.Context, conn conn) error { - var blob *sqlite.Blob - blob, err = i.openBlob(conn, false, false) - if err != nil { - return err - } - defer blob.Close() - ret = fileInfo{blob.Size()} - return nil - }, false) - return -} - -func (i instance) ReadAt(p []byte, off int64) (n int, err error) { - err = i.withConn(func(_ context.Context, conn conn) error { - if false { - var blob *sqlite.Blob - blob, err = i.openBlob(conn, false, true) - if err != nil { - return err - } - defer blob.Close() - if off >= blob.Size() { - err = io.EOF - return err - } - if off+int64(len(p)) > blob.Size() { - p = p[:blob.Size()-off] - } - n, err = blob.ReadAt(p, off) - } else { - gotRow := false - err = sqlitex.Exec( - conn, - "select substr(data, ?, ?) 
from blob where name=?", - func(stmt *sqlite.Stmt) error { - if gotRow { - panic("found multiple matching blobs") - } else { - gotRow = true - } - n = stmt.ColumnBytes(0, p) - return nil - }, - off+1, len(p), i.location, - ) - if err != nil { - return err - } - if !gotRow { - err = errors.New("blob not found") - return err - } - if n < len(p) { - err = io.EOF - } - } - return nil - }, false) - return -} - -func (i instance) WriteAt(bytes []byte, i2 int64) (int, error) { - panic("implement me") -} - -func (i instance) Delete() error { - return i.withConn(func(_ context.Context, conn conn) error { - return sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location) - }, true) -} diff --git a/storage/sqlite/sqlite-storage_test.go b/storage/sqlite/sqlite-storage_test.go index 7d80309f..1013a8b2 100644 --- a/storage/sqlite/sqlite-storage_test.go +++ b/storage/sqlite/sqlite-storage_test.go @@ -4,82 +4,19 @@ package sqliteStorage import ( - "bytes" - "context" "errors" "fmt" - "io" - "io/ioutil" "path/filepath" - "sync" "testing" "time" _ "github.com/anacrolix/envpprof" - "github.com/dustin/go-humanize" - qt "github.com/frankban/quicktest" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/anacrolix/torrent/storage" test_storage "github.com/anacrolix/torrent/storage/test" + "github.com/dustin/go-humanize" + qt "github.com/frankban/quicktest" ) -func newConnsAndProv(t *testing.T, opts NewPoolOpts) (ConnPool, *provider) { - opts.Path = filepath.Join(t.TempDir(), "sqlite3.db") - pool, err := NewPool(opts) - qt.Assert(t, err, qt.IsNil) - // sqlitex.Pool.Close doesn't like being called more than once. Let it slide for now. - //t.Cleanup(func() { pool.Close() }) - qt.Assert(t, initPoolDatabase(pool, InitDbOpts{}), qt.IsNil) - if !opts.Memory && opts.SetJournalMode == "" { - opts.SetJournalMode = "wal" - } - qt.Assert(t, initPoolConns(context.TODO(), pool, opts.InitConnOpts), qt.IsNil) - prov, err := NewProvider(pool, ProviderOpts{BatchWrites: pool.NumConns() > 1}) - require.NoError(t, err) - t.Cleanup(func() { prov.Close() }) - return pool, prov -} - -func TestTextBlobSize(t *testing.T) { - _, prov := newConnsAndProv(t, NewPoolOpts{}) - a, _ := prov.NewInstance("a") - err := a.Put(bytes.NewBufferString("\x00hello")) - qt.Assert(t, err, qt.IsNil) - fi, err := a.Stat() - qt.Assert(t, err, qt.IsNil) - assert.EqualValues(t, 6, fi.Size()) -} - -func TestSimultaneousIncrementalBlob(t *testing.T) { - _, p := newConnsAndProv(t, NewPoolOpts{ - NumConns: 3, - }) - a, err := p.NewInstance("a") - require.NoError(t, err) - const contents = "hello, world" - require.NoError(t, a.Put(bytes.NewReader([]byte("hello, world")))) - rc1, err := a.Get() - require.NoError(t, err) - rc2, err := a.Get() - require.NoError(t, err) - var b1, b2 []byte - var e1, e2 error - var wg sync.WaitGroup - doRead := func(b *[]byte, e *error, rc io.ReadCloser, n int) { - defer wg.Done() - defer rc.Close() - *b, *e = ioutil.ReadAll(rc) - require.NoError(t, *e, n) - assert.EqualValues(t, contents, *b) - } - wg.Add(2) - go doRead(&b2, &e2, rc2, 2) - go doRead(&b1, &e1, rc1, 1) - wg.Wait() -} - func BenchmarkMarkComplete(b *testing.B) { const pieceSize = test_storage.DefaultPieceSize const noTriggers = false -- 2.48.1