]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Merge branch 'master' into squirrel
authorMatt Joiner <anacrolix@gmail.com>
Thu, 2 Sep 2021 04:19:29 +0000 (14:19 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 2 Sep 2021 04:19:29 +0000 (14:19 +1000)
go.mod
go.sum
storage/sqlite/deprecated.go [new file with mode: 0644]
storage/sqlite/direct.go
storage/sqlite/sql [deleted file]
storage/sqlite/sqlite-storage-cli/main.go [deleted file]
storage/sqlite/sqlite-storage.go [deleted file]
storage/sqlite/sqlite-storage_test.go

diff --git a/go.mod b/go.mod
index ea2b20bda76b84f3162c9ca265b9c965356c73c2..952d25a391b1ded7322897f6adc8ae9e8059c73e 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -15,6 +15,7 @@ require (
        github.com/anacrolix/missinggo/perf v1.0.0
        github.com/anacrolix/missinggo/v2 v2.5.2
        github.com/anacrolix/multiless v0.1.1-0.20210529082330-de2f6cf29619
+       github.com/anacrolix/squirrel v0.1.0
        github.com/anacrolix/sync v0.4.0
        github.com/anacrolix/tagflag v1.3.0
        github.com/anacrolix/upnp v0.1.2-0.20200416075019-5e9378ed1425
diff --git a/go.sum b/go.sum
index cc36b6bcc079ff197a30058dec047f31881e190d..39172aa75ad58e3c0f5aab2dfc2b6b466c9febbe 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -117,6 +117,8 @@ github.com/anacrolix/multiless v0.0.0-20210222022749-ef43011a77ec/go.mod h1:TrCL
 github.com/anacrolix/multiless v0.1.1-0.20210520040635-10ee7b5f3cff/go.mod h1:TrCLEZfIDbMVfLoQt5tOoiBS/uq4y8+ojuEVVvTNPX4=
 github.com/anacrolix/multiless v0.1.1-0.20210529082330-de2f6cf29619 h1:ZkusP2EHxvxm+IymiKJ8DBVE/E6fJkb8K/2+GXZpjAY=
 github.com/anacrolix/multiless v0.1.1-0.20210529082330-de2f6cf29619/go.mod h1:TrCLEZfIDbMVfLoQt5tOoiBS/uq4y8+ojuEVVvTNPX4=
+github.com/anacrolix/squirrel v0.1.0 h1:Zz7XUFUr2ozhsTvzwLdmrFpduoTHtBNTB/KZQ4Ivh00=
+github.com/anacrolix/squirrel v0.1.0/go.mod h1:YzgVvikMdFD441oTWlNG189bpKabO9Sbf3uCSVgca04=
 github.com/anacrolix/stm v0.1.0/go.mod h1:ZKz7e7ERWvP0KgL7WXfRjBXHNRhlVRlbBQecqFtPq+A=
 github.com/anacrolix/stm v0.1.1-0.20191106051447-e749ba3531cf/go.mod h1:zoVQRvSiGjGoTmbM0vSLIiaKjWtNPeTvXUSdJQA4hsg=
 github.com/anacrolix/stm v0.2.0/go.mod h1:zoVQRvSiGjGoTmbM0vSLIiaKjWtNPeTvXUSdJQA4hsg=
diff --git a/storage/sqlite/deprecated.go b/storage/sqlite/deprecated.go
new file mode 100644 (file)
index 0000000..a1b1df1
--- /dev/null
@@ -0,0 +1,7 @@
+package sqliteStorage
+
+import (
+       "github.com/anacrolix/squirrel"
+)
+
+type NewDirectStorageOpts = squirrel.NewCacheOpts
index 5e6b52ddb23d85600ec3879742b7914ce76b9f83..b36867fac3ce3457a0c9be2ad46b2c80ab5849d2 100644 (file)
 package sqliteStorage
 
 import (
-       "errors"
-       "fmt"
-       "runtime"
-       "sync"
-       "time"
+       "io"
 
        "crawshaw.io/sqlite"
-       "crawshaw.io/sqlite/sqlitex"
+       "github.com/anacrolix/squirrel"
 
        "github.com/anacrolix/torrent/metainfo"
        "github.com/anacrolix/torrent/storage"
 )
 
-type NewDirectStorageOpts struct {
-       NewConnOpts
-       InitDbOpts
-       InitConnOpts
-       GcBlobs           bool
-       NoCacheBlobs      bool
-       BlobFlushInterval time.Duration
-}
-
 // A convenience function that creates a connection pool, resource provider, and a pieces storage
 // ClientImpl and returns them all with a Close attached.
 func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
-       conn, err := newConn(opts.NewConnOpts)
-       if err != nil {
-               return
-       }
-       if opts.PageSize == 0 {
-               // The largest size sqlite supports. I think we want this to be the smallest piece size we
-               // can expect, which is probably 1<<17.
-               opts.PageSize = 1 << 16
-       }
-       err = initDatabase(conn, opts.InitDbOpts)
-       if err != nil {
-               conn.Close()
-               return
-       }
-       err = initConn(conn, opts.InitConnOpts)
+       cache, err := squirrel.NewCache(opts)
        if err != nil {
-               conn.Close()
                return
        }
-       if opts.BlobFlushInterval == 0 && !opts.GcBlobs {
-               // This is influenced by typical busy timeouts, of 5-10s. We want to give other connections
-               // a few chances at getting a transaction through.
-               opts.BlobFlushInterval = time.Second
-       }
-       cl := &client{
-               conn:  conn,
-               blobs: make(map[string]*sqlite.Blob),
-               opts:  opts,
-       }
-       // Avoid race with cl.blobFlusherFunc
-       cl.l.Lock()
-       defer cl.l.Unlock()
-       if opts.BlobFlushInterval != 0 {
-               cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
-       }
-       cl.capacity = cl.getCapacity
-       return cl, nil
+       return &client{
+               cache,
+               cache.GetCapacity,
+       }, nil
 }
 
-func (cl *client) getCapacity() (ret *int64) {
-       cl.l.Lock()
-       defer cl.l.Unlock()
-       err := sqlitex.Exec(cl.conn, "select value from setting where name='capacity'", func(stmt *sqlite.Stmt) error {
-               ret = new(int64)
-               *ret = stmt.ColumnInt64(0)
-               return nil
-       })
-       if err != nil {
-               panic(err)
+func NewWrappingClient(cache *squirrel.Cache) storage.ClientImpl {
+       return &client{
+               cache,
+               cache.GetCapacity,
        }
-       return
 }
 
 type client struct {
-       l           sync.Mutex
-       conn        conn
-       blobs       map[string]*sqlite.Blob
-       blobFlusher *time.Timer
-       opts        NewDirectStorageOpts
-       closed      bool
-       capacity    func() *int64
-}
-
-func (c *client) blobFlusherFunc() {
-       c.l.Lock()
-       defer c.l.Unlock()
-       c.flushBlobs()
-       if !c.closed {
-               c.blobFlusher.Reset(c.opts.BlobFlushInterval)
-       }
-}
-
-func (c *client) flushBlobs() {
-       for key, b := range c.blobs {
-               // Need the lock to prevent racing with the GC finalizers.
-               b.Close()
-               delete(c.blobs, key)
-       }
+       *squirrel.Cache
+       capacity func() *int64
 }
 
 func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
-       t := torrent{c}
+       t := torrent{c.Cache}
        return storage.TorrentImpl{Piece: t.Piece, Close: t.Close, Capacity: &c.capacity}, nil
 }
 
-func (c *client) Close() (err error) {
-       c.l.Lock()
-       defer c.l.Unlock()
-       c.flushBlobs()
-       if c.opts.BlobFlushInterval != 0 {
-               c.blobFlusher.Stop()
-       }
-       if !c.closed {
-               c.closed = true
-               err = c.conn.Close()
-               c.conn = nil
-       }
-       return
-}
-
 type torrent struct {
-       c *client
-}
-
-func rowidForBlob(c conn, name string, length int64, create bool) (rowid int64, err error) {
-       rowidOk := false
-       err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
-               if rowidOk {
-                       panic("expected at most one row")
-               }
-               // TODO: How do we know if we got this wrong?
-               rowid = stmt.ColumnInt64(0)
-               rowidOk = true
-               return nil
-       }, name)
-       if err != nil {
-               return
-       }
-       if rowidOk {
-               return
-       }
-       if !create {
-               err = errors.New("no existing row")
-               return
-       }
-       err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
-       if err != nil {
-               return
-       }
-       rowid = c.LastInsertRowID()
-       return
+       c *squirrel.Cache
 }
 
 func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
-       t.c.l.Lock()
-       defer t.c.l.Unlock()
-       name := p.Hash().HexString()
-       return piece{
-               name,
-               p.Length(),
-               t.c,
+       ret := piece{
+               sb: t.c.OpenWithLength(p.Hash().HexString(), p.Length()),
        }
+       ret.ReaderAt = &ret.sb
+       ret.WriterAt = &ret.sb
+       return ret
 }
 
 func (t torrent) Close() error {
@@ -178,130 +61,26 @@ func (t torrent) Close() error {
 }
 
 type piece struct {
-       name   string
-       length int64
-       *client
-}
-
-func (p piece) doAtIoWithBlob(
-       atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
-       b []byte,
-       off int64,
-       create bool,
-) (n int, err error) {
-       p.l.Lock()
-       defer p.l.Unlock()
-       if p.opts.NoCacheBlobs {
-               defer p.forgetBlob()
-       }
-       blob, err := p.getBlob(create)
-       if err != nil {
-               err = fmt.Errorf("getting blob: %w", err)
-               return
-       }
-       n, err = atIo(blob)(b, off)
-       if err == nil {
-               return
-       }
-       var se sqlite.Error
-       if !errors.As(err, &se) {
-               return
-       }
-       // "ABORT" occurs if the row the blob is on is modified elsewhere. "ERROR: invalid blob" occurs
-       // if the blob has been closed. We don't forget blobs that are closed by our GC finalizers,
-       // because they may be attached to names that have since moved on to another blob.
-       if se.Code != sqlite.SQLITE_ABORT && !(p.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
-               return
-       }
-       p.forgetBlob()
-       // Try again, this time we're guaranteed to get a fresh blob, and so errors are no excuse. It
-       // might be possible to skip to this version if we don't cache blobs.
-       blob, err = p.getBlob(create)
-       if err != nil {
-               err = fmt.Errorf("getting blob: %w", err)
-               return
-       }
-       return atIo(blob)(b, off)
-}
-
-func (p piece) ReadAt(b []byte, off int64) (n int, err error) {
-       return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
-               return blob.ReadAt
-       }, b, off, false)
-}
-
-func (p piece) WriteAt(b []byte, off int64) (n int, err error) {
-       return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
-               return blob.WriteAt
-       }, b, off, true)
+       sb squirrel.Blob
+       io.ReaderAt
+       io.WriterAt
 }
 
 func (p piece) MarkComplete() error {
-       p.l.Lock()
-       defer p.l.Unlock()
-       err := sqlitex.Exec(p.conn, "update blob set verified=true where name=?", nil, p.name)
-       if err != nil {
-               return err
-       }
-       changes := p.conn.Changes()
-       if changes != 1 {
-               panic(changes)
-       }
-       return nil
-}
-
-func (p piece) forgetBlob() {
-       blob, ok := p.blobs[p.name]
-       if !ok {
-               return
-       }
-       blob.Close()
-       delete(p.blobs, p.name)
+       return p.sb.SetTag("verified", true)
 }
 
 func (p piece) MarkNotComplete() error {
-       p.l.Lock()
-       defer p.l.Unlock()
-       return sqlitex.Exec(p.conn, "update blob set verified=false where name=?", nil, p.name)
+       return p.sb.SetTag("verified", false)
 }
 
 func (p piece) Completion() (ret storage.Completion) {
-       p.l.Lock()
-       defer p.l.Unlock()
-       err := sqlitex.Exec(p.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
+       err := p.sb.GetTag("verified", func(stmt *sqlite.Stmt) {
                ret.Complete = stmt.ColumnInt(0) != 0
-               return nil
-       }, p.name)
+       })
        ret.Ok = err == nil
        if err != nil {
                panic(err)
        }
        return
 }
-
-func (p piece) getBlob(create bool) (*sqlite.Blob, error) {
-       blob, ok := p.blobs[p.name]
-       if !ok {
-               rowid, err := rowidForBlob(p.conn, p.name, p.length, create)
-               if err != nil {
-                       return nil, fmt.Errorf("getting rowid for blob: %w", err)
-               }
-               blob, err = p.conn.OpenBlob("main", "blob", "data", rowid, true)
-               if err != nil {
-                       panic(err)
-               }
-               if p.opts.GcBlobs {
-                       herp := new(byte)
-                       runtime.SetFinalizer(herp, func(*byte) {
-                               p.l.Lock()
-                               defer p.l.Unlock()
-                               // Note there's no guarantee that the finalizer fired while this blob is the same
-                               // one in the blob cache. It might be possible to rework this so that we check, or
-                               // strip finalizers as appropriate.
-                               blob.Close()
-                       })
-               }
-               p.blobs[p.name] = blob
-       }
-       return blob, nil
-}
diff --git a/storage/sqlite/sql b/storage/sqlite/sql
deleted file mode 100644 (file)
index 31a7668..0000000
+++ /dev/null
@@ -1,29 +0,0 @@
-pragma auto_vacuum=incremental;
-create table if not exists blob(
-       name text,
-       last_used timestamp default (datetime('now')),
-       data blob,
-       primary key (name)
-);
-
-create view if not exists deletable_blob as
-with recursive excess_blob(
-       usage_with,
-       last_used,
-       blob_rowid,
-       data_length
-) as (
-       select * from (select (select sum(length(data)) from blob) as usage_with, last_used, rowid, length(data) from blob order by last_used, rowid limit 1)
-               where usage_with >= (select value from setting where name='capacity')
-       union all
-       select usage_with-data_length, blob.last_used, blob.rowid, length(data) from excess_blob join blob
-               on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
-       where usage_with >= (select value from setting where name='capacity')
-) select * from excess;
-
-CREATE TRIGGER if not exists trim_blobs_to_capacity_after_update after update on blob begin
-       delete from blob where rowid in (select blob_rowid from deletable_blob);
-end;
-CREATE TRIGGER if not exists trim_blobs_to_capacity_after_insert after insert on blob begin
-       delete from blob where rowid in (select blob_rowid from deletable_blob);
-end;
diff --git a/storage/sqlite/sqlite-storage-cli/main.go b/storage/sqlite/sqlite-storage-cli/main.go
deleted file mode 100644 (file)
index 7db859f..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-//go:build cgo
-// +build cgo
-
-package main
-
-import (
-       "fmt"
-       "log"
-       "os"
-
-       "crawshaw.io/sqlite"
-       "github.com/alexflint/go-arg"
-
-       sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
-)
-
-type InitCommand struct {
-       Path string `arg:"positional"`
-}
-
-func main() {
-       err := mainErr()
-       if err != nil {
-               log.Printf("error in main: %v", err)
-               os.Exit(1)
-       }
-}
-
-func mainErr() error {
-       var args struct {
-               Init *InitCommand `arg:"subcommand"`
-       }
-       p := arg.MustParse(&args)
-       switch {
-       case args.Init != nil:
-               conn, err := sqlite.OpenConn(args.Init.Path, 0)
-               if err != nil {
-                       return fmt.Errorf("opening sqlite conn: %w", err)
-               }
-               defer conn.Close()
-               return sqliteStorage.InitSchema(conn, 1<<14, true)
-       default:
-               p.Fail("expected subcommand")
-               panic("unreachable")
-       }
-}
diff --git a/storage/sqlite/sqlite-storage.go b/storage/sqlite/sqlite-storage.go
deleted file mode 100644 (file)
index 2cdf1e3..0000000
+++ /dev/null
@@ -1,866 +0,0 @@
-//go:build cgo
-// +build cgo
-
-package sqliteStorage
-
-import (
-       "bytes"
-       "context"
-       "errors"
-       "expvar"
-       "fmt"
-       "io"
-       "log"
-       "net/url"
-       "os"
-       "runtime"
-       "runtime/pprof"
-       "strings"
-       "sync"
-       "time"
-
-       "crawshaw.io/sqlite"
-       "crawshaw.io/sqlite/sqlitex"
-       "github.com/anacrolix/missinggo/iter"
-       "github.com/anacrolix/missinggo/v2/resource"
-
-       "github.com/anacrolix/torrent/storage"
-)
-
-type conn = *sqlite.Conn
-
-type InitConnOpts struct {
-       SetSynchronous int
-       SetJournalMode string
-       MmapSizeOk     bool  // If false, a package-specific default will be used.
-       MmapSize       int64 // If MmapSizeOk is set, use sqlite default if < 0, otherwise this value.
-}
-
-type UnexpectedJournalMode struct {
-       JournalMode string
-}
-
-func (me UnexpectedJournalMode) Error() string {
-       return fmt.Sprintf("unexpected journal mode: %q", me.JournalMode)
-}
-
-func setSynchronous(conn conn, syncInt int) (err error) {
-       err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma synchronous=%v`, syncInt), nil)
-       if err != nil {
-               return err
-       }
-       var (
-               actual   int
-               actualOk bool
-       )
-       err = sqlitex.ExecTransient(conn, `pragma synchronous`, func(stmt *sqlite.Stmt) error {
-               actual = stmt.ColumnInt(0)
-               actualOk = true
-               return nil
-       })
-       if err != nil {
-               return
-       }
-       if !actualOk {
-               return errors.New("synchronous setting query didn't return anything")
-       }
-       if actual != syncInt {
-               return fmt.Errorf("set synchronous %q, got %q", syncInt, actual)
-       }
-       return nil
-}
-
-func initConn(conn conn, opts InitConnOpts) (err error) {
-       err = setSynchronous(conn, opts.SetSynchronous)
-       if err != nil {
-               return
-       }
-       // Recursive triggers are required because we need to trim the blob_meta size after trimming to
-       // capacity. Hopefully we don't hit the recursion limit, and if we do, there's an error thrown.
-       err = sqlitex.ExecTransient(conn, "pragma recursive_triggers=on", nil)
-       if err != nil {
-               return err
-       }
-       if opts.SetJournalMode != "" {
-               err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma journal_mode=%s`, opts.SetJournalMode), func(stmt *sqlite.Stmt) error {
-                       ret := stmt.ColumnText(0)
-                       if ret != opts.SetJournalMode {
-                               return UnexpectedJournalMode{ret}
-                       }
-                       return nil
-               })
-               if err != nil {
-                       return err
-               }
-       }
-       if !opts.MmapSizeOk {
-               // Set the default. Currently it seems the library picks reasonable defaults, especially for
-               // wal.
-               opts.MmapSize = -1
-               //opts.MmapSize = 1 << 24 // 8 MiB
-       }
-       if opts.MmapSize >= 0 {
-               err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma mmap_size=%d`, opts.MmapSize), nil)
-               if err != nil {
-                       return err
-               }
-       }
-       return nil
-}
-
-func setPageSize(conn conn, pageSize int) error {
-       if pageSize == 0 {
-               return nil
-       }
-       var retSize int64
-       err := sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma page_size=%d`, pageSize), nil)
-       if err != nil {
-               return err
-       }
-       err = sqlitex.ExecTransient(conn, "pragma page_size", func(stmt *sqlite.Stmt) error {
-               retSize = stmt.ColumnInt64(0)
-               return nil
-       })
-       if err != nil {
-               return err
-       }
-       if retSize != int64(pageSize) {
-               return fmt.Errorf("requested page size %v but got %v", pageSize, retSize)
-       }
-       return nil
-}
-
-func InitSchema(conn conn, pageSize int, triggers bool) error {
-       err := setPageSize(conn, pageSize)
-       if err != nil {
-               return fmt.Errorf("setting page size: %w", err)
-       }
-       err = sqlitex.ExecScript(conn, `
-               -- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
-               -- can trim the database file size with partial vacuums without having to do a full vacuum, which
-               -- locks everything.
-               pragma auto_vacuum=incremental;
-
-               create table if not exists blob (
-                       name text,
-                       last_used timestamp default (datetime('now')),
-                       data blob,
-                       verified bool,
-                       primary key (name)
-               );
-
-               create table if not exists blob_meta (
-                       key text primary key,
-                       value
-               );
-
-               create index if not exists blob_last_used on blob(last_used);
-
-               -- While sqlite *seems* to be faster to get sum(length(data)) instead of
-               -- sum(length(data)), it may still require a large table scan at start-up or with a
-               -- cold-cache. With this we can be assured that it doesn't.
-               insert or ignore into blob_meta values ('size', 0);
-
-               create table if not exists setting (
-                       name primary key on conflict replace,
-                       value
-               );
-
-               create view if not exists deletable_blob as
-               with recursive excess (
-                       usage_with,
-                       last_used,
-                       blob_rowid,
-                       data_length
-               ) as (
-                       select *
-                       from (
-                               select
-                                       (select value from blob_meta where key='size') as usage_with,
-                                       last_used,
-                                       rowid,
-                                       length(data)
-                               from blob order by last_used, rowid limit 1
-                       )
-                       where usage_with > (select value from setting where name='capacity')
-                       union all
-                       select
-                               usage_with-data_length as new_usage_with,
-                               blob.last_used,
-                               blob.rowid,
-                               length(data)
-                       from excess join blob
-                       on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
-                       where new_usage_with > (select value from setting where name='capacity')
-               )
-               select * from excess;
-       `)
-       if err != nil {
-               return err
-       }
-       if triggers {
-               err := sqlitex.ExecScript(conn, `
-                       create trigger if not exists after_insert_blob
-                       after insert on blob
-                       begin
-                               update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
-                               delete from blob where rowid in (select blob_rowid from deletable_blob);
-                       end;
-
-                       create trigger if not exists after_update_blob
-                       after update of data on blob
-                       begin
-                               update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
-                               delete from blob where rowid in (select blob_rowid from deletable_blob);
-                       end;
-
-                       create trigger if not exists after_delete_blob
-                       after delete on blob
-                       begin
-                               update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
-                       end;
-               `)
-               if err != nil {
-                       return err
-               }
-       }
-       return nil
-}
-
-type NewPiecesStorageOpts struct {
-       NewPoolOpts
-       InitDbOpts
-       ProvOpts    func(*ProviderOpts)
-       StorageOpts func(*storage.ResourcePiecesOpts)
-}
-
-type NewPoolOpts struct {
-       NewConnOpts
-       InitConnOpts
-       NumConns int
-}
-
-type InitDbOpts struct {
-       DontInitSchema bool
-       PageSize       int
-       // If non-zero, overrides the existing setting.
-       Capacity   int64
-       NoTriggers bool
-}
-
-// There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
-// the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the
-// top-level option type.
-type PoolConf struct {
-       NumConns    int
-       JournalMode string
-}
-
-// Remove any capacity limits.
-func UnlimitCapacity(conn conn) error {
-       return sqlitex.Exec(conn, "delete from setting where key='capacity'", nil)
-}
-
-// Set the capacity limit to exactly this value.
-func SetCapacity(conn conn, cap int64) error {
-       return sqlitex.Exec(conn, "insert into setting values ('capacity', ?)", nil, cap)
-}
-
-type NewConnOpts struct {
-       // See https://www.sqlite.org/c3ref/open.html. NB: "If the filename is an empty string, then a
-       // private, temporary on-disk database will be created. This private database will be
-       // automatically deleted as soon as the database connection is closed."
-       Path   string
-       Memory bool
-       // Whether multiple blobs will not be read simultaneously. Enables journal mode other than WAL,
-       // and NumConns < 2.
-       NoConcurrentBlobReads bool
-}
-
-func newOpenUri(opts NewConnOpts) string {
-       path := url.PathEscape(opts.Path)
-       if opts.Memory {
-               path = ":memory:"
-       }
-       values := make(url.Values)
-       if opts.NoConcurrentBlobReads || opts.Memory {
-               values.Add("cache", "shared")
-       }
-       return fmt.Sprintf("file:%s?%s", path, values.Encode())
-}
-
-func initDatabase(conn conn, opts InitDbOpts) (err error) {
-       if !opts.DontInitSchema {
-               err = InitSchema(conn, opts.PageSize, !opts.NoTriggers)
-               if err != nil {
-                       return
-               }
-       }
-       if opts.Capacity != 0 {
-               err = SetCapacity(conn, opts.Capacity)
-               if err != nil {
-                       return
-               }
-       }
-       return
-}
-
-func initPoolDatabase(pool ConnPool, opts InitDbOpts) (err error) {
-       withPoolConn(pool, func(c conn) {
-               err = initDatabase(c, opts)
-       })
-       return
-}
-
-// Go fmt, why you so shit?
-const openConnFlags = 0 |
-       sqlite.SQLITE_OPEN_READWRITE |
-       sqlite.SQLITE_OPEN_CREATE |
-       sqlite.SQLITE_OPEN_URI |
-       sqlite.SQLITE_OPEN_NOMUTEX
-
-func newConn(opts NewConnOpts) (conn, error) {
-       return sqlite.OpenConn(newOpenUri(opts), openConnFlags)
-}
-
-type poolWithNumConns struct {
-       *sqlitex.Pool
-       numConns int
-}
-
-func (me poolWithNumConns) NumConns() int {
-       return me.numConns
-}
-
-func NewPool(opts NewPoolOpts) (_ ConnPool, err error) {
-       if opts.NumConns == 0 {
-               opts.NumConns = runtime.NumCPU()
-       }
-       switch opts.NumConns {
-       case 1:
-               conn, err := newConn(opts.NewConnOpts)
-               return &poolFromConn{conn: conn}, err
-       default:
-               _pool, err := sqlitex.Open(newOpenUri(opts.NewConnOpts), openConnFlags, opts.NumConns)
-               return poolWithNumConns{_pool, opts.NumConns}, err
-       }
-}
-
-// Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool.
-type poolFromConn struct {
-       mu   sync.Mutex
-       conn conn
-}
-
-func (me *poolFromConn) Get(ctx context.Context) conn {
-       me.mu.Lock()
-       return me.conn
-}
-
-func (me *poolFromConn) Put(conn conn) {
-       if conn != me.conn {
-               panic("expected to same conn")
-       }
-       me.mu.Unlock()
-}
-
-func (me *poolFromConn) Close() error {
-       return me.conn.Close()
-}
-
-func (me *poolFromConn) NumConns() int { return 1 }
-
-type ProviderOpts struct {
-       BatchWrites bool
-}
-
-// Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of
-// the ConnPool (since it has to initialize all the connections anyway).
-func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
-       prov := &provider{pool: pool, opts: opts}
-       if opts.BatchWrites {
-               writes := make(chan writeRequest)
-               prov.writes = writes
-               // This is retained for backwards compatibility. It may not be necessary.
-               runtime.SetFinalizer(prov, func(p *provider) {
-                       p.Close()
-               })
-               go providerWriter(writes, prov.pool)
-       }
-       return prov, nil
-}
-
-type InitPoolOpts struct {
-       NumConns int
-       InitConnOpts
-}
-
-func initPoolConns(ctx context.Context, pool ConnPool, opts InitConnOpts) (err error) {
-       var conns []conn
-       defer func() {
-               for _, c := range conns {
-                       pool.Put(c)
-               }
-       }()
-       for range iter.N(pool.NumConns()) {
-               conn := pool.Get(ctx)
-               if conn == nil {
-                       break
-               }
-               conns = append(conns, conn)
-               err = initConn(conn, opts)
-               if err != nil {
-                       err = fmt.Errorf("initing conn %v: %w", len(conns), err)
-                       return
-               }
-       }
-       return
-}
-
-type ConnPool interface {
-       Get(context.Context) conn
-       Put(conn)
-       Close() error
-       NumConns() int
-}
-
-func withPoolConn(pool ConnPool, with func(conn)) {
-       c := pool.Get(context.TODO())
-       defer pool.Put(c)
-       with(c)
-}
-
-type provider struct {
-       pool     ConnPool
-       writes   chan<- writeRequest
-       opts     ProviderOpts
-       closeMu  sync.RWMutex
-       closed   bool
-       closeErr error
-}
-
-var _ storage.ConsecutiveChunkReader = (*provider)(nil)
-
-func (p *provider) ReadConsecutiveChunks(prefix string) (io.ReadCloser, error) {
-       p.closeMu.RLock()
-       runner, err := p.getReadWithConnRunner()
-       if err != nil {
-               p.closeMu.RUnlock()
-               return nil, err
-       }
-       r, w := io.Pipe()
-       go func() {
-               defer p.closeMu.RUnlock()
-               err = runner(func(_ context.Context, conn conn) error {
-                       var written int64
-                       err = sqlitex.Exec(conn, `
-                               select
-                                       data,
-                                       cast(substr(name, ?+1) as integer) as offset
-                               from blob
-                               where name like ?||'%'
-                               order by offset`,
-                               func(stmt *sqlite.Stmt) error {
-                                       offset := stmt.ColumnInt64(1)
-                                       if offset != written {
-                                               return fmt.Errorf("got chunk at offset %v, expected offset %v", offset, written)
-                                       }
-                                       // TODO: Avoid intermediate buffers here
-                                       r := stmt.ColumnReader(0)
-                                       w1, err := io.Copy(w, r)
-                                       written += w1
-                                       return err
-                               },
-                               len(prefix),
-                               prefix,
-                       )
-                       return err
-               })
-               w.CloseWithError(err)
-       }()
-       return r, nil
-}
-
-func (me *provider) Close() error {
-       me.closeMu.Lock()
-       defer me.closeMu.Unlock()
-       if me.closed {
-               return me.closeErr
-       }
-       if me.writes != nil {
-               close(me.writes)
-       }
-       me.closeErr = me.pool.Close()
-       me.closed = true
-       return me.closeErr
-}
-
-type writeRequest struct {
-       query  withConn
-       done   chan<- error
-       labels pprof.LabelSet
-}
-
-var expvars = expvar.NewMap("sqliteStorage")
-
-func runQueryWithLabels(query withConn, labels pprof.LabelSet, conn conn) (err error) {
-       pprof.Do(context.Background(), labels, func(ctx context.Context) {
-               // We pass in the context in the hope that the CPU profiler might incorporate sqlite
-               // activity the action that triggered it. It doesn't seem that way, since those calls don't
-               // take a context.Context themselves. It may come in useful in the goroutine profiles
-               // though, and doesn't hurt to expose it here for other purposes should things change.
-               err = query(ctx, conn)
-       })
-       return
-}
-
-// Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
-// stronger typing on the writes channel.
-func providerWriter(writes <-chan writeRequest, pool ConnPool) {
-       conn := pool.Get(context.TODO())
-       if conn == nil {
-               return
-       }
-       defer pool.Put(conn)
-       for {
-               first, ok := <-writes
-               if !ok {
-                       return
-               }
-               var buf []func()
-               var cantFail error
-               func() {
-                       defer sqlitex.Save(conn)(&cantFail)
-                       firstErr := runQueryWithLabels(first.query, first.labels, conn)
-                       buf = append(buf, func() { first.done <- firstErr })
-                       for {
-                               select {
-                               case wr, ok := <-writes:
-                                       if ok {
-                                               err := runQueryWithLabels(wr.query, wr.labels, conn)
-                                               buf = append(buf, func() { wr.done <- err })
-                                               continue
-                                       }
-                               default:
-                               }
-                               break
-                       }
-               }()
-               // Not sure what to do if this failed.
-               if cantFail != nil {
-                       expvars.Add("batchTransactionErrors", 1)
-               }
-               // Signal done after we know the transaction succeeded.
-               for _, done := range buf {
-                       done()
-               }
-               expvars.Add("batchTransactions", 1)
-               expvars.Add("batchedQueries", int64(len(buf)))
-               //log.Printf("batched %v write queries", len(buf))
-       }
-}
-
-func (p *provider) NewInstance(s string) (resource.Instance, error) {
-       return instance{s, p}, nil
-}
-
-type instance struct {
-       location string
-       p        *provider
-}
-
-func getLabels(skip int) pprof.LabelSet {
-       return pprof.Labels("sqlite-storage-action", func() string {
-               var pcs [8]uintptr
-               runtime.Callers(skip+3, pcs[:])
-               fs := runtime.CallersFrames(pcs[:])
-               f, _ := fs.Next()
-               funcName := f.Func.Name()
-               funcName = funcName[strings.LastIndexByte(funcName, '.')+1:]
-               //log.Printf("func name: %q", funcName)
-               return funcName
-       }())
-}
-
-func (p *provider) withConn(with withConn, write bool, skip int) error {
-       p.closeMu.RLock()
-       // I think we need to check this here because it may not be valid to send to the writes channel
-       // if we're already closed. So don't try to move this check into getReadWithConnRunner.
-       if p.closed {
-               p.closeMu.RUnlock()
-               return errors.New("closed")
-       }
-       if write && p.opts.BatchWrites {
-               done := make(chan error)
-               p.writes <- writeRequest{
-                       query:  with,
-                       done:   done,
-                       labels: getLabels(skip + 1),
-               }
-               p.closeMu.RUnlock()
-               return <-done
-       } else {
-               defer p.closeMu.RUnlock()
-               runner, err := p.getReadWithConnRunner()
-               if err != nil {
-                       return err
-               }
-               return runner(with)
-       }
-}
-
-// Obtains a DB conn and returns a withConn for executing with it. If no error is returned from this
-// function, the runner *must* be used or the conn is leaked. You should check the provider isn't
-// closed before using this.
-func (p *provider) getReadWithConnRunner() (with func(withConn) error, err error) {
-       conn := p.pool.Get(context.TODO())
-       if conn == nil {
-               err = errors.New("couldn't get pool conn")
-               return
-       }
-       with = func(with withConn) error {
-               defer p.pool.Put(conn)
-               return runQueryWithLabels(with, getLabels(1), conn)
-       }
-       return
-}
-
-type withConn func(context.Context, conn) error
-
-func (i instance) withConn(with withConn, write bool) error {
-       return i.p.withConn(with, write, 1)
-}
-
-func (i instance) getConn() *sqlite.Conn {
-       return i.p.pool.Get(context.TODO())
-}
-
-func (i instance) putConn(conn *sqlite.Conn) {
-       i.p.pool.Put(conn)
-}
-
-func (i instance) Readdirnames() (names []string, err error) {
-       prefix := i.location + "/"
-       err = i.withConn(func(_ context.Context, conn conn) error {
-               return sqlitex.Exec(conn, "select name from blob where name like ?", func(stmt *sqlite.Stmt) error {
-                       names = append(names, stmt.ColumnText(0)[len(prefix):])
-                       return nil
-               }, prefix+"%")
-       }, false)
-       //log.Printf("readdir %q gave %q", i.location, names)
-       return
-}
-
-func (i instance) getBlobRowid(conn conn) (rowid int64, err error) {
-       rows := 0
-       err = sqlitex.Exec(conn, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
-               rowid = stmt.ColumnInt64(0)
-               rows++
-               return nil
-       }, i.location)
-       if err != nil {
-               return
-       }
-       if rows == 1 {
-               return
-       }
-       if rows == 0 {
-               err = errors.New("blob not found")
-               return
-       }
-       panic(rows)
-}
-
-type connBlob struct {
-       *sqlite.Blob
-       onClose func()
-}
-
-func (me connBlob) Close() error {
-       err := me.Blob.Close()
-       me.onClose()
-       return err
-}
-
-func (i instance) Get() (ret io.ReadCloser, err error) {
-       conn := i.getConn()
-       if conn == nil {
-               panic("nil sqlite conn")
-       }
-       blob, err := i.openBlob(conn, false, true)
-       if err != nil {
-               i.putConn(conn)
-               return
-       }
-       var once sync.Once
-       return connBlob{blob, func() {
-               once.Do(func() { i.putConn(conn) })
-       }}, nil
-}
-
-func (i instance) openBlob(conn conn, write, updateAccess bool) (*sqlite.Blob, error) {
-       rowid, err := i.getBlobRowid(conn)
-       if err != nil {
-               return nil, err
-       }
-       // This seems to cause locking issues with in-memory databases. Is it something to do with not
-       // having WAL?
-       if updateAccess {
-               err = sqlitex.Exec(conn, "update blob set last_used=datetime('now') where rowid=?", nil, rowid)
-               if err != nil {
-                       err = fmt.Errorf("updating last_used: %w", err)
-                       return nil, err
-               }
-               if conn.Changes() != 1 {
-                       panic(conn.Changes())
-               }
-       }
-       return conn.OpenBlob("main", "blob", "data", rowid, write)
-}
-
-func (i instance) PutSized(reader io.Reader, size int64) (err error) {
-       err = i.withConn(func(_ context.Context, conn conn) error {
-               err := sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, zeroblob(?))",
-                       nil,
-                       i.location, size)
-               if err != nil {
-                       return err
-               }
-               blob, err := i.openBlob(conn, true, false)
-               if err != nil {
-                       return err
-               }
-               defer blob.Close()
-               _, err = io.Copy(blob, reader)
-               return err
-       }, true)
-       return
-}
-
-func (i instance) Put(reader io.Reader) (err error) {
-       var buf bytes.Buffer
-       _, err = io.Copy(&buf, reader)
-       if err != nil {
-               return err
-       }
-       if false {
-               return i.PutSized(&buf, int64(buf.Len()))
-       } else {
-               return i.withConn(func(_ context.Context, conn conn) error {
-                       for range iter.N(10) {
-                               err = sqlitex.Exec(conn,
-                                       "insert or replace into blob(name, data) values(?, cast(? as blob))",
-                                       nil,
-                                       i.location, buf.Bytes())
-                               if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
-                                       log.Print("sqlite busy")
-                                       time.Sleep(time.Second)
-                                       continue
-                               }
-                               break
-                       }
-                       return err
-               }, true)
-       }
-}
-
-type fileInfo struct {
-       size int64
-}
-
-func (f fileInfo) Name() string {
-       panic("implement me")
-}
-
-func (f fileInfo) Size() int64 {
-       return f.size
-}
-
-func (f fileInfo) Mode() os.FileMode {
-       panic("implement me")
-}
-
-func (f fileInfo) ModTime() time.Time {
-       panic("implement me")
-}
-
-func (f fileInfo) IsDir() bool {
-       panic("implement me")
-}
-
-func (f fileInfo) Sys() interface{} {
-       panic("implement me")
-}
-
-func (i instance) Stat() (ret os.FileInfo, err error) {
-       err = i.withConn(func(_ context.Context, conn conn) error {
-               var blob *sqlite.Blob
-               blob, err = i.openBlob(conn, false, false)
-               if err != nil {
-                       return err
-               }
-               defer blob.Close()
-               ret = fileInfo{blob.Size()}
-               return nil
-       }, false)
-       return
-}
-
-func (i instance) ReadAt(p []byte, off int64) (n int, err error) {
-       err = i.withConn(func(_ context.Context, conn conn) error {
-               if false {
-                       var blob *sqlite.Blob
-                       blob, err = i.openBlob(conn, false, true)
-                       if err != nil {
-                               return err
-                       }
-                       defer blob.Close()
-                       if off >= blob.Size() {
-                               err = io.EOF
-                               return err
-                       }
-                       if off+int64(len(p)) > blob.Size() {
-                               p = p[:blob.Size()-off]
-                       }
-                       n, err = blob.ReadAt(p, off)
-               } else {
-                       gotRow := false
-                       err = sqlitex.Exec(
-                               conn,
-                               "select substr(data, ?, ?) from blob where name=?",
-                               func(stmt *sqlite.Stmt) error {
-                                       if gotRow {
-                                               panic("found multiple matching blobs")
-                                       } else {
-                                               gotRow = true
-                                       }
-                                       n = stmt.ColumnBytes(0, p)
-                                       return nil
-                               },
-                               off+1, len(p), i.location,
-                       )
-                       if err != nil {
-                               return err
-                       }
-                       if !gotRow {
-                               err = errors.New("blob not found")
-                               return err
-                       }
-                       if n < len(p) {
-                               err = io.EOF
-                       }
-               }
-               return nil
-       }, false)
-       return
-}
-
-func (i instance) WriteAt(bytes []byte, i2 int64) (int, error) {
-       panic("implement me")
-}
-
-func (i instance) Delete() error {
-       return i.withConn(func(_ context.Context, conn conn) error {
-               return sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)
-       }, true)
-}
index 7d80309f5187040ace75ce36c3af75ee1ad9a50c..43c743e39423afddf0f4bfb836b2b19a0936b850 100644 (file)
@@ -4,82 +4,20 @@
 package sqliteStorage
 
 import (
-       "bytes"
-       "context"
        "errors"
        "fmt"
-       "io"
-       "io/ioutil"
        "path/filepath"
-       "sync"
        "testing"
        "time"
 
        _ "github.com/anacrolix/envpprof"
-       "github.com/dustin/go-humanize"
-       qt "github.com/frankban/quicktest"
-       "github.com/stretchr/testify/assert"
-       "github.com/stretchr/testify/require"
-
+       "github.com/anacrolix/squirrel"
        "github.com/anacrolix/torrent/storage"
        test_storage "github.com/anacrolix/torrent/storage/test"
+       "github.com/dustin/go-humanize"
+       qt "github.com/frankban/quicktest"
 )
 
-func newConnsAndProv(t *testing.T, opts NewPoolOpts) (ConnPool, *provider) {
-       opts.Path = filepath.Join(t.TempDir(), "sqlite3.db")
-       pool, err := NewPool(opts)
-       qt.Assert(t, err, qt.IsNil)
-       // sqlitex.Pool.Close doesn't like being called more than once. Let it slide for now.
-       //t.Cleanup(func() { pool.Close() })
-       qt.Assert(t, initPoolDatabase(pool, InitDbOpts{}), qt.IsNil)
-       if !opts.Memory && opts.SetJournalMode == "" {
-               opts.SetJournalMode = "wal"
-       }
-       qt.Assert(t, initPoolConns(context.TODO(), pool, opts.InitConnOpts), qt.IsNil)
-       prov, err := NewProvider(pool, ProviderOpts{BatchWrites: pool.NumConns() > 1})
-       require.NoError(t, err)
-       t.Cleanup(func() { prov.Close() })
-       return pool, prov
-}
-
-func TestTextBlobSize(t *testing.T) {
-       _, prov := newConnsAndProv(t, NewPoolOpts{})
-       a, _ := prov.NewInstance("a")
-       err := a.Put(bytes.NewBufferString("\x00hello"))
-       qt.Assert(t, err, qt.IsNil)
-       fi, err := a.Stat()
-       qt.Assert(t, err, qt.IsNil)
-       assert.EqualValues(t, 6, fi.Size())
-}
-
-func TestSimultaneousIncrementalBlob(t *testing.T) {
-       _, p := newConnsAndProv(t, NewPoolOpts{
-               NumConns: 3,
-       })
-       a, err := p.NewInstance("a")
-       require.NoError(t, err)
-       const contents = "hello, world"
-       require.NoError(t, a.Put(bytes.NewReader([]byte("hello, world"))))
-       rc1, err := a.Get()
-       require.NoError(t, err)
-       rc2, err := a.Get()
-       require.NoError(t, err)
-       var b1, b2 []byte
-       var e1, e2 error
-       var wg sync.WaitGroup
-       doRead := func(b *[]byte, e *error, rc io.ReadCloser, n int) {
-               defer wg.Done()
-               defer rc.Close()
-               *b, *e = ioutil.ReadAll(rc)
-               require.NoError(t, *e, n)
-               assert.EqualValues(t, contents, *b)
-       }
-       wg.Add(2)
-       go doRead(&b2, &e2, rc2, 2)
-       go doRead(&b1, &e1, rc1, 1)
-       wg.Wait()
-}
-
 func BenchmarkMarkComplete(b *testing.B) {
        const pieceSize = test_storage.DefaultPieceSize
        const noTriggers = false
@@ -93,7 +31,7 @@ func BenchmarkMarkComplete(b *testing.B) {
        }
        c := qt.New(b)
        b.Run("CustomDirect", func(b *testing.B) {
-               var opts NewDirectStorageOpts
+               var opts squirrel.NewCacheOpts
                opts.Capacity = capacity
                opts.NoTriggers = noTriggers
                benchOpts := func(b *testing.B) {
@@ -117,7 +55,7 @@ func BenchmarkMarkComplete(b *testing.B) {
                                directBench := func(b *testing.B) {
                                        opts.Path = filepath.Join(b.TempDir(), "storage.db")
                                        ci, err := NewDirectStorage(opts)
-                                       var ujm UnexpectedJournalMode
+                                       var ujm squirrel.ErrUnexpectedJournalMode
                                        if errors.As(err, &ujm) {
                                                b.Skipf("setting journal mode %q: %v", opts.SetJournalMode, err)
                                        }