From d6fcf7a32a807dfaa3f5c81ec33f7c3427d6ca28 Mon Sep 17 00:00:00 2001
From: Matt Joiner <anacrolix@gmail.com>
Date: Wed, 25 Aug 2021 14:37:00 +1000
Subject: [PATCH] Use separate squirrel module

---
 go.mod                                    |   1 +
 storage/sqlite/deprecated.go              |   7 +
 storage/sqlite/direct.go                  | 271 +---------------------
 storage/sqlite/init-triggers.sql          |  25 --
 storage/sqlite/init.sql                   |  64 -----
 storage/sqlite/sql                        |  29 ---
 storage/sqlite/sqlite-storage-cli/main.go |  46 ----
 storage/sqlite/sqlite-storage.go          | 213 -----------------
 storage/sqlite/sqlite-storage_test.go     |   5 +-
 9 files changed, 23 insertions(+), 638 deletions(-)
 create mode 100644 storage/sqlite/deprecated.go
 delete mode 100644 storage/sqlite/init-triggers.sql
 delete mode 100644 storage/sqlite/init.sql
 delete mode 100644 storage/sqlite/sql
 delete mode 100644 storage/sqlite/sqlite-storage-cli/main.go
 delete mode 100644 storage/sqlite/sqlite-storage.go

diff --git a/go.mod b/go.mod
index 5a45857a..2d4ccaa3 100644
--- a/go.mod
+++ b/go.mod
@@ -15,6 +15,7 @@ require (
 	github.com/anacrolix/missinggo/perf v1.0.0
 	github.com/anacrolix/missinggo/v2 v2.5.2
 	github.com/anacrolix/multiless v0.1.1-0.20210529082330-de2f6cf29619
+	github.com/anacrolix/squirrel v0.0.0-00010101000000-000000000000
 	github.com/anacrolix/sync v0.4.0
 	github.com/anacrolix/tagflag v1.3.0
 	github.com/anacrolix/upnp v0.1.2-0.20200416075019-5e9378ed1425
diff --git a/storage/sqlite/deprecated.go b/storage/sqlite/deprecated.go
new file mode 100644
index 00000000..a1b1df14
--- /dev/null
+++ b/storage/sqlite/deprecated.go
@@ -0,0 +1,7 @@
+package sqliteStorage
+
+import (
+	"github.com/anacrolix/squirrel"
+)
+
+type NewDirectStorageOpts = squirrel.NewCacheOpts
diff --git a/storage/sqlite/direct.go b/storage/sqlite/direct.go
index 05d6dd8c..918bf7ef 100644
--- a/storage/sqlite/direct.go
+++ b/storage/sqlite/direct.go
@@ -4,180 +4,44 @@
 package sqliteStorage
 
 import (
-	"errors"
-	"fmt"
 	"io"
-	"runtime"
-	"sync"
-	"time"
 
 	"crawshaw.io/sqlite"
-	"crawshaw.io/sqlite/sqlitex"
+	"github.com/anacrolix/squirrel"
 
 	"github.com/anacrolix/torrent/metainfo"
 	"github.com/anacrolix/torrent/storage"
 )
 
-type NewDirectStorageOpts struct {
-	NewConnOpts
-	InitDbOpts
-	InitConnOpts
-	GcBlobs           bool
-	NoCacheBlobs      bool
-	BlobFlushInterval time.Duration
-}
-
-func NewSquirrelCache(opts NewDirectStorageOpts) (_ *SquirrelCache, err error) {
-	conn, err := newConn(opts.NewConnOpts)
-	if err != nil {
-		return
-	}
-	if opts.PageSize == 0 {
-		// The largest size sqlite supports. I think we want this to be the smallest SquirrelBlob size we
-		// can expect, which is probably 1<<17.
-		opts.PageSize = 1 << 16
-	}
-	err = initDatabase(conn, opts.InitDbOpts)
-	if err != nil {
-		conn.Close()
-		return
-	}
-	err = initConn(conn, opts.InitConnOpts)
-	if err != nil {
-		conn.Close()
-		return
-	}
-	if opts.BlobFlushInterval == 0 && !opts.GcBlobs {
-		// This is influenced by typical busy timeouts, of 5-10s. We want to give other connections
-		// a few chances at getting a transaction through.
-		opts.BlobFlushInterval = time.Second
-	}
-	cl := &SquirrelCache{
-		conn:  conn,
-		blobs: make(map[string]*sqlite.Blob),
-		opts:  opts,
-	}
-	// Avoid race with cl.blobFlusherFunc
-	cl.l.Lock()
-	defer cl.l.Unlock()
-	if opts.BlobFlushInterval != 0 {
-		cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
-	}
-	cl.capacity = cl.getCapacity
-	return cl, nil
-}
-
 // A convenience function that creates a connection pool, resource provider, and a pieces storage
 // ClientImpl and returns them all with a Close attached.
 func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
-	cache, err := NewSquirrelCache(opts)
+	cache, err := squirrel.NewCache(opts)
 	if err != nil {
 		return
 	}
-	return client{cache}, nil
-}
-
-func (cl *SquirrelCache) getCapacity() (ret *int64) {
-	cl.l.Lock()
-	defer cl.l.Unlock()
-	err := sqlitex.Exec(cl.conn, "select value from setting where name='capacity'", func(stmt *sqlite.Stmt) error {
-		ret = new(int64)
-		*ret = stmt.ColumnInt64(0)
-		return nil
-	})
-	if err != nil {
-		panic(err)
-	}
-	return
-}
-
-type SquirrelCache struct {
-	l           sync.Mutex
-	conn        conn
-	blobs       map[string]*sqlite.Blob
-	blobFlusher *time.Timer
-	opts        NewDirectStorageOpts
-	closed      bool
-	capacity    func() *int64
+	return &client{
+		cache,
+		cache.GetCapacity}, nil
 }
 
 type client struct {
-	*SquirrelCache
-}
-
-func (c *SquirrelCache) blobFlusherFunc() {
-	c.l.Lock()
-	defer c.l.Unlock()
-	c.flushBlobs()
-	if !c.closed {
-		c.blobFlusher.Reset(c.opts.BlobFlushInterval)
-	}
-}
-
-func (c *SquirrelCache) flushBlobs() {
-	for key, b := range c.blobs {
-		// Need the lock to prevent racing with the GC finalizers.
-		b.Close()
-		delete(c.blobs, key)
-	}
+	*squirrel.Cache
+	capacity func() *int64
 }
 
-func (c client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
-	t := torrent{c.SquirrelCache}
+func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
+	t := torrent{c.Cache}
 	return storage.TorrentImpl{Piece: t.Piece, Close: t.Close, Capacity: &c.capacity}, nil
 }
 
-func (c *SquirrelCache) Close() (err error) {
-	c.l.Lock()
-	defer c.l.Unlock()
-	c.flushBlobs()
-	if c.opts.BlobFlushInterval != 0 {
-		c.blobFlusher.Stop()
-	}
-	if !c.closed {
-		c.closed = true
-		err = c.conn.Close()
-		c.conn = nil
-	}
-	return
-}
-
 type torrent struct {
-	c *SquirrelCache
-}
-
-func rowidForBlob(c conn, name string, length int64, create bool) (rowid int64, err error) {
-	rowidOk := false
-	err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
-		if rowidOk {
-			panic("expected at most one row")
-		}
-		// TODO: How do we know if we got this wrong?
-		rowid = stmt.ColumnInt64(0)
-		rowidOk = true
-		return nil
-	}, name)
-	if err != nil {
-		return
-	}
-	if rowidOk {
-		return
-	}
-	if !create {
-		err = errors.New("no existing row")
-		return
-	}
-	err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
-	if err != nil {
-		return
-	}
-	rowid = c.LastInsertRowID()
-	return
+	c *squirrel.Cache
 }
 
 func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
 	ret := piece{
-		sb: SquirrelBlob{
+		sb: squirrel.Blob{
 			p.Hash().HexString(),
 			p.Length(),
 			t.c,
@@ -192,104 +56,20 @@ func (t torrent) Close() error {
 	return nil
 }
 
-type SquirrelBlob struct {
-	name   string
-	length int64
-	*SquirrelCache
-}
-
 type piece struct {
-	sb SquirrelBlob
+	sb squirrel.Blob
 	io.ReaderAt
 	io.WriterAt
 }
 
-func (p SquirrelBlob) doAtIoWithBlob(
-	atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
-	b []byte,
-	off int64,
-	create bool,
-) (n int, err error) {
-	p.l.Lock()
-	defer p.l.Unlock()
-	if p.opts.NoCacheBlobs {
-		defer p.forgetBlob()
-	}
-	blob, err := p.getBlob(create)
-	if err != nil {
-		err = fmt.Errorf("getting blob: %w", err)
-		return
-	}
-	n, err = atIo(blob)(b, off)
-	if err == nil {
-		return
-	}
-	var se sqlite.Error
-	if !errors.As(err, &se) {
-		return
-	}
-	// "ABORT" occurs if the row the blob is on is modified elsewhere. "ERROR: invalid blob" occurs
-	// if the blob has been closed. We don't forget blobs that are closed by our GC finalizers,
-	// because they may be attached to names that have since moved on to another blob.
-	if se.Code != sqlite.SQLITE_ABORT && !(p.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
-		return
-	}
-	p.forgetBlob()
-	// Try again, this time we're guaranteed to get a fresh blob, and so errors are no excuse. It
-	// might be possible to skip to this version if we don't cache blobs.
-	blob, err = p.getBlob(create)
-	if err != nil {
-		err = fmt.Errorf("getting blob: %w", err)
-		return
-	}
-	return atIo(blob)(b, off)
-}
-
-func (p SquirrelBlob) ReadAt(b []byte, off int64) (n int, err error) {
-	return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
-		return blob.ReadAt
-	}, b, off, false)
-}
-
-func (p SquirrelBlob) WriteAt(b []byte, off int64) (n int, err error) {
-	return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
-		return blob.WriteAt
-	}, b, off, true)
-}
-
-func (p SquirrelBlob) SetTag(name string, value interface{}) error {
-	p.l.Lock()
-	defer p.l.Unlock()
-	return sqlitex.Exec(p.conn, "insert or replace into tag (blob_name, tag_name, value) values (?, ?, ?)", nil,
-		p.name, name, value)
-}
-
 func (p piece) MarkComplete() error {
 	return p.sb.SetTag("verified", true)
 }
 
-func (p SquirrelBlob) forgetBlob() {
-	blob, ok := p.blobs[p.name]
-	if !ok {
-		return
-	}
-	blob.Close()
-	delete(p.blobs, p.name)
-}
-
 func (p piece) MarkNotComplete() error {
 	return p.sb.SetTag("verified", false)
}
 
-func (p SquirrelBlob) GetTag(name string, result func(*sqlite.Stmt)) error {
-	p.l.Lock()
-	defer p.l.Unlock()
-	return sqlitex.Exec(p.conn, "select value from tag where blob_name=? and tag_name=?", func(stmt *sqlite.Stmt) error {
-		result(stmt)
-		return nil
-	}, p.name, name)
-}
-
 func (p piece) Completion() (ret storage.Completion) {
 	err := p.sb.GetTag("verified", func(stmt *sqlite.Stmt) {
 		ret.Complete = stmt.ColumnInt(0) != 0
@@ -300,30 +80,3 @@ func (p piece) Completion() (ret storage.Completion) {
 	}
 	return
 }
-
-func (p SquirrelBlob) getBlob(create bool) (*sqlite.Blob, error) {
-	blob, ok := p.blobs[p.name]
-	if !ok {
-		rowid, err := rowidForBlob(p.conn, p.name, p.length, create)
-		if err != nil {
-			return nil, fmt.Errorf("getting rowid for blob: %w", err)
-		}
-		blob, err = p.conn.OpenBlob("main", "blob", "data", rowid, true)
-		if err != nil {
-			panic(err)
-		}
-		if p.opts.GcBlobs {
-			herp := new(byte)
-			runtime.SetFinalizer(herp, func(*byte) {
-				p.l.Lock()
-				defer p.l.Unlock()
-				// Note there's no guarantee that the finalizer fired while this blob is the same
-				// one in the blob cache. It might be possible to rework this so that we check, or
-				// strip finalizers as appropriate.
-				blob.Close()
-			})
-		}
-		p.blobs[p.name] = blob
-	}
-	return blob, nil
-}
diff --git a/storage/sqlite/init-triggers.sql b/storage/sqlite/init-triggers.sql
deleted file mode 100644
index 596486c8..00000000
--- a/storage/sqlite/init-triggers.sql
+++ /dev/null
@@ -1,25 +0,0 @@
-create trigger if not exists delete_blob_tags_before_blob_deleted
-before delete on blob
-begin
-    delete from tag where blob_name=old.name;
-end;
-
-create trigger if not exists after_insert_blob
-after insert on blob
-begin
-    update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
-    delete from blob where rowid in (select blob_rowid from deletable_blob);
-end;
-
-create trigger if not exists after_update_blob
-after update of data on blob
-begin
-    update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
-    delete from blob where rowid in (select blob_rowid from deletable_blob);
-end;
-
-create trigger if not exists after_delete_blob
-after delete on blob
-begin
-    update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
-end;
diff --git a/storage/sqlite/init.sql b/storage/sqlite/init.sql
deleted file mode 100644
index c8e88b06..00000000
--- a/storage/sqlite/init.sql
+++ /dev/null
@@ -1,64 +0,0 @@
--- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
--- can trim the database file size with partial vacuums without having to do a full vacuum, which
--- locks everything.
-pragma auto_vacuum=incremental;
-
-create table if not exists blob (
-    name text,
-    last_used timestamp default (datetime('now')),
-    data blob,
-    primary key (name)
-);
-
-create table if not exists blob_meta (
-    key text primary key,
-    value
-);
-
-create index if not exists blob_last_used on blob(last_used);
-
--- While sqlite *seems* to be faster to get sum(length(data)) instead of
--- sum(length(cast(data as blob))), it may still require a large table scan at start-up or with a
--- cold-cache. With this we can be assured that it doesn't.
-insert or ignore into blob_meta values ('size', 0);
-
-create table if not exists setting (
-    name primary key on conflict replace,
-    value
-);
-
-create table if not exists tag (
-    blob_name references blob(name),
-    tag_name,
-    value,
-    primary key (blob_name, tag_name)
-);
-
-create view if not exists deletable_blob as
-with recursive excess (
-    usage_with,
-    last_used,
-    blob_rowid,
-    data_length
-) as (
-    select *
-    from (
-        select
-            (select value from blob_meta where key='size') as usage_with,
-            last_used,
-            rowid,
-            length(data)
-        from blob order by last_used, rowid limit 1
-    )
-    where usage_with > (select value from setting where name='capacity')
-    union all
-    select
-        usage_with-data_length as new_usage_with,
-        blob.last_used,
-        blob.rowid,
-        length(data)
-    from excess join blob
-    on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
-    where new_usage_with > (select value from setting where name='capacity')
-)
-select * from excess;
diff --git a/storage/sqlite/sql b/storage/sqlite/sql
deleted file mode 100644
index 31a76686..00000000
--- a/storage/sqlite/sql
+++ /dev/null
@@ -1,29 +0,0 @@
-pragma auto_vacuum=incremental;
-create table if not exists blob(
-    name text,
-    last_used timestamp default (datetime('now')),
-    data blob,
-    primary key (name)
-);
-
-create view if not exists deletable_blob as
-with recursive excess_blob(
-    usage_with,
-    last_used,
-    blob_rowid,
-    data_length
-) as (
-    select * from (select (select sum(length(data)) from blob) as usage_with, last_used, rowid, length(data) from blob order by last_used, rowid limit 1)
-    where usage_with >= (select value from setting where name='capacity')
-    union all
-    select usage_with-data_length, blob.last_used, blob.rowid, length(data) from excess_blob join blob
-    on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
-    where usage_with >= (select value from setting where name='capacity')
-) select * from excess;
-
-CREATE TRIGGER if not exists trim_blobs_to_capacity_after_update after update on blob begin
-    delete from blob where rowid in (select blob_rowid from deletable_blob);
-end;
-CREATE TRIGGER if not exists trim_blobs_to_capacity_after_insert after insert on blob begin
-    delete from blob where rowid in (select blob_rowid from deletable_blob);
-end;
diff --git a/storage/sqlite/sqlite-storage-cli/main.go b/storage/sqlite/sqlite-storage-cli/main.go
deleted file mode 100644
index 7db859f1..00000000
--- a/storage/sqlite/sqlite-storage-cli/main.go
+++ /dev/null
@@ -1,46 +0,0 @@
-//go:build cgo
-// +build cgo
-
-package main
-
-import (
-	"fmt"
-	"log"
-	"os"
-
-	"crawshaw.io/sqlite"
-	"github.com/alexflint/go-arg"
-
-	sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
-)
-
-type InitCommand struct {
-	Path string `arg:"positional"`
-}
-
-func main() {
-	err := mainErr()
-	if err != nil {
-		log.Printf("error in main: %v", err)
-		os.Exit(1)
-	}
-}
-
-func mainErr() error {
-	var args struct {
-		Init *InitCommand `arg:"subcommand"`
-	}
-	p := arg.MustParse(&args)
-	switch {
-	case args.Init != nil:
-		conn, err := sqlite.OpenConn(args.Init.Path, 0)
-		if err != nil {
-			return fmt.Errorf("opening sqlite conn: %w", err)
-		}
-		defer conn.Close()
-		return sqliteStorage.InitSchema(conn, 1<<14, true)
-	default:
-		p.Fail("expected subcommand")
-		panic("unreachable")
-	}
-}
diff --git a/storage/sqlite/sqlite-storage.go b/storage/sqlite/sqlite-storage.go
deleted file mode 100644
index f8381649..00000000
--- a/storage/sqlite/sqlite-storage.go
+++ /dev/null
@@ -1,213 +0,0 @@
-//go:build cgo
-// +build cgo
-
-package sqliteStorage
-
-import (
-	_ "embed"
-	"errors"
-	"fmt"
-	"net/url"
-
-	"crawshaw.io/sqlite"
-	"crawshaw.io/sqlite/sqlitex"
-)
-
-type conn = *sqlite.Conn
-
-type InitConnOpts struct {
-	SetSynchronous int
-	SetJournalMode string
-	MmapSizeOk     bool  // If false, a package-specific default will be used.
-	MmapSize       int64 // If MmapSizeOk is set, use sqlite default if < 0, otherwise this value.
-}
-
-type UnexpectedJournalMode struct {
-	JournalMode string
-}
-
-func (me UnexpectedJournalMode) Error() string {
-	return fmt.Sprintf("unexpected journal mode: %q", me.JournalMode)
-}
-
-func setSynchronous(conn conn, syncInt int) (err error) {
-	err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma synchronous=%v`, syncInt), nil)
-	if err != nil {
-		return err
-	}
-	var (
-		actual   int
-		actualOk bool
-	)
-	err = sqlitex.ExecTransient(conn, `pragma synchronous`, func(stmt *sqlite.Stmt) error {
-		actual = stmt.ColumnInt(0)
-		actualOk = true
-		return nil
-	})
-	if err != nil {
-		return
-	}
-	if !actualOk {
-		return errors.New("synchronous setting query didn't return anything")
-	}
-	if actual != syncInt {
-		return fmt.Errorf("set synchronous %q, got %q", syncInt, actual)
-	}
-	return nil
-}
-
-func initConn(conn conn, opts InitConnOpts) (err error) {
-	err = sqlitex.ExecTransient(conn, "pragma foreign_keys=on", nil)
-	if err != nil {
-		return err
-	}
-	err = setSynchronous(conn, opts.SetSynchronous)
-	if err != nil {
-		return
-	}
-	// Recursive triggers are required because we need to trim the blob_meta size after trimming to
-	// capacity. Hopefully we don't hit the recursion limit, and if we do, there's an error thrown.
-	err = sqlitex.ExecTransient(conn, "pragma recursive_triggers=on", nil)
-	if err != nil {
-		return err
-	}
-	if opts.SetJournalMode != "" {
-		err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma journal_mode=%s`, opts.SetJournalMode), func(stmt *sqlite.Stmt) error {
-			ret := stmt.ColumnText(0)
-			if ret != opts.SetJournalMode {
-				return UnexpectedJournalMode{ret}
-			}
-			return nil
-		})
-		if err != nil {
-			return err
-		}
-	}
-	if !opts.MmapSizeOk {
-		// Set the default. Currently it seems the library picks reasonable defaults, especially for
-		// wal.
-		opts.MmapSize = -1
-		//opts.MmapSize = 1 << 24 // 8 MiB
-	}
-	if opts.MmapSize >= 0 {
-		err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma mmap_size=%d`, opts.MmapSize), nil)
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-func setPageSize(conn conn, pageSize int) error {
-	if pageSize == 0 {
-		return nil
-	}
-	var retSize int64
-	err := sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma page_size=%d`, pageSize), nil)
-	if err != nil {
-		return err
-	}
-	err = sqlitex.ExecTransient(conn, "pragma page_size", func(stmt *sqlite.Stmt) error {
-		retSize = stmt.ColumnInt64(0)
-		return nil
-	})
-	if err != nil {
-		return err
-	}
-	if retSize != int64(pageSize) {
-		return fmt.Errorf("requested page size %v but got %v", pageSize, retSize)
-	}
-	return nil
-}
-
-var (
-	//go:embed init.sql
-	initScript string
-	//go:embed init-triggers.sql
-	initTriggers string
-)
-
-func InitSchema(conn conn, pageSize int, triggers bool) error {
-	err := setPageSize(conn, pageSize)
-	if err != nil {
-		return fmt.Errorf("setting page size: %w", err)
-	}
-	err = sqlitex.ExecScript(conn, initScript)
-	if err != nil {
-		return err
-	}
-	if triggers {
-		err := sqlitex.ExecScript(conn, initTriggers)
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-type InitDbOpts struct {
-	DontInitSchema bool
-	PageSize       int
-	// If non-zero, overrides the existing setting.
-	Capacity   int64
-	NoTriggers bool
-}
-
-// Remove any capacity limits.
-func unlimitCapacity(conn conn) error {
-	return sqlitex.Exec(conn, "delete from setting where key='capacity'", nil)
-}
-
-// Set the capacity limit to exactly this value.
-func setCapacity(conn conn, cap int64) error {
-	return sqlitex.Exec(conn, "insert into setting values ('capacity', ?)", nil, cap)
-}
-
-type NewConnOpts struct {
-	// See https://www.sqlite.org/c3ref/open.html. NB: "If the filename is an empty string, then a
-	// private, temporary on-disk database will be created. This private database will be
-	// automatically deleted as soon as the database connection is closed."
-	Path   string
-	Memory bool
-	// Whether multiple blobs will not be read simultaneously. Enables journal mode other than WAL,
-	// and NumConns < 2.
-	NoConcurrentBlobReads bool
-}
-
-func newOpenUri(opts NewConnOpts) string {
-	path := url.PathEscape(opts.Path)
-	if opts.Memory {
-		path = ":memory:"
-	}
-	values := make(url.Values)
-	if opts.NoConcurrentBlobReads || opts.Memory {
-		values.Add("cache", "shared")
-	}
-	return fmt.Sprintf("file:%s?%s", path, values.Encode())
-}
-
-func initDatabase(conn conn, opts InitDbOpts) (err error) {
-	if !opts.DontInitSchema {
-		err = InitSchema(conn, opts.PageSize, !opts.NoTriggers)
-		if err != nil {
-			return
-		}
-	}
-	if opts.Capacity < 0 {
-		err = unlimitCapacity(conn)
-	} else if opts.Capacity > 0 {
-		err = setCapacity(conn, opts.Capacity)
-	}
-	return
-}
-
-// Go fmt, why you so shit?
-const openConnFlags = 0 |
-	sqlite.SQLITE_OPEN_READWRITE |
-	sqlite.SQLITE_OPEN_CREATE |
-	sqlite.SQLITE_OPEN_URI |
-	sqlite.SQLITE_OPEN_NOMUTEX
-
-func newConn(opts NewConnOpts) (conn, error) {
-	return sqlite.OpenConn(newOpenUri(opts), openConnFlags)
-}
diff --git a/storage/sqlite/sqlite-storage_test.go b/storage/sqlite/sqlite-storage_test.go
index 1013a8b2..64ad836d 100644
--- a/storage/sqlite/sqlite-storage_test.go
+++ b/storage/sqlite/sqlite-storage_test.go
@@ -11,6 +11,7 @@ import (
 	"time"
 
 	_ "github.com/anacrolix/envpprof"
+	"github.com/anacrolix/squirrel"
 	"github.com/anacrolix/torrent/storage"
 	test_storage "github.com/anacrolix/torrent/storage/test"
 	"github.com/dustin/go-humanize"
@@ -30,7 +31,7 @@ func BenchmarkMarkComplete(b *testing.B) {
 	}
 	c := qt.New(b)
 	b.Run("CustomDirect", func(b *testing.B) {
-		var opts NewDirectStorageOpts
+		var opts squirrel.NewCacheOpts
 		opts.Capacity = capacity
 		opts.NoTriggers = noTriggers
 		benchOpts := func(b *testing.B) {
@@ -54,7 +55,7 @@ func BenchmarkMarkComplete(b *testing.B) {
 	directBench := func(b *testing.B) {
 		opts.Path = filepath.Join(b.TempDir(), "storage.db")
 		ci, err := NewDirectStorage(opts)
-		var ujm UnexpectedJournalMode
+		var ujm squirrel.UnexpectedJournalMode
 		if errors.As(err, &ujm) {
 			b.Skipf("setting journal mode %q: %v", opts.SetJournalMode, err)
 		}
-- 
2.44.0
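Note: NewDirectStorageOpts is now a type alias for squirrel.NewCacheOpts (see storage/sqlite/deprecated.go above), so call sites written against the old API keep compiling while the cache implementation lives in the squirrel module. A minimal usage sketch follows; the option values are illustrative, and the fields shown (Path, Capacity) are the ones exercised by the benchmark diff above.

package main

import (
	"log"

	sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
)

func main() {
	// The alias in deprecated.go keeps this struct literal source-compatible
	// with releases that predate the squirrel split.
	var opts sqliteStorage.NewDirectStorageOpts
	opts.Path = "torrent-cache.db" // illustrative database path
	opts.Capacity = 1 << 30        // illustrative 1 GiB cap, as in the benchmark

	// NewDirectStorage still returns a storage.ClientImplCloser; cache
	// construction is now delegated to squirrel.NewCache internally.
	ci, err := sqliteStorage.NewDirectStorage(opts)
	if err != nil {
		log.Fatalf("creating sqlite storage: %v", err)
	}
	defer ci.Close()
	// ci can then be supplied as a torrent client's default storage, as before.
}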