package sqliteStorage
import (
- "errors"
- "fmt"
- "runtime"
- "sync"
- "time"
+ "io"
"crawshaw.io/sqlite"
- "crawshaw.io/sqlite/sqlitex"
+ "github.com/anacrolix/squirrel"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
)
-type NewDirectStorageOpts struct {
- NewConnOpts
- InitDbOpts
- InitConnOpts
- GcBlobs bool
- NoCacheBlobs bool
- BlobFlushInterval time.Duration
-}
-
// A convenience function that creates a squirrel.Cache from the given options
// and returns a pieces storage ClientImpl backed by it, with a Close attached.
func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
- conn, err := newConn(opts.NewConnOpts)
- if err != nil {
- return
- }
- if opts.PageSize == 0 {
- // The largest size sqlite supports. I think we want this to be the smallest piece size we
- // can expect, which is probably 1<<17.
- opts.PageSize = 1 << 16
- }
- err = initDatabase(conn, opts.InitDbOpts)
- if err != nil {
- conn.Close()
- return
- }
- err = initConn(conn, opts.InitConnOpts)
+ cache, err := squirrel.NewCache(opts)
if err != nil {
- conn.Close()
return
}
- if opts.BlobFlushInterval == 0 && !opts.GcBlobs {
- // This is influenced by typical busy timeouts, of 5-10s. We want to give other connections
- // a few chances at getting a transaction through.
- opts.BlobFlushInterval = time.Second
- }
- cl := &client{
- conn: conn,
- blobs: make(map[string]*sqlite.Blob),
- opts: opts,
- }
- // Avoid race with cl.blobFlusherFunc
- cl.l.Lock()
- defer cl.l.Unlock()
- if opts.BlobFlushInterval != 0 {
- cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
- }
- cl.capacity = cl.getCapacity
- return cl, nil
+ return &client{
+ cache,
+ cache.GetCapacity,
+ }, nil
}
-func (cl *client) getCapacity() (ret *int64) {
- cl.l.Lock()
- defer cl.l.Unlock()
- err := sqlitex.Exec(cl.conn, "select value from setting where name='capacity'", func(stmt *sqlite.Stmt) error {
- ret = new(int64)
- *ret = stmt.ColumnInt64(0)
- return nil
- })
- if err != nil {
- panic(err)
+func NewWrappingClient(cache *squirrel.Cache) storage.ClientImpl {
+ return &client{
+ cache,
+ cache.GetCapacity,
}
- return
}
type client struct {
- l sync.Mutex
- conn conn
- blobs map[string]*sqlite.Blob
- blobFlusher *time.Timer
- opts NewDirectStorageOpts
- closed bool
- capacity func() *int64
-}
-
-func (c *client) blobFlusherFunc() {
- c.l.Lock()
- defer c.l.Unlock()
- c.flushBlobs()
- if !c.closed {
- c.blobFlusher.Reset(c.opts.BlobFlushInterval)
- }
-}
-
-func (c *client) flushBlobs() {
- for key, b := range c.blobs {
- // Need the lock to prevent racing with the GC finalizers.
- b.Close()
- delete(c.blobs, key)
- }
+ *squirrel.Cache
+ capacity func() *int64
}
func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
- t := torrent{c}
+ t := torrent{c.Cache}
return storage.TorrentImpl{Piece: t.Piece, Close: t.Close, Capacity: &c.capacity}, nil
}
-func (c *client) Close() (err error) {
- c.l.Lock()
- defer c.l.Unlock()
- c.flushBlobs()
- if c.opts.BlobFlushInterval != 0 {
- c.blobFlusher.Stop()
- }
- if !c.closed {
- c.closed = true
- err = c.conn.Close()
- c.conn = nil
- }
- return
-}
-
type torrent struct {
- c *client
-}
-
-func rowidForBlob(c conn, name string, length int64, create bool) (rowid int64, err error) {
- rowidOk := false
- err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
- if rowidOk {
- panic("expected at most one row")
- }
- // TODO: How do we know if we got this wrong?
- rowid = stmt.ColumnInt64(0)
- rowidOk = true
- return nil
- }, name)
- if err != nil {
- return
- }
- if rowidOk {
- return
- }
- if !create {
- err = errors.New("no existing row")
- return
- }
- err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
- if err != nil {
- return
- }
- rowid = c.LastInsertRowID()
- return
+ c *squirrel.Cache
}
func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
- t.c.l.Lock()
- defer t.c.l.Unlock()
- name := p.Hash().HexString()
- return piece{
- name,
- p.Length(),
- t.c,
+ ret := piece{
+ sb: t.c.OpenWithLength(p.Hash().HexString(), p.Length()),
}
+ ret.ReaderAt = &ret.sb
+ ret.WriterAt = &ret.sb
+ return ret
}
// Close implements the storage.TorrentImpl close hook. The underlying
// squirrel.Cache is shared across torrents and owned by the client, so there
// is nothing to release here.
func (t torrent) Close() error {
	return nil
}
type piece struct {
- name string
- length int64
- *client
-}
-
-func (p piece) doAtIoWithBlob(
- atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
- b []byte,
- off int64,
- create bool,
-) (n int, err error) {
- p.l.Lock()
- defer p.l.Unlock()
- if p.opts.NoCacheBlobs {
- defer p.forgetBlob()
- }
- blob, err := p.getBlob(create)
- if err != nil {
- err = fmt.Errorf("getting blob: %w", err)
- return
- }
- n, err = atIo(blob)(b, off)
- if err == nil {
- return
- }
- var se sqlite.Error
- if !errors.As(err, &se) {
- return
- }
- // "ABORT" occurs if the row the blob is on is modified elsewhere. "ERROR: invalid blob" occurs
- // if the blob has been closed. We don't forget blobs that are closed by our GC finalizers,
- // because they may be attached to names that have since moved on to another blob.
- if se.Code != sqlite.SQLITE_ABORT && !(p.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
- return
- }
- p.forgetBlob()
- // Try again, this time we're guaranteed to get a fresh blob, and so errors are no excuse. It
- // might be possible to skip to this version if we don't cache blobs.
- blob, err = p.getBlob(create)
- if err != nil {
- err = fmt.Errorf("getting blob: %w", err)
- return
- }
- return atIo(blob)(b, off)
-}
-
-func (p piece) ReadAt(b []byte, off int64) (n int, err error) {
- return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
- return blob.ReadAt
- }, b, off, false)
-}
-
-func (p piece) WriteAt(b []byte, off int64) (n int, err error) {
- return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
- return blob.WriteAt
- }, b, off, true)
+ sb squirrel.Blob
+ io.ReaderAt
+ io.WriterAt
}
func (p piece) MarkComplete() error {
- p.l.Lock()
- defer p.l.Unlock()
- err := sqlitex.Exec(p.conn, "update blob set verified=true where name=?", nil, p.name)
- if err != nil {
- return err
- }
- changes := p.conn.Changes()
- if changes != 1 {
- panic(changes)
- }
- return nil
-}
-
-func (p piece) forgetBlob() {
- blob, ok := p.blobs[p.name]
- if !ok {
- return
- }
- blob.Close()
- delete(p.blobs, p.name)
+ return p.sb.SetTag("verified", true)
}
func (p piece) MarkNotComplete() error {
- p.l.Lock()
- defer p.l.Unlock()
- return sqlitex.Exec(p.conn, "update blob set verified=false where name=?", nil, p.name)
+ return p.sb.SetTag("verified", false)
}
func (p piece) Completion() (ret storage.Completion) {
- p.l.Lock()
- defer p.l.Unlock()
- err := sqlitex.Exec(p.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
+ err := p.sb.GetTag("verified", func(stmt *sqlite.Stmt) {
ret.Complete = stmt.ColumnInt(0) != 0
- return nil
- }, p.name)
+ })
ret.Ok = err == nil
if err != nil {
panic(err)
}
return
}
-
-func (p piece) getBlob(create bool) (*sqlite.Blob, error) {
- blob, ok := p.blobs[p.name]
- if !ok {
- rowid, err := rowidForBlob(p.conn, p.name, p.length, create)
- if err != nil {
- return nil, fmt.Errorf("getting rowid for blob: %w", err)
- }
- blob, err = p.conn.OpenBlob("main", "blob", "data", rowid, true)
- if err != nil {
- panic(err)
- }
- if p.opts.GcBlobs {
- herp := new(byte)
- runtime.SetFinalizer(herp, func(*byte) {
- p.l.Lock()
- defer p.l.Unlock()
- // Note there's no guarantee that the finalizer fired while this blob is the same
- // one in the blob cache. It might be possible to rework this so that we check, or
- // strip finalizers as appropriate.
- blob.Close()
- })
- }
- p.blobs[p.name] = blob
- }
- return blob, nil
-}
+++ /dev/null
-//go:build cgo
-// +build cgo
-
-package sqliteStorage
-
-import (
- "bytes"
- "context"
- "errors"
- "expvar"
- "fmt"
- "io"
- "log"
- "net/url"
- "os"
- "runtime"
- "runtime/pprof"
- "strings"
- "sync"
- "time"
-
- "crawshaw.io/sqlite"
- "crawshaw.io/sqlite/sqlitex"
- "github.com/anacrolix/missinggo/iter"
- "github.com/anacrolix/missinggo/v2/resource"
-
- "github.com/anacrolix/torrent/storage"
-)
-
-type conn = *sqlite.Conn
-
-type InitConnOpts struct {
- SetSynchronous int
- SetJournalMode string
- MmapSizeOk bool // If false, a package-specific default will be used.
- MmapSize int64 // If MmapSizeOk is set, use sqlite default if < 0, otherwise this value.
-}
-
-type UnexpectedJournalMode struct {
- JournalMode string
-}
-
-func (me UnexpectedJournalMode) Error() string {
- return fmt.Sprintf("unexpected journal mode: %q", me.JournalMode)
-}
-
-func setSynchronous(conn conn, syncInt int) (err error) {
- err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma synchronous=%v`, syncInt), nil)
- if err != nil {
- return err
- }
- var (
- actual int
- actualOk bool
- )
- err = sqlitex.ExecTransient(conn, `pragma synchronous`, func(stmt *sqlite.Stmt) error {
- actual = stmt.ColumnInt(0)
- actualOk = true
- return nil
- })
- if err != nil {
- return
- }
- if !actualOk {
- return errors.New("synchronous setting query didn't return anything")
- }
- if actual != syncInt {
- return fmt.Errorf("set synchronous %q, got %q", syncInt, actual)
- }
- return nil
-}
-
-func initConn(conn conn, opts InitConnOpts) (err error) {
- err = setSynchronous(conn, opts.SetSynchronous)
- if err != nil {
- return
- }
- // Recursive triggers are required because we need to trim the blob_meta size after trimming to
- // capacity. Hopefully we don't hit the recursion limit, and if we do, there's an error thrown.
- err = sqlitex.ExecTransient(conn, "pragma recursive_triggers=on", nil)
- if err != nil {
- return err
- }
- if opts.SetJournalMode != "" {
- err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma journal_mode=%s`, opts.SetJournalMode), func(stmt *sqlite.Stmt) error {
- ret := stmt.ColumnText(0)
- if ret != opts.SetJournalMode {
- return UnexpectedJournalMode{ret}
- }
- return nil
- })
- if err != nil {
- return err
- }
- }
- if !opts.MmapSizeOk {
- // Set the default. Currently it seems the library picks reasonable defaults, especially for
- // wal.
- opts.MmapSize = -1
- //opts.MmapSize = 1 << 24 // 8 MiB
- }
- if opts.MmapSize >= 0 {
- err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma mmap_size=%d`, opts.MmapSize), nil)
- if err != nil {
- return err
- }
- }
- return nil
-}
-
-func setPageSize(conn conn, pageSize int) error {
- if pageSize == 0 {
- return nil
- }
- var retSize int64
- err := sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma page_size=%d`, pageSize), nil)
- if err != nil {
- return err
- }
- err = sqlitex.ExecTransient(conn, "pragma page_size", func(stmt *sqlite.Stmt) error {
- retSize = stmt.ColumnInt64(0)
- return nil
- })
- if err != nil {
- return err
- }
- if retSize != int64(pageSize) {
- return fmt.Errorf("requested page size %v but got %v", pageSize, retSize)
- }
- return nil
-}
-
-func InitSchema(conn conn, pageSize int, triggers bool) error {
- err := setPageSize(conn, pageSize)
- if err != nil {
- return fmt.Errorf("setting page size: %w", err)
- }
- err = sqlitex.ExecScript(conn, `
- -- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
- -- can trim the database file size with partial vacuums without having to do a full vacuum, which
- -- locks everything.
- pragma auto_vacuum=incremental;
-
- create table if not exists blob (
- name text,
- last_used timestamp default (datetime('now')),
- data blob,
- verified bool,
- primary key (name)
- );
-
- create table if not exists blob_meta (
- key text primary key,
- value
- );
-
- create index if not exists blob_last_used on blob(last_used);
-
- -- While sqlite *seems* to be faster to get sum(length(data)) instead of
- -- sum(length(data)), it may still require a large table scan at start-up or with a
- -- cold-cache. With this we can be assured that it doesn't.
- insert or ignore into blob_meta values ('size', 0);
-
- create table if not exists setting (
- name primary key on conflict replace,
- value
- );
-
- create view if not exists deletable_blob as
- with recursive excess (
- usage_with,
- last_used,
- blob_rowid,
- data_length
- ) as (
- select *
- from (
- select
- (select value from blob_meta where key='size') as usage_with,
- last_used,
- rowid,
- length(data)
- from blob order by last_used, rowid limit 1
- )
- where usage_with > (select value from setting where name='capacity')
- union all
- select
- usage_with-data_length as new_usage_with,
- blob.last_used,
- blob.rowid,
- length(data)
- from excess join blob
- on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
- where new_usage_with > (select value from setting where name='capacity')
- )
- select * from excess;
- `)
- if err != nil {
- return err
- }
- if triggers {
- err := sqlitex.ExecScript(conn, `
- create trigger if not exists after_insert_blob
- after insert on blob
- begin
- update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
- delete from blob where rowid in (select blob_rowid from deletable_blob);
- end;
-
- create trigger if not exists after_update_blob
- after update of data on blob
- begin
- update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
- delete from blob where rowid in (select blob_rowid from deletable_blob);
- end;
-
- create trigger if not exists after_delete_blob
- after delete on blob
- begin
- update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
- end;
- `)
- if err != nil {
- return err
- }
- }
- return nil
-}
-
-type NewPiecesStorageOpts struct {
- NewPoolOpts
- InitDbOpts
- ProvOpts func(*ProviderOpts)
- StorageOpts func(*storage.ResourcePiecesOpts)
-}
-
-type NewPoolOpts struct {
- NewConnOpts
- InitConnOpts
- NumConns int
-}
-
-type InitDbOpts struct {
- DontInitSchema bool
- PageSize int
- // If non-zero, overrides the existing setting.
- Capacity int64
- NoTriggers bool
-}
-
-// There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
-// the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the
-// top-level option type.
-type PoolConf struct {
- NumConns int
- JournalMode string
-}
-
-// Remove any capacity limits.
-func UnlimitCapacity(conn conn) error {
- return sqlitex.Exec(conn, "delete from setting where key='capacity'", nil)
-}
-
-// Set the capacity limit to exactly this value.
-func SetCapacity(conn conn, cap int64) error {
- return sqlitex.Exec(conn, "insert into setting values ('capacity', ?)", nil, cap)
-}
-
-type NewConnOpts struct {
- // See https://www.sqlite.org/c3ref/open.html. NB: "If the filename is an empty string, then a
- // private, temporary on-disk database will be created. This private database will be
- // automatically deleted as soon as the database connection is closed."
- Path string
- Memory bool
- // Whether multiple blobs will not be read simultaneously. Enables journal mode other than WAL,
- // and NumConns < 2.
- NoConcurrentBlobReads bool
-}
-
-func newOpenUri(opts NewConnOpts) string {
- path := url.PathEscape(opts.Path)
- if opts.Memory {
- path = ":memory:"
- }
- values := make(url.Values)
- if opts.NoConcurrentBlobReads || opts.Memory {
- values.Add("cache", "shared")
- }
- return fmt.Sprintf("file:%s?%s", path, values.Encode())
-}
-
-func initDatabase(conn conn, opts InitDbOpts) (err error) {
- if !opts.DontInitSchema {
- err = InitSchema(conn, opts.PageSize, !opts.NoTriggers)
- if err != nil {
- return
- }
- }
- if opts.Capacity != 0 {
- err = SetCapacity(conn, opts.Capacity)
- if err != nil {
- return
- }
- }
- return
-}
-
-func initPoolDatabase(pool ConnPool, opts InitDbOpts) (err error) {
- withPoolConn(pool, func(c conn) {
- err = initDatabase(c, opts)
- })
- return
-}
-
-// Go fmt, why you so shit?
-const openConnFlags = 0 |
- sqlite.SQLITE_OPEN_READWRITE |
- sqlite.SQLITE_OPEN_CREATE |
- sqlite.SQLITE_OPEN_URI |
- sqlite.SQLITE_OPEN_NOMUTEX
-
-func newConn(opts NewConnOpts) (conn, error) {
- return sqlite.OpenConn(newOpenUri(opts), openConnFlags)
-}
-
-type poolWithNumConns struct {
- *sqlitex.Pool
- numConns int
-}
-
-func (me poolWithNumConns) NumConns() int {
- return me.numConns
-}
-
-func NewPool(opts NewPoolOpts) (_ ConnPool, err error) {
- if opts.NumConns == 0 {
- opts.NumConns = runtime.NumCPU()
- }
- switch opts.NumConns {
- case 1:
- conn, err := newConn(opts.NewConnOpts)
- return &poolFromConn{conn: conn}, err
- default:
- _pool, err := sqlitex.Open(newOpenUri(opts.NewConnOpts), openConnFlags, opts.NumConns)
- return poolWithNumConns{_pool, opts.NumConns}, err
- }
-}
-
-// Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool.
-type poolFromConn struct {
- mu sync.Mutex
- conn conn
-}
-
-func (me *poolFromConn) Get(ctx context.Context) conn {
- me.mu.Lock()
- return me.conn
-}
-
-func (me *poolFromConn) Put(conn conn) {
- if conn != me.conn {
- panic("expected to same conn")
- }
- me.mu.Unlock()
-}
-
-func (me *poolFromConn) Close() error {
- return me.conn.Close()
-}
-
-func (me *poolFromConn) NumConns() int { return 1 }
-
-type ProviderOpts struct {
- BatchWrites bool
-}
-
-// Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of
-// the ConnPool (since it has to initialize all the connections anyway).
-func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
- prov := &provider{pool: pool, opts: opts}
- if opts.BatchWrites {
- writes := make(chan writeRequest)
- prov.writes = writes
- // This is retained for backwards compatibility. It may not be necessary.
- runtime.SetFinalizer(prov, func(p *provider) {
- p.Close()
- })
- go providerWriter(writes, prov.pool)
- }
- return prov, nil
-}
-
-type InitPoolOpts struct {
- NumConns int
- InitConnOpts
-}
-
-func initPoolConns(ctx context.Context, pool ConnPool, opts InitConnOpts) (err error) {
- var conns []conn
- defer func() {
- for _, c := range conns {
- pool.Put(c)
- }
- }()
- for range iter.N(pool.NumConns()) {
- conn := pool.Get(ctx)
- if conn == nil {
- break
- }
- conns = append(conns, conn)
- err = initConn(conn, opts)
- if err != nil {
- err = fmt.Errorf("initing conn %v: %w", len(conns), err)
- return
- }
- }
- return
-}
-
-type ConnPool interface {
- Get(context.Context) conn
- Put(conn)
- Close() error
- NumConns() int
-}
-
-func withPoolConn(pool ConnPool, with func(conn)) {
- c := pool.Get(context.TODO())
- defer pool.Put(c)
- with(c)
-}
-
-type provider struct {
- pool ConnPool
- writes chan<- writeRequest
- opts ProviderOpts
- closeMu sync.RWMutex
- closed bool
- closeErr error
-}
-
-var _ storage.ConsecutiveChunkReader = (*provider)(nil)
-
-func (p *provider) ReadConsecutiveChunks(prefix string) (io.ReadCloser, error) {
- p.closeMu.RLock()
- runner, err := p.getReadWithConnRunner()
- if err != nil {
- p.closeMu.RUnlock()
- return nil, err
- }
- r, w := io.Pipe()
- go func() {
- defer p.closeMu.RUnlock()
- err = runner(func(_ context.Context, conn conn) error {
- var written int64
- err = sqlitex.Exec(conn, `
- select
- data,
- cast(substr(name, ?+1) as integer) as offset
- from blob
- where name like ?||'%'
- order by offset`,
- func(stmt *sqlite.Stmt) error {
- offset := stmt.ColumnInt64(1)
- if offset != written {
- return fmt.Errorf("got chunk at offset %v, expected offset %v", offset, written)
- }
- // TODO: Avoid intermediate buffers here
- r := stmt.ColumnReader(0)
- w1, err := io.Copy(w, r)
- written += w1
- return err
- },
- len(prefix),
- prefix,
- )
- return err
- })
- w.CloseWithError(err)
- }()
- return r, nil
-}
-
-func (me *provider) Close() error {
- me.closeMu.Lock()
- defer me.closeMu.Unlock()
- if me.closed {
- return me.closeErr
- }
- if me.writes != nil {
- close(me.writes)
- }
- me.closeErr = me.pool.Close()
- me.closed = true
- return me.closeErr
-}
-
-type writeRequest struct {
- query withConn
- done chan<- error
- labels pprof.LabelSet
-}
-
-var expvars = expvar.NewMap("sqliteStorage")
-
-func runQueryWithLabels(query withConn, labels pprof.LabelSet, conn conn) (err error) {
- pprof.Do(context.Background(), labels, func(ctx context.Context) {
- // We pass in the context in the hope that the CPU profiler might incorporate sqlite
- // activity the action that triggered it. It doesn't seem that way, since those calls don't
- // take a context.Context themselves. It may come in useful in the goroutine profiles
- // though, and doesn't hurt to expose it here for other purposes should things change.
- err = query(ctx, conn)
- })
- return
-}
-
-// Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
-// stronger typing on the writes channel.
-func providerWriter(writes <-chan writeRequest, pool ConnPool) {
- conn := pool.Get(context.TODO())
- if conn == nil {
- return
- }
- defer pool.Put(conn)
- for {
- first, ok := <-writes
- if !ok {
- return
- }
- var buf []func()
- var cantFail error
- func() {
- defer sqlitex.Save(conn)(&cantFail)
- firstErr := runQueryWithLabels(first.query, first.labels, conn)
- buf = append(buf, func() { first.done <- firstErr })
- for {
- select {
- case wr, ok := <-writes:
- if ok {
- err := runQueryWithLabels(wr.query, wr.labels, conn)
- buf = append(buf, func() { wr.done <- err })
- continue
- }
- default:
- }
- break
- }
- }()
- // Not sure what to do if this failed.
- if cantFail != nil {
- expvars.Add("batchTransactionErrors", 1)
- }
- // Signal done after we know the transaction succeeded.
- for _, done := range buf {
- done()
- }
- expvars.Add("batchTransactions", 1)
- expvars.Add("batchedQueries", int64(len(buf)))
- //log.Printf("batched %v write queries", len(buf))
- }
-}
-
-func (p *provider) NewInstance(s string) (resource.Instance, error) {
- return instance{s, p}, nil
-}
-
-type instance struct {
- location string
- p *provider
-}
-
-func getLabels(skip int) pprof.LabelSet {
- return pprof.Labels("sqlite-storage-action", func() string {
- var pcs [8]uintptr
- runtime.Callers(skip+3, pcs[:])
- fs := runtime.CallersFrames(pcs[:])
- f, _ := fs.Next()
- funcName := f.Func.Name()
- funcName = funcName[strings.LastIndexByte(funcName, '.')+1:]
- //log.Printf("func name: %q", funcName)
- return funcName
- }())
-}
-
-func (p *provider) withConn(with withConn, write bool, skip int) error {
- p.closeMu.RLock()
- // I think we need to check this here because it may not be valid to send to the writes channel
- // if we're already closed. So don't try to move this check into getReadWithConnRunner.
- if p.closed {
- p.closeMu.RUnlock()
- return errors.New("closed")
- }
- if write && p.opts.BatchWrites {
- done := make(chan error)
- p.writes <- writeRequest{
- query: with,
- done: done,
- labels: getLabels(skip + 1),
- }
- p.closeMu.RUnlock()
- return <-done
- } else {
- defer p.closeMu.RUnlock()
- runner, err := p.getReadWithConnRunner()
- if err != nil {
- return err
- }
- return runner(with)
- }
-}
-
-// Obtains a DB conn and returns a withConn for executing with it. If no error is returned from this
-// function, the runner *must* be used or the conn is leaked. You should check the provider isn't
-// closed before using this.
-func (p *provider) getReadWithConnRunner() (with func(withConn) error, err error) {
- conn := p.pool.Get(context.TODO())
- if conn == nil {
- err = errors.New("couldn't get pool conn")
- return
- }
- with = func(with withConn) error {
- defer p.pool.Put(conn)
- return runQueryWithLabels(with, getLabels(1), conn)
- }
- return
-}
-
-type withConn func(context.Context, conn) error
-
-func (i instance) withConn(with withConn, write bool) error {
- return i.p.withConn(with, write, 1)
-}
-
-func (i instance) getConn() *sqlite.Conn {
- return i.p.pool.Get(context.TODO())
-}
-
-func (i instance) putConn(conn *sqlite.Conn) {
- i.p.pool.Put(conn)
-}
-
-func (i instance) Readdirnames() (names []string, err error) {
- prefix := i.location + "/"
- err = i.withConn(func(_ context.Context, conn conn) error {
- return sqlitex.Exec(conn, "select name from blob where name like ?", func(stmt *sqlite.Stmt) error {
- names = append(names, stmt.ColumnText(0)[len(prefix):])
- return nil
- }, prefix+"%")
- }, false)
- //log.Printf("readdir %q gave %q", i.location, names)
- return
-}
-
-func (i instance) getBlobRowid(conn conn) (rowid int64, err error) {
- rows := 0
- err = sqlitex.Exec(conn, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
- rowid = stmt.ColumnInt64(0)
- rows++
- return nil
- }, i.location)
- if err != nil {
- return
- }
- if rows == 1 {
- return
- }
- if rows == 0 {
- err = errors.New("blob not found")
- return
- }
- panic(rows)
-}
-
-type connBlob struct {
- *sqlite.Blob
- onClose func()
-}
-
-func (me connBlob) Close() error {
- err := me.Blob.Close()
- me.onClose()
- return err
-}
-
-func (i instance) Get() (ret io.ReadCloser, err error) {
- conn := i.getConn()
- if conn == nil {
- panic("nil sqlite conn")
- }
- blob, err := i.openBlob(conn, false, true)
- if err != nil {
- i.putConn(conn)
- return
- }
- var once sync.Once
- return connBlob{blob, func() {
- once.Do(func() { i.putConn(conn) })
- }}, nil
-}
-
-func (i instance) openBlob(conn conn, write, updateAccess bool) (*sqlite.Blob, error) {
- rowid, err := i.getBlobRowid(conn)
- if err != nil {
- return nil, err
- }
- // This seems to cause locking issues with in-memory databases. Is it something to do with not
- // having WAL?
- if updateAccess {
- err = sqlitex.Exec(conn, "update blob set last_used=datetime('now') where rowid=?", nil, rowid)
- if err != nil {
- err = fmt.Errorf("updating last_used: %w", err)
- return nil, err
- }
- if conn.Changes() != 1 {
- panic(conn.Changes())
- }
- }
- return conn.OpenBlob("main", "blob", "data", rowid, write)
-}
-
-func (i instance) PutSized(reader io.Reader, size int64) (err error) {
- err = i.withConn(func(_ context.Context, conn conn) error {
- err := sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, zeroblob(?))",
- nil,
- i.location, size)
- if err != nil {
- return err
- }
- blob, err := i.openBlob(conn, true, false)
- if err != nil {
- return err
- }
- defer blob.Close()
- _, err = io.Copy(blob, reader)
- return err
- }, true)
- return
-}
-
-func (i instance) Put(reader io.Reader) (err error) {
- var buf bytes.Buffer
- _, err = io.Copy(&buf, reader)
- if err != nil {
- return err
- }
- if false {
- return i.PutSized(&buf, int64(buf.Len()))
- } else {
- return i.withConn(func(_ context.Context, conn conn) error {
- for range iter.N(10) {
- err = sqlitex.Exec(conn,
- "insert or replace into blob(name, data) values(?, cast(? as blob))",
- nil,
- i.location, buf.Bytes())
- if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
- log.Print("sqlite busy")
- time.Sleep(time.Second)
- continue
- }
- break
- }
- return err
- }, true)
- }
-}
-
-type fileInfo struct {
- size int64
-}
-
-func (f fileInfo) Name() string {
- panic("implement me")
-}
-
-func (f fileInfo) Size() int64 {
- return f.size
-}
-
-func (f fileInfo) Mode() os.FileMode {
- panic("implement me")
-}
-
-func (f fileInfo) ModTime() time.Time {
- panic("implement me")
-}
-
-func (f fileInfo) IsDir() bool {
- panic("implement me")
-}
-
-func (f fileInfo) Sys() interface{} {
- panic("implement me")
-}
-
-func (i instance) Stat() (ret os.FileInfo, err error) {
- err = i.withConn(func(_ context.Context, conn conn) error {
- var blob *sqlite.Blob
- blob, err = i.openBlob(conn, false, false)
- if err != nil {
- return err
- }
- defer blob.Close()
- ret = fileInfo{blob.Size()}
- return nil
- }, false)
- return
-}
-
-func (i instance) ReadAt(p []byte, off int64) (n int, err error) {
- err = i.withConn(func(_ context.Context, conn conn) error {
- if false {
- var blob *sqlite.Blob
- blob, err = i.openBlob(conn, false, true)
- if err != nil {
- return err
- }
- defer blob.Close()
- if off >= blob.Size() {
- err = io.EOF
- return err
- }
- if off+int64(len(p)) > blob.Size() {
- p = p[:blob.Size()-off]
- }
- n, err = blob.ReadAt(p, off)
- } else {
- gotRow := false
- err = sqlitex.Exec(
- conn,
- "select substr(data, ?, ?) from blob where name=?",
- func(stmt *sqlite.Stmt) error {
- if gotRow {
- panic("found multiple matching blobs")
- } else {
- gotRow = true
- }
- n = stmt.ColumnBytes(0, p)
- return nil
- },
- off+1, len(p), i.location,
- )
- if err != nil {
- return err
- }
- if !gotRow {
- err = errors.New("blob not found")
- return err
- }
- if n < len(p) {
- err = io.EOF
- }
- }
- return nil
- }, false)
- return
-}
-
-func (i instance) WriteAt(bytes []byte, i2 int64) (int, error) {
- panic("implement me")
-}
-
-func (i instance) Delete() error {
- return i.withConn(func(_ context.Context, conn conn) error {
- return sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)
- }, true)
-}
package sqliteStorage
import (
- "bytes"
- "context"
"errors"
"fmt"
- "io"
- "io/ioutil"
"path/filepath"
- "sync"
"testing"
"time"
_ "github.com/anacrolix/envpprof"
- "github.com/dustin/go-humanize"
- qt "github.com/frankban/quicktest"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
-
+ "github.com/anacrolix/squirrel"
"github.com/anacrolix/torrent/storage"
test_storage "github.com/anacrolix/torrent/storage/test"
+ "github.com/dustin/go-humanize"
+ qt "github.com/frankban/quicktest"
)
-func newConnsAndProv(t *testing.T, opts NewPoolOpts) (ConnPool, *provider) {
- opts.Path = filepath.Join(t.TempDir(), "sqlite3.db")
- pool, err := NewPool(opts)
- qt.Assert(t, err, qt.IsNil)
- // sqlitex.Pool.Close doesn't like being called more than once. Let it slide for now.
- //t.Cleanup(func() { pool.Close() })
- qt.Assert(t, initPoolDatabase(pool, InitDbOpts{}), qt.IsNil)
- if !opts.Memory && opts.SetJournalMode == "" {
- opts.SetJournalMode = "wal"
- }
- qt.Assert(t, initPoolConns(context.TODO(), pool, opts.InitConnOpts), qt.IsNil)
- prov, err := NewProvider(pool, ProviderOpts{BatchWrites: pool.NumConns() > 1})
- require.NoError(t, err)
- t.Cleanup(func() { prov.Close() })
- return pool, prov
-}
-
-func TestTextBlobSize(t *testing.T) {
- _, prov := newConnsAndProv(t, NewPoolOpts{})
- a, _ := prov.NewInstance("a")
- err := a.Put(bytes.NewBufferString("\x00hello"))
- qt.Assert(t, err, qt.IsNil)
- fi, err := a.Stat()
- qt.Assert(t, err, qt.IsNil)
- assert.EqualValues(t, 6, fi.Size())
-}
-
-func TestSimultaneousIncrementalBlob(t *testing.T) {
- _, p := newConnsAndProv(t, NewPoolOpts{
- NumConns: 3,
- })
- a, err := p.NewInstance("a")
- require.NoError(t, err)
- const contents = "hello, world"
- require.NoError(t, a.Put(bytes.NewReader([]byte("hello, world"))))
- rc1, err := a.Get()
- require.NoError(t, err)
- rc2, err := a.Get()
- require.NoError(t, err)
- var b1, b2 []byte
- var e1, e2 error
- var wg sync.WaitGroup
- doRead := func(b *[]byte, e *error, rc io.ReadCloser, n int) {
- defer wg.Done()
- defer rc.Close()
- *b, *e = ioutil.ReadAll(rc)
- require.NoError(t, *e, n)
- assert.EqualValues(t, contents, *b)
- }
- wg.Add(2)
- go doRead(&b2, &e2, rc2, 2)
- go doRead(&b1, &e1, rc1, 1)
- wg.Wait()
-}
-
func BenchmarkMarkComplete(b *testing.B) {
const pieceSize = test_storage.DefaultPieceSize
const noTriggers = false
}
c := qt.New(b)
b.Run("CustomDirect", func(b *testing.B) {
- var opts NewDirectStorageOpts
+ var opts squirrel.NewCacheOpts
opts.Capacity = capacity
opts.NoTriggers = noTriggers
benchOpts := func(b *testing.B) {
directBench := func(b *testing.B) {
opts.Path = filepath.Join(b.TempDir(), "storage.db")
ci, err := NewDirectStorage(opts)
- var ujm UnexpectedJournalMode
+ var ujm squirrel.ErrUnexpectedJournalMode
if errors.As(err, &ujm) {
b.Skipf("setting journal mode %q: %v", opts.SetJournalMode, err)
}