]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Removed unused sqlite "provider" storage
authorMatt Joiner <anacrolix@gmail.com>
Wed, 25 Aug 2021 02:35:07 +0000 (12:35 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 25 Aug 2021 02:35:07 +0000 (12:35 +1000)
storage/sqlite/direct.go
storage/sqlite/sqlite-storage.go
storage/sqlite/sqlite-storage_test.go

index dd3f524d94ba6d82f6cea1a1167c0d4e506ab134..05d6dd8caa6dc6ea72bdbeba2dc51a2d44f60acc 100644 (file)
@@ -176,11 +176,12 @@ func rowidForBlob(c conn, name string, length int64, create bool) (rowid int64,
 }
 
 func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
-       ret := piece{sb: SquirrelBlob{
-               p.Hash().HexString(),
-               p.Length(),
-               t.c,
-       },
+       ret := piece{
+               sb: SquirrelBlob{
+                       p.Hash().HexString(),
+                       p.Length(),
+                       t.c,
+               },
        }
        ret.ReaderAt = &ret.sb
        ret.WriterAt = &ret.sb
index 38a8afccc8006bc9b35a2effe8442859ee28b162..f8381649677a79d5e9fdd4b399e5fa8dd1315d32 100644 (file)
@@ -4,28 +4,13 @@
 package sqliteStorage
 
 import (
-       "bytes"
-       "context"
        _ "embed"
        "errors"
-       "expvar"
        "fmt"
-       "io"
-       "log"
        "net/url"
-       "os"
-       "runtime"
-       "runtime/pprof"
-       "strings"
-       "sync"
-       "time"
 
        "crawshaw.io/sqlite"
        "crawshaw.io/sqlite/sqlitex"
-       "github.com/anacrolix/missinggo/iter"
-       "github.com/anacrolix/missinggo/v2/resource"
-
-       "github.com/anacrolix/torrent/storage"
 )
 
 type conn = *sqlite.Conn
@@ -160,19 +145,6 @@ func InitSchema(conn conn, pageSize int, triggers bool) error {
        return nil
 }
 
-type NewPiecesStorageOpts struct {
-       NewPoolOpts
-       InitDbOpts
-       ProvOpts    func(*ProviderOpts)
-       StorageOpts func(*storage.ResourcePiecesOpts)
-}
-
-type NewPoolOpts struct {
-       NewConnOpts
-       InitConnOpts
-       NumConns int
-}
-
 type InitDbOpts struct {
        DontInitSchema bool
        PageSize       int
@@ -181,21 +153,13 @@ type InitDbOpts struct {
        NoTriggers bool
 }
 
-// There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
-// the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the
-// top-level option type.
-type PoolConf struct {
-       NumConns    int
-       JournalMode string
-}
-
 // Remove any capacity limits.
-func UnlimitCapacity(conn conn) error {
+func unlimitCapacity(conn conn) error {
        return sqlitex.Exec(conn, "delete from setting where key='capacity'", nil)
 }
 
 // Set the capacity limit to exactly this value.
-func SetCapacity(conn conn, cap int64) error {
+func setCapacity(conn conn, cap int64) error {
        return sqlitex.Exec(conn, "insert into setting values ('capacity', ?)", nil, cap)
 }
 
@@ -229,22 +193,14 @@ func initDatabase(conn conn, opts InitDbOpts) (err error) {
                        return
                }
        }
-       if opts.Capacity != 0 {
-               err = SetCapacity(conn, opts.Capacity)
-               if err != nil {
-                       return
-               }
+       if opts.Capacity < 0 {
+               err = unlimitCapacity(conn)
+       } else if opts.Capacity > 0 {
+               err = setCapacity(conn, opts.Capacity)
        }
        return
 }
 
-func initPoolDatabase(pool ConnPool, opts InitDbOpts) (err error) {
-       withPoolConn(pool, func(c conn) {
-               err = initDatabase(c, opts)
-       })
-       return
-}
-
 // Go fmt, why you so shit?
 const openConnFlags = 0 |
        sqlite.SQLITE_OPEN_READWRITE |
@@ -255,545 +211,3 @@ const openConnFlags = 0 |
 func newConn(opts NewConnOpts) (conn, error) {
        return sqlite.OpenConn(newOpenUri(opts), openConnFlags)
 }
-
-type poolWithNumConns struct {
-       *sqlitex.Pool
-       numConns int
-}
-
-func (me poolWithNumConns) NumConns() int {
-       return me.numConns
-}
-
-func NewPool(opts NewPoolOpts) (_ ConnPool, err error) {
-       if opts.NumConns == 0 {
-               opts.NumConns = runtime.NumCPU()
-       }
-       switch opts.NumConns {
-       case 1:
-               conn, err := newConn(opts.NewConnOpts)
-               return &poolFromConn{conn: conn}, err
-       default:
-               _pool, err := sqlitex.Open(newOpenUri(opts.NewConnOpts), openConnFlags, opts.NumConns)
-               return poolWithNumConns{_pool, opts.NumConns}, err
-       }
-}
-
-// Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool.
-type poolFromConn struct {
-       mu   sync.Mutex
-       conn conn
-}
-
-func (me *poolFromConn) Get(ctx context.Context) conn {
-       me.mu.Lock()
-       return me.conn
-}
-
-func (me *poolFromConn) Put(conn conn) {
-       if conn != me.conn {
-               panic("expected to same conn")
-       }
-       me.mu.Unlock()
-}
-
-func (me *poolFromConn) Close() error {
-       return me.conn.Close()
-}
-
-func (me *poolFromConn) NumConns() int { return 1 }
-
-type ProviderOpts struct {
-       BatchWrites bool
-}
-
-// Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of
-// the ConnPool (since it has to initialize all the connections anyway).
-func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
-       prov := &provider{pool: pool, opts: opts}
-       if opts.BatchWrites {
-               writes := make(chan writeRequest)
-               prov.writes = writes
-               // This is retained for backwards compatibility. It may not be necessary.
-               runtime.SetFinalizer(prov, func(p *provider) {
-                       p.Close()
-               })
-               go providerWriter(writes, prov.pool)
-       }
-       return prov, nil
-}
-
-type InitPoolOpts struct {
-       NumConns int
-       InitConnOpts
-}
-
-func initPoolConns(ctx context.Context, pool ConnPool, opts InitConnOpts) (err error) {
-       var conns []conn
-       defer func() {
-               for _, c := range conns {
-                       pool.Put(c)
-               }
-       }()
-       for range iter.N(pool.NumConns()) {
-               conn := pool.Get(ctx)
-               if conn == nil {
-                       break
-               }
-               conns = append(conns, conn)
-               err = initConn(conn, opts)
-               if err != nil {
-                       err = fmt.Errorf("initing conn %v: %w", len(conns), err)
-                       return
-               }
-       }
-       return
-}
-
-type ConnPool interface {
-       Get(context.Context) conn
-       Put(conn)
-       Close() error
-       NumConns() int
-}
-
-func withPoolConn(pool ConnPool, with func(conn)) {
-       c := pool.Get(context.TODO())
-       defer pool.Put(c)
-       with(c)
-}
-
-type provider struct {
-       pool     ConnPool
-       writes   chan<- writeRequest
-       opts     ProviderOpts
-       closeMu  sync.RWMutex
-       closed   bool
-       closeErr error
-}
-
-var _ storage.ConsecutiveChunkReader = (*provider)(nil)
-
-func (p *provider) ReadConsecutiveChunks(prefix string) (io.ReadCloser, error) {
-       p.closeMu.RLock()
-       runner, err := p.getReadWithConnRunner()
-       if err != nil {
-               p.closeMu.RUnlock()
-               return nil, err
-       }
-       r, w := io.Pipe()
-       go func() {
-               defer p.closeMu.RUnlock()
-               err = runner(func(_ context.Context, conn conn) error {
-                       var written int64
-                       err = sqlitex.Exec(conn, `
-                               select
-                                       data,
-                                       cast(substr(name, ?+1) as integer) as offset
-                               from blob
-                               where name like ?||'%'
-                               order by offset`,
-                               func(stmt *sqlite.Stmt) error {
-                                       offset := stmt.ColumnInt64(1)
-                                       if offset != written {
-                                               return fmt.Errorf("got chunk at offset %v, expected offset %v", offset, written)
-                                       }
-                                       // TODO: Avoid intermediate buffers here
-                                       r := stmt.ColumnReader(0)
-                                       w1, err := io.Copy(w, r)
-                                       written += w1
-                                       return err
-                               },
-                               len(prefix),
-                               prefix,
-                       )
-                       return err
-               })
-               w.CloseWithError(err)
-       }()
-       return r, nil
-}
-
-func (me *provider) Close() error {
-       me.closeMu.Lock()
-       defer me.closeMu.Unlock()
-       if me.closed {
-               return me.closeErr
-       }
-       if me.writes != nil {
-               close(me.writes)
-       }
-       me.closeErr = me.pool.Close()
-       me.closed = true
-       return me.closeErr
-}
-
-type writeRequest struct {
-       query  withConn
-       done   chan<- error
-       labels pprof.LabelSet
-}
-
-var expvars = expvar.NewMap("sqliteStorage")
-
-func runQueryWithLabels(query withConn, labels pprof.LabelSet, conn conn) (err error) {
-       pprof.Do(context.Background(), labels, func(ctx context.Context) {
-               // We pass in the context in the hope that the CPU profiler might incorporate sqlite
-               // activity the action that triggered it. It doesn't seem that way, since those calls don't
-               // take a context.Context themselves. It may come in useful in the goroutine profiles
-               // though, and doesn't hurt to expose it here for other purposes should things change.
-               err = query(ctx, conn)
-       })
-       return
-}
-
-// Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
-// stronger typing on the writes channel.
-func providerWriter(writes <-chan writeRequest, pool ConnPool) {
-       conn := pool.Get(context.TODO())
-       if conn == nil {
-               return
-       }
-       defer pool.Put(conn)
-       for {
-               first, ok := <-writes
-               if !ok {
-                       return
-               }
-               var buf []func()
-               var cantFail error
-               func() {
-                       defer sqlitex.Save(conn)(&cantFail)
-                       firstErr := runQueryWithLabels(first.query, first.labels, conn)
-                       buf = append(buf, func() { first.done <- firstErr })
-                       for {
-                               select {
-                               case wr, ok := <-writes:
-                                       if ok {
-                                               err := runQueryWithLabels(wr.query, wr.labels, conn)
-                                               buf = append(buf, func() { wr.done <- err })
-                                               continue
-                                       }
-                               default:
-                               }
-                               break
-                       }
-               }()
-               // Not sure what to do if this failed.
-               if cantFail != nil {
-                       expvars.Add("batchTransactionErrors", 1)
-               }
-               // Signal done after we know the transaction succeeded.
-               for _, done := range buf {
-                       done()
-               }
-               expvars.Add("batchTransactions", 1)
-               expvars.Add("batchedQueries", int64(len(buf)))
-               //log.Printf("batched %v write queries", len(buf))
-       }
-}
-
-func (p *provider) NewInstance(s string) (resource.Instance, error) {
-       return instance{s, p}, nil
-}
-
-type instance struct {
-       location string
-       p        *provider
-}
-
-func getLabels(skip int) pprof.LabelSet {
-       return pprof.Labels("sqlite-storage-action", func() string {
-               var pcs [8]uintptr
-               runtime.Callers(skip+3, pcs[:])
-               fs := runtime.CallersFrames(pcs[:])
-               f, _ := fs.Next()
-               funcName := f.Func.Name()
-               funcName = funcName[strings.LastIndexByte(funcName, '.')+1:]
-               //log.Printf("func name: %q", funcName)
-               return funcName
-       }())
-}
-
-func (p *provider) withConn(with withConn, write bool, skip int) error {
-       p.closeMu.RLock()
-       // I think we need to check this here because it may not be valid to send to the writes channel
-       // if we're already closed. So don't try to move this check into getReadWithConnRunner.
-       if p.closed {
-               p.closeMu.RUnlock()
-               return errors.New("closed")
-       }
-       if write && p.opts.BatchWrites {
-               done := make(chan error)
-               p.writes <- writeRequest{
-                       query:  with,
-                       done:   done,
-                       labels: getLabels(skip + 1),
-               }
-               p.closeMu.RUnlock()
-               return <-done
-       } else {
-               defer p.closeMu.RUnlock()
-               runner, err := p.getReadWithConnRunner()
-               if err != nil {
-                       return err
-               }
-               return runner(with)
-       }
-}
-
-// Obtains a DB conn and returns a withConn for executing with it. If no error is returned from this
-// function, the runner *must* be used or the conn is leaked. You should check the provider isn't
-// closed before using this.
-func (p *provider) getReadWithConnRunner() (with func(withConn) error, err error) {
-       conn := p.pool.Get(context.TODO())
-       if conn == nil {
-               err = errors.New("couldn't get pool conn")
-               return
-       }
-       with = func(with withConn) error {
-               defer p.pool.Put(conn)
-               return runQueryWithLabels(with, getLabels(1), conn)
-       }
-       return
-}
-
-type withConn func(context.Context, conn) error
-
-func (i instance) withConn(with withConn, write bool) error {
-       return i.p.withConn(with, write, 1)
-}
-
-func (i instance) getConn() *sqlite.Conn {
-       return i.p.pool.Get(context.TODO())
-}
-
-func (i instance) putConn(conn *sqlite.Conn) {
-       i.p.pool.Put(conn)
-}
-
-func (i instance) Readdirnames() (names []string, err error) {
-       prefix := i.location + "/"
-       err = i.withConn(func(_ context.Context, conn conn) error {
-               return sqlitex.Exec(conn, "select name from blob where name like ?", func(stmt *sqlite.Stmt) error {
-                       names = append(names, stmt.ColumnText(0)[len(prefix):])
-                       return nil
-               }, prefix+"%")
-       }, false)
-       //log.Printf("readdir %q gave %q", i.location, names)
-       return
-}
-
-func (i instance) getBlobRowid(conn conn) (rowid int64, err error) {
-       rows := 0
-       err = sqlitex.Exec(conn, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
-               rowid = stmt.ColumnInt64(0)
-               rows++
-               return nil
-       }, i.location)
-       if err != nil {
-               return
-       }
-       if rows == 1 {
-               return
-       }
-       if rows == 0 {
-               err = errors.New("blob not found")
-               return
-       }
-       panic(rows)
-}
-
-type connBlob struct {
-       *sqlite.Blob
-       onClose func()
-}
-
-func (me connBlob) Close() error {
-       err := me.Blob.Close()
-       me.onClose()
-       return err
-}
-
-func (i instance) Get() (ret io.ReadCloser, err error) {
-       conn := i.getConn()
-       if conn == nil {
-               panic("nil sqlite conn")
-       }
-       blob, err := i.openBlob(conn, false, true)
-       if err != nil {
-               i.putConn(conn)
-               return
-       }
-       var once sync.Once
-       return connBlob{blob, func() {
-               once.Do(func() { i.putConn(conn) })
-       }}, nil
-}
-
-func (i instance) openBlob(conn conn, write, updateAccess bool) (*sqlite.Blob, error) {
-       rowid, err := i.getBlobRowid(conn)
-       if err != nil {
-               return nil, err
-       }
-       // This seems to cause locking issues with in-memory databases. Is it something to do with not
-       // having WAL?
-       if updateAccess {
-               err = sqlitex.Exec(conn, "update blob set last_used=datetime('now') where rowid=?", nil, rowid)
-               if err != nil {
-                       err = fmt.Errorf("updating last_used: %w", err)
-                       return nil, err
-               }
-               if conn.Changes() != 1 {
-                       panic(conn.Changes())
-               }
-       }
-       return conn.OpenBlob("main", "blob", "data", rowid, write)
-}
-
-func (i instance) PutSized(reader io.Reader, size int64) (err error) {
-       err = i.withConn(func(_ context.Context, conn conn) error {
-               err := sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, zeroblob(?))",
-                       nil,
-                       i.location, size)
-               if err != nil {
-                       return err
-               }
-               blob, err := i.openBlob(conn, true, false)
-               if err != nil {
-                       return err
-               }
-               defer blob.Close()
-               _, err = io.Copy(blob, reader)
-               return err
-       }, true)
-       return
-}
-
-func (i instance) Put(reader io.Reader) (err error) {
-       var buf bytes.Buffer
-       _, err = io.Copy(&buf, reader)
-       if err != nil {
-               return err
-       }
-       if false {
-               return i.PutSized(&buf, int64(buf.Len()))
-       } else {
-               return i.withConn(func(_ context.Context, conn conn) error {
-                       for range iter.N(10) {
-                               err = sqlitex.Exec(conn,
-                                       "insert or replace into blob(name, data) values(?, cast(? as blob))",
-                                       nil,
-                                       i.location, buf.Bytes())
-                               if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
-                                       log.Print("sqlite busy")
-                                       time.Sleep(time.Second)
-                                       continue
-                               }
-                               break
-                       }
-                       return err
-               }, true)
-       }
-}
-
-type fileInfo struct {
-       size int64
-}
-
-func (f fileInfo) Name() string {
-       panic("implement me")
-}
-
-func (f fileInfo) Size() int64 {
-       return f.size
-}
-
-func (f fileInfo) Mode() os.FileMode {
-       panic("implement me")
-}
-
-func (f fileInfo) ModTime() time.Time {
-       panic("implement me")
-}
-
-func (f fileInfo) IsDir() bool {
-       panic("implement me")
-}
-
-func (f fileInfo) Sys() interface{} {
-       panic("implement me")
-}
-
-func (i instance) Stat() (ret os.FileInfo, err error) {
-       err = i.withConn(func(_ context.Context, conn conn) error {
-               var blob *sqlite.Blob
-               blob, err = i.openBlob(conn, false, false)
-               if err != nil {
-                       return err
-               }
-               defer blob.Close()
-               ret = fileInfo{blob.Size()}
-               return nil
-       }, false)
-       return
-}
-
-func (i instance) ReadAt(p []byte, off int64) (n int, err error) {
-       err = i.withConn(func(_ context.Context, conn conn) error {
-               if false {
-                       var blob *sqlite.Blob
-                       blob, err = i.openBlob(conn, false, true)
-                       if err != nil {
-                               return err
-                       }
-                       defer blob.Close()
-                       if off >= blob.Size() {
-                               err = io.EOF
-                               return err
-                       }
-                       if off+int64(len(p)) > blob.Size() {
-                               p = p[:blob.Size()-off]
-                       }
-                       n, err = blob.ReadAt(p, off)
-               } else {
-                       gotRow := false
-                       err = sqlitex.Exec(
-                               conn,
-                               "select substr(data, ?, ?) from blob where name=?",
-                               func(stmt *sqlite.Stmt) error {
-                                       if gotRow {
-                                               panic("found multiple matching blobs")
-                                       } else {
-                                               gotRow = true
-                                       }
-                                       n = stmt.ColumnBytes(0, p)
-                                       return nil
-                               },
-                               off+1, len(p), i.location,
-                       )
-                       if err != nil {
-                               return err
-                       }
-                       if !gotRow {
-                               err = errors.New("blob not found")
-                               return err
-                       }
-                       if n < len(p) {
-                               err = io.EOF
-                       }
-               }
-               return nil
-       }, false)
-       return
-}
-
-func (i instance) WriteAt(bytes []byte, i2 int64) (int, error) {
-       panic("implement me")
-}
-
-func (i instance) Delete() error {
-       return i.withConn(func(_ context.Context, conn conn) error {
-               return sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)
-       }, true)
-}
index 7d80309f5187040ace75ce36c3af75ee1ad9a50c..1013a8b2377cd598d1c160e603743181483904fa 100644 (file)
@@ -4,82 +4,19 @@
 package sqliteStorage
 
 import (
-       "bytes"
-       "context"
        "errors"
        "fmt"
-       "io"
-       "io/ioutil"
        "path/filepath"
-       "sync"
        "testing"
        "time"
 
        _ "github.com/anacrolix/envpprof"
-       "github.com/dustin/go-humanize"
-       qt "github.com/frankban/quicktest"
-       "github.com/stretchr/testify/assert"
-       "github.com/stretchr/testify/require"
-
        "github.com/anacrolix/torrent/storage"
        test_storage "github.com/anacrolix/torrent/storage/test"
+       "github.com/dustin/go-humanize"
+       qt "github.com/frankban/quicktest"
 )
 
-func newConnsAndProv(t *testing.T, opts NewPoolOpts) (ConnPool, *provider) {
-       opts.Path = filepath.Join(t.TempDir(), "sqlite3.db")
-       pool, err := NewPool(opts)
-       qt.Assert(t, err, qt.IsNil)
-       // sqlitex.Pool.Close doesn't like being called more than once. Let it slide for now.
-       //t.Cleanup(func() { pool.Close() })
-       qt.Assert(t, initPoolDatabase(pool, InitDbOpts{}), qt.IsNil)
-       if !opts.Memory && opts.SetJournalMode == "" {
-               opts.SetJournalMode = "wal"
-       }
-       qt.Assert(t, initPoolConns(context.TODO(), pool, opts.InitConnOpts), qt.IsNil)
-       prov, err := NewProvider(pool, ProviderOpts{BatchWrites: pool.NumConns() > 1})
-       require.NoError(t, err)
-       t.Cleanup(func() { prov.Close() })
-       return pool, prov
-}
-
-func TestTextBlobSize(t *testing.T) {
-       _, prov := newConnsAndProv(t, NewPoolOpts{})
-       a, _ := prov.NewInstance("a")
-       err := a.Put(bytes.NewBufferString("\x00hello"))
-       qt.Assert(t, err, qt.IsNil)
-       fi, err := a.Stat()
-       qt.Assert(t, err, qt.IsNil)
-       assert.EqualValues(t, 6, fi.Size())
-}
-
-func TestSimultaneousIncrementalBlob(t *testing.T) {
-       _, p := newConnsAndProv(t, NewPoolOpts{
-               NumConns: 3,
-       })
-       a, err := p.NewInstance("a")
-       require.NoError(t, err)
-       const contents = "hello, world"
-       require.NoError(t, a.Put(bytes.NewReader([]byte("hello, world"))))
-       rc1, err := a.Get()
-       require.NoError(t, err)
-       rc2, err := a.Get()
-       require.NoError(t, err)
-       var b1, b2 []byte
-       var e1, e2 error
-       var wg sync.WaitGroup
-       doRead := func(b *[]byte, e *error, rc io.ReadCloser, n int) {
-               defer wg.Done()
-               defer rc.Close()
-               *b, *e = ioutil.ReadAll(rc)
-               require.NoError(t, *e, n)
-               assert.EqualValues(t, contents, *b)
-       }
-       wg.Add(2)
-       go doRead(&b2, &e2, rc2, 2)
-       go doRead(&b1, &e1, rc1, 1)
-       wg.Wait()
-}
-
 func BenchmarkMarkComplete(b *testing.B) {
        const pieceSize = test_storage.DefaultPieceSize
        const noTriggers = false