github.com/anacrolix/missinggo/perf v1.0.0
github.com/anacrolix/missinggo/v2 v2.5.2
github.com/anacrolix/multiless v0.1.1-0.20210529082330-de2f6cf29619
+ github.com/anacrolix/squirrel v0.0.0-00010101000000-000000000000
github.com/anacrolix/sync v0.4.0
github.com/anacrolix/tagflag v1.3.0
github.com/anacrolix/upnp v0.1.2-0.20200416075019-5e9378ed1425
--- /dev/null
+package sqliteStorage
+
+import (
+ "github.com/anacrolix/squirrel"
+)
+
+type NewDirectStorageOpts = squirrel.NewCacheOpts
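+
+// The alias keeps the old exported name compiling for existing callers;
+// squirrel.NewCacheOpts evidently carries the fields this package's options
+// used to embed (e.g. Path, Capacity, NoTriggers, SetJournalMode, as still set
+// by the benchmarks below).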
package sqliteStorage
import (
- "errors"
- "fmt"
"io"
- "runtime"
- "sync"
- "time"
"crawshaw.io/sqlite"
- "crawshaw.io/sqlite/sqlitex"
+ "github.com/anacrolix/squirrel"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
)
-type NewDirectStorageOpts struct {
- NewConnOpts
- InitDbOpts
- InitConnOpts
- GcBlobs bool
- NoCacheBlobs bool
- BlobFlushInterval time.Duration
-}
-
-func NewSquirrelCache(opts NewDirectStorageOpts) (_ *SquirrelCache, err error) {
- conn, err := newConn(opts.NewConnOpts)
- if err != nil {
- return
- }
- if opts.PageSize == 0 {
-		// The largest page size sqlite supports (1<<16). Ideally this would match the smallest
-		// SquirrelBlob size we can expect, which is probably 1<<17, but sqlite caps it lower.
- opts.PageSize = 1 << 16
- }
- err = initDatabase(conn, opts.InitDbOpts)
- if err != nil {
- conn.Close()
- return
- }
- err = initConn(conn, opts.InitConnOpts)
- if err != nil {
- conn.Close()
- return
- }
- if opts.BlobFlushInterval == 0 && !opts.GcBlobs {
-		// This is influenced by typical busy timeouts of 5-10s. We want to give other connections
-		// a few chances at getting a transaction through.
- opts.BlobFlushInterval = time.Second
- }
- cl := &SquirrelCache{
- conn: conn,
- blobs: make(map[string]*sqlite.Blob),
- opts: opts,
- }
- // Avoid race with cl.blobFlusherFunc
- cl.l.Lock()
- defer cl.l.Unlock()
- if opts.BlobFlushInterval != 0 {
- cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
- }
- cl.capacity = cl.getCapacity
- return cl, nil
-}
-
-// A convenience function that creates a connection pool, resource provider, and a pieces storage
-// ClientImpl and returns them all with a Close attached.
+// A convenience function that creates a squirrel.Cache-backed pieces storage ClientImpl, returned
+// with a Close attached.
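+//
+// A minimal sketch of intended use (the Path and Capacity values are
+// illustrative, not defaults):
+//
+//	var opts NewDirectStorageOpts
+//	opts.Path = "storage.db" // an empty Path means a private temporary on-disk database
+//	opts.Capacity = 1 << 30  // in bytes; a negative value removes any existing cap
+//	ci, err := NewDirectStorage(opts)
+//	if err != nil {
+//		// handle the error
+//	}
+//	defer ci.Close()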
func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
- cache, err := NewSquirrelCache(opts)
+ cache, err := squirrel.NewCache(opts)
if err != nil {
return
}
- return client{cache}, nil
-}
-
-func (cl *SquirrelCache) getCapacity() (ret *int64) {
- cl.l.Lock()
- defer cl.l.Unlock()
- err := sqlitex.Exec(cl.conn, "select value from setting where name='capacity'", func(stmt *sqlite.Stmt) error {
- ret = new(int64)
- *ret = stmt.ColumnInt64(0)
- return nil
- })
- if err != nil {
- panic(err)
- }
- return
-}
-
-type SquirrelCache struct {
- l sync.Mutex
- conn conn
- blobs map[string]*sqlite.Blob
- blobFlusher *time.Timer
- opts NewDirectStorageOpts
- closed bool
- capacity func() *int64
+	return &client{
+		cache,
+		cache.GetCapacity,
+	}, nil
}
type client struct {
- *SquirrelCache
-}
-
-func (c *SquirrelCache) blobFlusherFunc() {
- c.l.Lock()
- defer c.l.Unlock()
- c.flushBlobs()
- if !c.closed {
- c.blobFlusher.Reset(c.opts.BlobFlushInterval)
- }
-}
-
-func (c *SquirrelCache) flushBlobs() {
- for key, b := range c.blobs {
- // Need the lock to prevent racing with the GC finalizers.
- b.Close()
- delete(c.blobs, key)
- }
+ *squirrel.Cache
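+	// Kept as a field so OpenTorrent can hand storage.TorrentImpl a stable pointer to it.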
+ capacity func() *int64
}
-func (c client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
- t := torrent{c.SquirrelCache}
+func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
+ t := torrent{c.Cache}
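+	// TorrentImpl.Capacity is a pointer to a func, presumably so implementations can leave it
+	// nil; here it points at the GetCapacity getter captured in the client.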
return storage.TorrentImpl{Piece: t.Piece, Close: t.Close, Capacity: &c.capacity}, nil
}
-func (c *SquirrelCache) Close() (err error) {
- c.l.Lock()
- defer c.l.Unlock()
- c.flushBlobs()
- if c.opts.BlobFlushInterval != 0 {
- c.blobFlusher.Stop()
- }
- if !c.closed {
- c.closed = true
- err = c.conn.Close()
- c.conn = nil
- }
- return
-}
-
type torrent struct {
- c *SquirrelCache
-}
-
-func rowidForBlob(c conn, name string, length int64, create bool) (rowid int64, err error) {
- rowidOk := false
- err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
- if rowidOk {
- panic("expected at most one row")
- }
- // TODO: How do we know if we got this wrong?
- rowid = stmt.ColumnInt64(0)
- rowidOk = true
- return nil
- }, name)
- if err != nil {
- return
- }
- if rowidOk {
- return
- }
- if !create {
- err = errors.New("no existing row")
- return
- }
- err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
- if err != nil {
- return
- }
- rowid = c.LastInsertRowID()
- return
+ c *squirrel.Cache
}
func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
ret := piece{
- sb: SquirrelBlob{
+ sb: squirrel.Blob{
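+			// Positional fields, assuming squirrel.Blob keeps the old SquirrelBlob
+			// layout: name, length, then the owning cache.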
p.Hash().HexString(),
p.Length(),
t.c,
return nil
}
-type SquirrelBlob struct {
- name string
- length int64
- *SquirrelCache
-}
-
type piece struct {
- sb SquirrelBlob
+ sb squirrel.Blob
io.ReaderAt
io.WriterAt
}
-func (p SquirrelBlob) doAtIoWithBlob(
- atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
- b []byte,
- off int64,
- create bool,
-) (n int, err error) {
- p.l.Lock()
- defer p.l.Unlock()
- if p.opts.NoCacheBlobs {
- defer p.forgetBlob()
- }
- blob, err := p.getBlob(create)
- if err != nil {
- err = fmt.Errorf("getting blob: %w", err)
- return
- }
- n, err = atIo(blob)(b, off)
- if err == nil {
- return
- }
- var se sqlite.Error
- if !errors.As(err, &se) {
- return
- }
- // "ABORT" occurs if the row the blob is on is modified elsewhere. "ERROR: invalid blob" occurs
- // if the blob has been closed. We don't forget blobs that are closed by our GC finalizers,
- // because they may be attached to names that have since moved on to another blob.
- if se.Code != sqlite.SQLITE_ABORT && !(p.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
- return
- }
- p.forgetBlob()
- // Try again, this time we're guaranteed to get a fresh blob, and so errors are no excuse. It
- // might be possible to skip to this version if we don't cache blobs.
- blob, err = p.getBlob(create)
- if err != nil {
- err = fmt.Errorf("getting blob: %w", err)
- return
- }
- return atIo(blob)(b, off)
-}
-
-func (p SquirrelBlob) ReadAt(b []byte, off int64) (n int, err error) {
- return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
- return blob.ReadAt
- }, b, off, false)
-}
-
-func (p SquirrelBlob) WriteAt(b []byte, off int64) (n int, err error) {
- return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
- return blob.WriteAt
- }, b, off, true)
-}
-
-func (p SquirrelBlob) SetTag(name string, value interface{}) error {
- p.l.Lock()
- defer p.l.Unlock()
- return sqlitex.Exec(p.conn, "insert or replace into tag (blob_name, tag_name, value) values (?, ?, ?)", nil,
- p.name, name, value)
-}
-
func (p piece) MarkComplete() error {
return p.sb.SetTag("verified", true)
}
-func (p SquirrelBlob) forgetBlob() {
- blob, ok := p.blobs[p.name]
- if !ok {
- return
- }
- blob.Close()
- delete(p.blobs, p.name)
-}
-
func (p piece) MarkNotComplete() error {
return p.sb.SetTag("verified", false)
}
-func (p SquirrelBlob) GetTag(name string, result func(*sqlite.Stmt)) error {
- p.l.Lock()
- defer p.l.Unlock()
- return sqlitex.Exec(p.conn, "select value from tag where blob_name=? and tag_name=?", func(stmt *sqlite.Stmt) error {
- result(stmt)
- return nil
- }, p.name, name)
-}
-
func (p piece) Completion() (ret storage.Completion) {
err := p.sb.GetTag("verified", func(stmt *sqlite.Stmt) {
ret.Complete = stmt.ColumnInt(0) != 0
	})
return
}
-
-func (p SquirrelBlob) getBlob(create bool) (*sqlite.Blob, error) {
- blob, ok := p.blobs[p.name]
- if !ok {
- rowid, err := rowidForBlob(p.conn, p.name, p.length, create)
- if err != nil {
- return nil, fmt.Errorf("getting rowid for blob: %w", err)
- }
- blob, err = p.conn.OpenBlob("main", "blob", "data", rowid, true)
- if err != nil {
- panic(err)
- }
- if p.opts.GcBlobs {
- herp := new(byte)
- runtime.SetFinalizer(herp, func(*byte) {
- p.l.Lock()
- defer p.l.Unlock()
- // Note there's no guarantee that the finalizer fired while this blob is the same
- // one in the blob cache. It might be possible to rework this so that we check, or
- // strip finalizers as appropriate.
- blob.Close()
- })
- }
- p.blobs[p.name] = blob
- }
- return blob, nil
-}
+++ /dev/null
-create trigger if not exists delete_blob_tags_before_blob_deleted
-before delete on blob
-begin
- delete from tag where blob_name=old.name;
-end;
-
-create trigger if not exists after_insert_blob
-after insert on blob
-begin
- update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
- delete from blob where rowid in (select blob_rowid from deletable_blob);
-end;
-
-create trigger if not exists after_update_blob
-after update of data on blob
-begin
- update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
- delete from blob where rowid in (select blob_rowid from deletable_blob);
-end;
-
-create trigger if not exists after_delete_blob
-after delete on blob
-begin
- update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
-end;
+++ /dev/null
--- We have to opt into this before creating any tables (or else run a full vacuum after changing
--- it) for it to take effect. It means we can trim the database file size with partial vacuums
--- without having to do a full vacuum, which locks everything.
-pragma auto_vacuum=incremental;
-
-create table if not exists blob (
- name text,
- last_used timestamp default (datetime('now')),
- data blob,
- primary key (name)
-);
-
-create table if not exists blob_meta (
- key text primary key,
- value
-);
-
-create index if not exists blob_last_used on blob(last_used);
-
--- While sqlite *seems* to be faster to get sum(length(cast(data as blob))) instead of
--- sum(length(data)), it may still require a large table scan at start-up or with a
--- cold-cache. With this we can be assured that it doesn't.
-insert or ignore into blob_meta values ('size', 0);
-
-create table if not exists setting (
- name primary key on conflict replace,
- value
-);
-
-create table if not exists tag (
- blob_name references blob(name),
- tag_name,
- value,
- primary key (blob_name, tag_name)
-);
-
-create view if not exists deletable_blob as
-with recursive excess (
- usage_with,
- last_used,
- blob_rowid,
- data_length
-) as (
- select *
- from (
- select
- (select value from blob_meta where key='size') as usage_with,
- last_used,
- rowid,
- length(data)
- from blob order by last_used, rowid limit 1
- )
- where usage_with > (select value from setting where name='capacity')
- union all
- select
- usage_with-data_length as new_usage_with,
- blob.last_used,
- blob.rowid,
- length(data)
- from excess join blob
- on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
- where new_usage_with > (select value from setting where name='capacity')
-)
-select * from excess;
+++ /dev/null
-pragma auto_vacuum=incremental;
-create table if not exists blob(
- name text,
- last_used timestamp default (datetime('now')),
- data blob,
- primary key (name)
-);
-
-create view if not exists deletable_blob as
-with recursive excess_blob(
- usage_with,
- last_used,
- blob_rowid,
- data_length
-) as (
- select * from (select (select sum(length(data)) from blob) as usage_with, last_used, rowid, length(data) from blob order by last_used, rowid limit 1)
- where usage_with >= (select value from setting where name='capacity')
- union all
- select usage_with-data_length, blob.last_used, blob.rowid, length(data) from excess_blob join blob
-	on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess_blob.last_used, blob_rowid))
-	where usage_with >= (select value from setting where name='capacity')
-) select * from excess_blob;
-
-CREATE TRIGGER if not exists trim_blobs_to_capacity_after_update after update on blob begin
- delete from blob where rowid in (select blob_rowid from deletable_blob);
-end;
-CREATE TRIGGER if not exists trim_blobs_to_capacity_after_insert after insert on blob begin
- delete from blob where rowid in (select blob_rowid from deletable_blob);
-end;
+++ /dev/null
-//go:build cgo
-// +build cgo
-
-package main
-
-import (
- "fmt"
- "log"
- "os"
-
- "crawshaw.io/sqlite"
- "github.com/alexflint/go-arg"
-
- sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
-)
-
-type InitCommand struct {
- Path string `arg:"positional"`
-}
-
-func main() {
- err := mainErr()
- if err != nil {
- log.Printf("error in main: %v", err)
- os.Exit(1)
- }
-}
-
-func mainErr() error {
- var args struct {
- Init *InitCommand `arg:"subcommand"`
- }
- p := arg.MustParse(&args)
- switch {
- case args.Init != nil:
- conn, err := sqlite.OpenConn(args.Init.Path, 0)
- if err != nil {
- return fmt.Errorf("opening sqlite conn: %w", err)
- }
- defer conn.Close()
- return sqliteStorage.InitSchema(conn, 1<<14, true)
- default:
- p.Fail("expected subcommand")
- panic("unreachable")
- }
-}
+++ /dev/null
-//go:build cgo
-// +build cgo
-
-package sqliteStorage
-
-import (
- _ "embed"
- "errors"
- "fmt"
- "net/url"
-
- "crawshaw.io/sqlite"
- "crawshaw.io/sqlite/sqlitex"
-)
-
-type conn = *sqlite.Conn
-
-type InitConnOpts struct {
- SetSynchronous int
- SetJournalMode string
- MmapSizeOk bool // If false, a package-specific default will be used.
- MmapSize int64 // If MmapSizeOk is set, use sqlite default if < 0, otherwise this value.
-}
-
-type UnexpectedJournalMode struct {
- JournalMode string
-}
-
-func (me UnexpectedJournalMode) Error() string {
- return fmt.Sprintf("unexpected journal mode: %q", me.JournalMode)
-}
-
-func setSynchronous(conn conn, syncInt int) (err error) {
- err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma synchronous=%v`, syncInt), nil)
- if err != nil {
- return err
- }
- var (
- actual int
- actualOk bool
- )
- err = sqlitex.ExecTransient(conn, `pragma synchronous`, func(stmt *sqlite.Stmt) error {
- actual = stmt.ColumnInt(0)
- actualOk = true
- return nil
- })
- if err != nil {
- return
- }
- if !actualOk {
- return errors.New("synchronous setting query didn't return anything")
- }
- if actual != syncInt {
-		return fmt.Errorf("set synchronous %v, got %v", syncInt, actual)
- }
- return nil
-}
-
-func initConn(conn conn, opts InitConnOpts) (err error) {
- err = sqlitex.ExecTransient(conn, "pragma foreign_keys=on", nil)
- if err != nil {
- return err
- }
- err = setSynchronous(conn, opts.SetSynchronous)
- if err != nil {
- return
- }
-	// Recursive triggers are required because we need to trim the blob_meta size after trimming to
-	// capacity. Hopefully we don't hit the recursion limit; if we do, an error is thrown.
- err = sqlitex.ExecTransient(conn, "pragma recursive_triggers=on", nil)
- if err != nil {
- return err
- }
- if opts.SetJournalMode != "" {
- err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma journal_mode=%s`, opts.SetJournalMode), func(stmt *sqlite.Stmt) error {
- ret := stmt.ColumnText(0)
- if ret != opts.SetJournalMode {
- return UnexpectedJournalMode{ret}
- }
- return nil
- })
- if err != nil {
- return err
- }
- }
- if !opts.MmapSizeOk {
- // Set the default. Currently it seems the library picks reasonable defaults, especially for
- // wal.
- opts.MmapSize = -1
-		//opts.MmapSize = 1 << 24 // 16 MiB
- }
- if opts.MmapSize >= 0 {
- err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma mmap_size=%d`, opts.MmapSize), nil)
- if err != nil {
- return err
- }
- }
- return nil
-}
-
-func setPageSize(conn conn, pageSize int) error {
- if pageSize == 0 {
- return nil
- }
- var retSize int64
- err := sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma page_size=%d`, pageSize), nil)
- if err != nil {
- return err
- }
- err = sqlitex.ExecTransient(conn, "pragma page_size", func(stmt *sqlite.Stmt) error {
- retSize = stmt.ColumnInt64(0)
- return nil
- })
- if err != nil {
- return err
- }
- if retSize != int64(pageSize) {
- return fmt.Errorf("requested page size %v but got %v", pageSize, retSize)
- }
- return nil
-}
-
-var (
- //go:embed init.sql
- initScript string
- //go:embed init-triggers.sql
- initTriggers string
-)
-
-func InitSchema(conn conn, pageSize int, triggers bool) error {
- err := setPageSize(conn, pageSize)
- if err != nil {
- return fmt.Errorf("setting page size: %w", err)
- }
- err = sqlitex.ExecScript(conn, initScript)
- if err != nil {
- return err
- }
- if triggers {
- err := sqlitex.ExecScript(conn, initTriggers)
- if err != nil {
- return err
- }
- }
- return nil
-}
-
-type InitDbOpts struct {
- DontInitSchema bool
- PageSize int
- // If non-zero, overrides the existing setting.
- Capacity int64
- NoTriggers bool
-}
-
-// Remove any capacity limits.
-func unlimitCapacity(conn conn) error {
-	return sqlitex.Exec(conn, "delete from setting where name='capacity'", nil)
-}
-
-// Set the capacity limit to exactly this value.
-func setCapacity(conn conn, cap int64) error {
- return sqlitex.Exec(conn, "insert into setting values ('capacity', ?)", nil, cap)
-}
-
-type NewConnOpts struct {
- // See https://www.sqlite.org/c3ref/open.html. NB: "If the filename is an empty string, then a
- // private, temporary on-disk database will be created. This private database will be
- // automatically deleted as soon as the database connection is closed."
- Path string
- Memory bool
-	// If set, multiple blobs will not be read simultaneously. This permits journal modes other
-	// than WAL, and NumConns < 2.
- NoConcurrentBlobReads bool
-}
-
-func newOpenUri(opts NewConnOpts) string {
- path := url.PathEscape(opts.Path)
- if opts.Memory {
- path = ":memory:"
- }
- values := make(url.Values)
- if opts.NoConcurrentBlobReads || opts.Memory {
- values.Add("cache", "shared")
- }
- return fmt.Sprintf("file:%s?%s", path, values.Encode())
-}
-
-func initDatabase(conn conn, opts InitDbOpts) (err error) {
- if !opts.DontInitSchema {
- err = InitSchema(conn, opts.PageSize, !opts.NoTriggers)
- if err != nil {
- return
- }
- }
- if opts.Capacity < 0 {
- err = unlimitCapacity(conn)
- } else if opts.Capacity > 0 {
- err = setCapacity(conn, opts.Capacity)
- }
- return
-}
-
-// Broken out one flag per line so gofmt leaves the grouping alone.
-const openConnFlags = 0 |
- sqlite.SQLITE_OPEN_READWRITE |
- sqlite.SQLITE_OPEN_CREATE |
- sqlite.SQLITE_OPEN_URI |
- sqlite.SQLITE_OPEN_NOMUTEX
-
-func newConn(opts NewConnOpts) (conn, error) {
- return sqlite.OpenConn(newOpenUri(opts), openConnFlags)
-}
"time"
_ "github.com/anacrolix/envpprof"
+ "github.com/anacrolix/squirrel"
"github.com/anacrolix/torrent/storage"
test_storage "github.com/anacrolix/torrent/storage/test"
"github.com/dustin/go-humanize"
}
c := qt.New(b)
b.Run("CustomDirect", func(b *testing.B) {
- var opts NewDirectStorageOpts
+ var opts squirrel.NewCacheOpts
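+		// Same type as the old NewDirectStorageOpts (now an alias), so the field
+		// assignments below are unchanged.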
opts.Capacity = capacity
opts.NoTriggers = noTriggers
benchOpts := func(b *testing.B) {
directBench := func(b *testing.B) {
opts.Path = filepath.Join(b.TempDir(), "storage.db")
ci, err := NewDirectStorage(opts)
- var ujm UnexpectedJournalMode
+ var ujm squirrel.UnexpectedJournalMode
if errors.As(err, &ujm) {
b.Skipf("setting journal mode %q: %v", opts.SetJournalMode, err)
}