20 "crawshaw.io/sqlite/sqlitex"
21 "github.com/anacrolix/missinggo/iter"
22 "github.com/anacrolix/missinggo/v2/resource"
23 "github.com/anacrolix/torrent/storage"
// conn aliases the driver's connection type for brevity throughout this package.
type conn = *sqlite.Conn
// InitConnOpts are per-connection initialization options applied by initConn.
type InitConnOpts struct {
	MmapSizeOk bool  // If false, a package-specific default will be used.
	MmapSize   int64 // If MmapSizeOk is set, use sqlite default if < 0, otherwise this value.
// JournalMode returns the journal mode the conn will be asked for: the
// explicit SetJournalMode when set. (The fallback for the empty case is not
// visible in this chunk.)
func (me InitConnOpts) JournalMode() string {
	if me.SetJournalMode != "" {
		return me.SetJournalMode
// UnexpectedJournalMode is returned when sqlite reports a different
// journal_mode than the one requested. NOTE(review): Go convention would name
// this ErrUnexpectedJournalMode, but renaming would break the exported API.
var UnexpectedJournalMode = errors.New("unexpected journal mode")
// initConn applies per-connection pragmas (recursive triggers, synchronous,
// journal mode, mmap size) according to opts.
func initConn(conn conn, opts InitConnOpts) (err error) {
	// Recursive triggers are required because we need to trim the blob_meta size after trimming to
	// capacity. Hopefully we don't hit the recursion limit, and if we do, there's an error thrown.
	err = sqlitex.ExecTransient(conn, "pragma recursive_triggers=on", nil)
	if opts.SetSynchronous != "" {
		err = sqlitex.ExecTransient(conn, `pragma synchronous=`+opts.SetSynchronous, nil)
	if opts.SetJournalMode != "" {
		// The journal_mode pragma echoes the resulting mode; verify sqlite accepted the request.
		err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma journal_mode=%s`, opts.SetJournalMode), func(stmt *sqlite.Stmt) error {
			ret := stmt.ColumnText(0)
			if ret != opts.SetJournalMode {
				return UnexpectedJournalMode
	// Set the default. Currently it seems the library picks reasonable defaults, especially for
	//opts.MmapSize = 1 << 24 // 16 MiB (1<<24 bytes; earlier comment said 8 MiB, which was wrong)
	// A negative MmapSize means "leave sqlite's default"; only apply when >= 0.
	if opts.MmapSize >= 0 {
		err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma mmap_size=%d`, opts.MmapSize), nil)
// InitSchema creates the blob, blob_meta, and setting tables, the last_used
// index, the deletable_blob eviction view, and — when triggers is true — the
// triggers that keep blob_meta('size') in sync and evict over-capacity blobs.
// pageSize is applied before table creation (it must be set before the first
// table exists to take effect).
func InitSchema(conn conn, pageSize int, triggers bool) error {
	err := sqlitex.ExecScript(conn, fmt.Sprintf("pragma page_size=%d", pageSize))
	err := sqlitex.ExecScript(conn, `
-- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
-- can trim the database file size with partial vacuums without having to do a full vacuum, which
pragma auto_vacuum=incremental;
create table if not exists blob (
last_used timestamp default (datetime('now')),
create table if not exists blob_meta (
key text primary key,
create index if not exists blob_last_used on blob(last_used);
-- While sqlite *seems* to be faster to get sum(length(data)) instead of
-- sum(length(data)), it may still require a large table scan at start-up or with a
-- cold-cache. With this we can be assured that it doesn't.
insert or ignore into blob_meta values ('size', 0);
create table if not exists setting (
name primary key on conflict replace,
create view if not exists deletable_blob as
with recursive excess (
(select value from blob_meta where key='size') as usage_with,
from blob order by last_used, rowid limit 1
where usage_with > (select value from setting where name='capacity')
usage_with-data_length as new_usage_with,
from excess join blob
on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
where new_usage_with > (select value from setting where name='capacity')
select * from excess;
	err := sqlitex.ExecScript(conn, `
create trigger if not exists after_insert_blob
update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
delete from blob where rowid in (select blob_rowid from deletable_blob);
create trigger if not exists after_update_blob
after update of data on blob
update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
delete from blob where rowid in (select blob_rowid from deletable_blob);
create trigger if not exists after_delete_blob
update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
// NewPiecesStorageOpts configures NewPiecesStorage. ProvOpts and StorageOpts,
// when non-nil, are called with the computed defaults so callers can adjust them.
type NewPiecesStorageOpts struct {
	ProvOpts    func(*ProviderOpts)
	StorageOpts func(*storage.ResourcePiecesOpts)
// A convenience function that creates a connection pool, resource provider, and a pieces storage
// ClientImpl and returns them all with a Close attached.
func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, err error) {
	conns, err := NewPool(opts.NewPoolOpts)
	err = initPoolDatabase(conns, opts.InitDbOpts)
	// Default to batched writes only when there is more than one conn.
	provOpts := ProviderOpts{
		BatchWrites: conns.NumConns() > 1,
	if f := opts.ProvOpts; f != nil {
	prov, err := NewProvider(conns, provOpts)
	// Determine the effective journal mode; it decides whether sized puts are safe below.
	withPoolConn(conns, func(c conn) {
		err = sqlitex.Exec(c, "pragma journal_mode", func(stmt *sqlite.Stmt) error {
			journalMode = stmt.ColumnText(0)
		err = fmt.Errorf("getting journal mode: %w", err)
	if journalMode == "" {
		err = errors.New("didn't get journal mode")
	storageOpts := storage.ResourcePiecesOpts{
		// NOTE(review): sized puts disabled outside WAL or with a single conn —
		// presumably a locking concern; confirm against the provider implementation.
		NoSizedPuts: journalMode != "wal" || conns.NumConns() == 1,
	if f := opts.StorageOpts; f != nil {
	store := storage.NewResourcePiecesOpts(prov, storageOpts)
// NewPoolOpts configures NewPool. (Fields are not visible in this chunk.)
type NewPoolOpts struct {
// InitDbOpts configures initDatabase / initPoolDatabase.
type InitDbOpts struct {
	// If non-zero, overrides the existing setting.
// There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
// the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the
// top-level option type.
type PoolConf struct {
269 // Remove any capacity limits.
270 func UnlimitCapacity(conn conn) error {
271 return sqlitex.Exec(conn, "delete from setting where key='capacity'", nil)
// Set the capacity limit to exactly this value.
// The setting table's primary key is declared "on conflict replace", so a
// plain insert overwrites any existing capacity row.
func SetCapacity(conn conn, cap int64) error {
	return sqlitex.Exec(conn, "insert into setting values ('capacity', ?)", nil, cap)
// NewConnOpts configures how a sqlite connection (or pool) is opened.
type NewConnOpts struct {
	// See https://www.sqlite.org/c3ref/open.html. NB: "If the filename is an empty string, then a
	// private, temporary on-disk database will be created. This private database will be
	// automatically deleted as soon as the database connection is closed."
	// Whether multiple blobs will not be read simultaneously. Enables journal mode other than WAL,
	NoConcurrentBlobReads bool
// newOpenUri builds the "file:" URI used to open the database described by opts.
func newOpenUri(opts NewConnOpts) string {
	path := url.PathEscape(opts.Path)
	values := make(url.Values)
	// Shared cache lets multiple conns in this process share one page cache
	// (required for multiple conns to a single :memory: database).
	if opts.NoConcurrentBlobReads || opts.Memory {
		values.Add("cache", "shared")
	return fmt.Sprintf("file:%s?%s", path, values.Encode())
// initDatabase applies schema creation and the capacity setting to a single
// conn, per opts.
func initDatabase(conn conn, opts InitDbOpts) (err error) {
	if !opts.DontInitSchema {
		if opts.PageSize == 0 {
			// There doesn't seem to be an optimal size. I did try with the standard chunk size, but
			// the difference is not convincing.
			//opts.PageSize = 1 << 14
		err = InitSchema(conn, opts.PageSize, !opts.NoTriggers)
	if opts.Capacity != 0 {
		err = SetCapacity(conn, opts.Capacity)
// initPoolDatabase runs initDatabase on a conn borrowed from the pool.
func initPoolDatabase(pool ConnPool, opts InitDbOpts) (err error) {
	withPoolConn(pool, func(c conn) {
		err = initDatabase(c, opts)
// newConn opens a single sqlite connection with default open flags.
func newConn(opts NewConnOpts) (conn, error) {
	return sqlite.OpenConn(newOpenUri(opts), 0)
// poolWithNumConns wraps a sqlitex pool, recording how many conns it holds so
// it can satisfy ConnPool's NumConns method.
type poolWithNumConns struct {
func (me poolWithNumConns) NumConns() int {
// NewPool builds a ConnPool: a single-conn emulation when one conn is wanted,
// otherwise a sqlitex.Pool. NumConns defaults to the CPU count. All conns are
// then initialized via initPoolConns.
func NewPool(opts NewPoolOpts) (_ ConnPool, err error) {
	if opts.NumConns == 0 {
		opts.NumConns = runtime.NumCPU()
	conns, err := func() (ConnPool, error) {
		switch opts.NumConns {
			conn, err := newConn(opts.NewConnOpts)
			return &poolFromConn{conn: conn}, err
			_pool, err := sqlitex.Open(newOpenUri(opts.NewConnOpts), 0, opts.NumConns)
			return poolWithNumConns{_pool, opts.NumConns}, err
	// NOTE(review): a nil context is passed here — TODO confirm pool.Get tolerates nil
	// (context.TODO() would be the idiomatic placeholder).
	return conns, initPoolConns(nil, conns, InitPoolOpts{
		NumConns:     opts.NumConns,
		InitConnOpts: opts.InitConnOpts,
// Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool.
type poolFromConn struct {
// Get hands out the pool's sole conn. (Blocking/locking behavior is not visible in this chunk.)
func (me *poolFromConn) Get(ctx context.Context) conn {
383 func (me *poolFromConn) Put(conn conn) {
385 panic("expected to same conn")
// Close closes the single underlying conn.
func (me *poolFromConn) Close() error {
	return me.conn.Close()
394 func (poolFromConn) NumConns() int { return 1 }
// ProviderOpts configures NewProvider.
type ProviderOpts struct {
// Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of
// the ConnPool (since it has to initialize all the connections anyway).
func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
	prov := &provider{pool: pool, opts: opts}
	if opts.BatchWrites {
		writes := make(chan writeRequest)
		// This is retained for backwards compatibility. It may not be necessary.
		runtime.SetFinalizer(prov, func(p *provider) {
		// providerWriter deliberately does not retain prov, so the finalizer can fire.
		go providerWriter(writes, prov.pool)
// InitPoolOpts configures initPoolConns.
type InitPoolOpts struct {
// initPoolConns borrows NumConns conns from the pool — holding all of them
// simultaneously so every distinct member gets initConn applied exactly once —
// then returns them. (The Put-back path is not visible in this chunk.)
func initPoolConns(ctx context.Context, pool ConnPool, opts InitPoolOpts) (err error) {
	for _, c := range conns {
	for range iter.N(opts.NumConns) {
		conn := pool.Get(ctx)
		conns = append(conns, conn)
		err = initConn(conn, opts.InitConnOpts)
			err = fmt.Errorf("initing conn %v: %w", len(conns), err)
// ConnPool abstracts a source of sqlite conns (either the single-conn
// emulation or a sqlitex.Pool).
type ConnPool interface {
	Get(context.Context) conn
// withPoolConn runs `with` against a conn borrowed from pool.
func withPoolConn(pool ConnPool, with func(conn)) {
// provider serves resource instances from a ConnPool. When write batching is
// enabled, write queries are funneled through the writes channel to
// providerWriter.
type provider struct {
	writes chan<- writeRequest
// Compile-time assertion that provider supports optimized consecutive-chunk reads.
var _ storage.ConsecutiveChunkReader = (*provider)(nil)
// ReadConsecutiveChunks streams, in offset order, the data of every blob whose
// name starts with prefix, as one ReadCloser (the writer half of a pipe is
// filled here; the reader half is returned).
func (p *provider) ReadConsecutiveChunks(prefix string) (io.ReadCloser, error) {
	runner, err := p.getReadWithConnRunner()
	defer p.closeMu.RUnlock()
	err = runner(func(_ context.Context, conn conn) error {
		err = sqlitex.Exec(conn, `
cast(substr(name, ?+1) as integer) as offset
where name like ?||'%'
			func(stmt *sqlite.Stmt) error {
				// Chunks must be contiguous: each chunk's offset must equal bytes already written.
				offset := stmt.ColumnInt64(1)
				if offset != written {
					return fmt.Errorf("got chunk at offset %v, expected offset %v", offset, written)
				// TODO: Avoid intermediate buffers here
				r := stmt.ColumnReader(0)
				w1, err := io.Copy(w, r)
	// Propagate any error to the pipe's reader; nil err means clean EOF.
	w.CloseWithError(err)
// Close shuts down the provider: stops the batch writer (when one exists) and
// closes the underlying pool, recording the result in closeErr.
func (me *provider) Close() error {
	defer me.closeMu.Unlock()
	if me.writes != nil {
	me.closeErr = me.pool.Close()
// writeRequest is a queued write query plus the pprof labels to attribute it to.
type writeRequest struct {
	labels pprof.LabelSet
// expvars exposes batching counters under the "sqliteStorage" expvar map.
var expvars = expvar.NewMap("sqliteStorage")
// runQueryWithLabels executes query against conn with the given pprof labels
// applied for the duration of the call.
func runQueryWithLabels(query withConn, labels pprof.LabelSet, conn conn) (err error) {
	pprof.Do(context.Background(), labels, func(ctx context.Context) {
		// We pass in the context in the hope that the CPU profiler might incorporate sqlite
		// activity the action that triggered it. It doesn't seem that way, since those calls don't
		// take a context.Context themselves. It may come in useful in the goroutine profiles
		// though, and doesn't hurt to expose it here for other purposes should things change.
		err = query(ctx, conn)
// Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
// stronger typing on the writes channel.
func providerWriter(writes <-chan writeRequest, pool ConnPool) {
	conn := pool.Get(context.TODO())
	// Block for the first request, then greedily drain the channel into one transaction.
	first, ok := <-writes
	defer sqlitex.Save(conn)(&cantFail)
	firstErr := runQueryWithLabels(first.query, first.labels, conn)
	// Completion callbacks are buffered and only fired after the transaction outcome is known.
	buf = append(buf, func() { first.done <- firstErr })
	case wr, ok := <-writes:
		err := runQueryWithLabels(wr.query, wr.labels, conn)
		buf = append(buf, func() { wr.done <- err })
	// Not sure what to do if this failed.
	expvars.Add("batchTransactionErrors", 1)
	// Signal done after we know the transaction succeeded.
	for _, done := range buf {
	expvars.Add("batchTransactions", 1)
	expvars.Add("batchedQueries", int64(len(buf)))
	//log.Printf("batched %v write queries", len(buf))
// NewInstance returns a resource.Instance addressing the blob named s.
func (p *provider) NewInstance(s string) (resource.Instance, error) {
	return instance{s, p}, nil
// instance addresses a single named blob within a provider.
type instance struct {
// getLabels builds pprof labels naming the storage action after the function
// `skip` frames above the caller.
func getLabels(skip int) pprof.LabelSet {
	return pprof.Labels("sqlite-storage-action", func() string {
		// +3 skips this closure and the pprof plumbing — NOTE(review): exact frame count
		// should be confirmed against the profile output.
		runtime.Callers(skip+3, pcs[:])
		fs := runtime.CallersFrames(pcs[:])
		funcName := f.Func.Name()
		// Trim the package path and receiver: keep only the bare method name.
		funcName = funcName[strings.LastIndexByte(funcName, '.')+1:]
		//log.Printf("func name: %q", funcName)
// withConn runs `with` against a conn: through the batching writer channel
// when this is a write and batching is enabled, otherwise directly via a pool
// conn. Returns an error when the provider is already closed.
func (p *provider) withConn(with withConn, write bool, skip int) error {
	// I think we need to check this here because it may not be valid to send to the writes channel
	// if we're already closed. So don't try to move this check into getReadWithConnRunner.
	return errors.New("closed")
	if write && p.opts.BatchWrites {
		done := make(chan error)
		p.writes <- writeRequest{
			labels: getLabels(skip + 1),
		defer p.closeMu.RUnlock()
		runner, err := p.getReadWithConnRunner()
// Obtains a DB conn and returns a withConn for executing with it. If no error is returned from this
// function, the runner *must* be used or the conn is leaked. You should check the provider isn't
// closed before using this.
func (p *provider) getReadWithConnRunner() (with func(withConn) error, err error) {
	conn := p.pool.Get(context.TODO())
		err = errors.New("couldn't get pool conn")
	with = func(with withConn) error {
		// Returning the conn here is why the runner must be used: Put only happens inside it.
		defer p.pool.Put(conn)
		return runQueryWithLabels(with, getLabels(1), conn)
// withConn is a query body executed against a borrowed conn.
type withConn func(context.Context, conn) error
// withConn forwards to the provider, attributing pprof labels to our caller (skip 1).
func (i instance) withConn(with withConn, write bool) error {
	return i.p.withConn(with, write, 1)
// getConn borrows a conn directly from the provider's pool; pair with putConn.
func (i instance) getConn() *sqlite.Conn {
	return i.p.pool.Get(context.TODO())
// putConn returns a conn previously obtained with getConn to the pool.
func (i instance) putConn(conn *sqlite.Conn) {
// Readdirnames lists the names of blobs under this instance's location,
// treating '/' as the path separator, with the prefix stripped.
func (i instance) Readdirnames() (names []string, err error) {
	prefix := i.location + "/"
	err = i.withConn(func(_ context.Context, conn conn) error {
		return sqlitex.Exec(conn, "select name from blob where name like ?", func(stmt *sqlite.Stmt) error {
			// Strip the directory prefix so callers see relative names.
			names = append(names, stmt.ColumnText(0)[len(prefix):])
	//log.Printf("readdir %q gave %q", i.location, names)
// getBlobRowid resolves the rowid of this instance's blob row, returning a
// "blob not found" error if no row matches.
func (i instance) getBlobRowid(conn conn) (rowid int64, err error) {
	err = sqlitex.Exec(conn, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
		rowid = stmt.ColumnInt64(0)
	err = errors.New("blob not found")
// connBlob couples an open sqlite Blob with cleanup to run when it is closed.
type connBlob struct {
// Close closes the wrapped blob, then runs the attached cleanup.
func (me connBlob) Close() error {
	err := me.Blob.Close()
// Get opens this instance's blob for reading. The borrowed conn is held until
// the returned ReadCloser is closed.
func (i instance) Get() (ret io.ReadCloser, err error) {
		panic("nil sqlite conn")
	blob, err := i.openBlob(conn, false, true)
	// once guards putConn so a double Close can't return the conn twice.
	return connBlob{blob, func() {
		once.Do(func() { i.putConn(conn) })
// openBlob opens the data column of this instance's blob row, optionally
// writable, optionally bumping last_used first (feeds LRU eviction).
func (i instance) openBlob(conn conn, write, updateAccess bool) (*sqlite.Blob, error) {
	rowid, err := i.getBlobRowid(conn)
	// This seems to cause locking issues with in-memory databases. Is it something to do with not
	err = sqlitex.Exec(conn, "update blob set last_used=datetime('now') where rowid=?", nil, rowid)
		err = fmt.Errorf("updating last_used: %w", err)
	// Exactly one row must have been updated; anything else is a logic error.
	if conn.Changes() != 1 {
		panic(conn.Changes())
	return conn.OpenBlob("main", "blob", "data", rowid, write)
// PutSized writes exactly size bytes from reader into this blob, first
// pre-allocating with zeroblob so the incremental-blob API can stream into it.
func (i instance) PutSized(reader io.Reader, size int64) (err error) {
	err = i.withConn(func(_ context.Context, conn conn) error {
		err := sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, zeroblob(?))",
		blob, err := i.openBlob(conn, true, false)
		_, err = io.Copy(blob, reader)
// Put stores reader's contents under this instance's name. Data is buffered
// in memory first; the direct-insert path retries a bounded number of times
// on SQLITE_BUSY.
func (i instance) Put(reader io.Reader) (err error) {
	_, err = io.Copy(&buf, reader)
	return i.PutSized(&buf, int64(buf.Len()))
	return i.withConn(func(_ context.Context, conn conn) error {
		// Bounded retry loop: sleep and retry while the database is busy.
		for range iter.N(10) {
			err = sqlitex.Exec(conn,
				"insert or replace into blob(name, data) values(?, cast(? as blob))",
				i.location, buf.Bytes())
			if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
				log.Print("sqlite busy")
				time.Sleep(time.Second)
// fileInfo adapts a blob's size to os.FileInfo; all other methods are
// deliberately unimplemented stubs.
type fileInfo struct {
// Name is an unimplemented stub; only size information is populated (see Stat).
func (f fileInfo) Name() string {
	panic("implement me")
// Size reports the blob's size captured at Stat time. (Body not visible in this chunk.)
func (f fileInfo) Size() int64 {
// Mode is an unimplemented stub.
func (f fileInfo) Mode() os.FileMode {
	panic("implement me")
// ModTime is an unimplemented stub.
func (f fileInfo) ModTime() time.Time {
	panic("implement me")
// IsDir is an unimplemented stub.
func (f fileInfo) IsDir() bool {
	panic("implement me")
// Sys is an unimplemented stub.
func (f fileInfo) Sys() interface{} {
	panic("implement me")
// Stat reports this blob's size wrapped in an os.FileInfo (other FileInfo
// methods are unimplemented stubs).
func (i instance) Stat() (ret os.FileInfo, err error) {
	err = i.withConn(func(_ context.Context, conn conn) error {
		var blob *sqlite.Blob
		blob, err = i.openBlob(conn, false, false)
		ret = fileInfo{blob.Size()}
// ReadAt reads into p from byte offset off of this instance's blob. Two code
// paths are visible: the incremental-blob API, and a substr() query fallback.
func (i instance) ReadAt(p []byte, off int64) (n int, err error) {
	err = i.withConn(func(_ context.Context, conn conn) error {
		var blob *sqlite.Blob
		blob, err = i.openBlob(conn, false, true)
		// Reads starting at or past end-of-blob.
		if off >= blob.Size() {
		// Clamp the read so it doesn't run past the blob's end.
		if off+int64(len(p)) > blob.Size() {
			p = p[:blob.Size()-off]
		n, err = blob.ReadAt(p, off)
		"select substr(data, ?, ?) from blob where name=?",
		func(stmt *sqlite.Stmt) error {
			panic("found multiple matching blobs")
			n = stmt.ColumnBytes(0, p)
		// substr is 1-based, hence off+1.
		off+1, len(p), i.location,
	err = errors.New("blob not found")
// WriteAt is unimplemented; writes go through Put/PutSized instead.
func (i instance) WriteAt(bytes []byte, i2 int64) (int, error) {
	panic("implement me")
// Delete removes this instance's blob row from the database.
func (i instance) Delete() error {
	return i.withConn(func(_ context.Context, conn conn) error {
		return sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)