23 "crawshaw.io/sqlite/sqlitex"
24 "github.com/anacrolix/missinggo/iter"
25 "github.com/anacrolix/missinggo/v2/resource"
27 "github.com/anacrolix/torrent/storage"
// conn aliases the sqlite connection type used throughout this package.
30 type conn = *sqlite.Conn
// InitConnOpts holds per-connection pragma settings applied by initConn.
// NOTE(review): this listing is truncated; fields referenced by initConn
// below (SetSynchronous, SetJournalMode) are not visible here.
32 type InitConnOpts struct {
35 MmapSizeOk bool // If false, a package-specific default will be used.
36 MmapSize int64 // If MmapSizeOk is set, use sqlite default if < 0, otherwise this value.
// UnexpectedJournalMode is returned when a journal_mode pragma does not take
// effect: sqlite reported a mode other than the one requested.
39 type UnexpectedJournalMode struct {
43 func (me UnexpectedJournalMode) Error() string {
44 return fmt.Sprintf("unexpected journal mode: %q", me.JournalMode)
47 func setSynchronous(conn conn, syncInt int) (err error) {
48 err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma synchronous=%v`, syncInt), nil)
56 err = sqlitex.ExecTransient(conn, `pragma synchronous`, func(stmt *sqlite.Stmt) error {
57 actual = stmt.ColumnInt(0)
65 return errors.New("synchronous setting query didn't return anything")
67 if actual != syncInt {
68 return fmt.Errorf("set synchronous %q, got %q", syncInt, actual)
// initConn applies per-connection pragmas in order: synchronous,
// recursive_triggers, journal_mode (verified against the value sqlite
// reports back), and mmap_size.
// NOTE(review): this listing is truncated; the error checks between the
// visible statements and the closing braces are missing.
73 func initConn(conn conn, opts InitConnOpts) (err error) {
74 err = setSynchronous(conn, opts.SetSynchronous)
78 // Recursive triggers are required because we need to trim the blob_meta size after trimming to
79 // capacity. Hopefully we don't hit the recursion limit, and if we do, there's an error thrown.
80 err = sqlitex.ExecTransient(conn, "pragma recursive_triggers=on", nil)
84 if opts.SetJournalMode != "" {
85 err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma journal_mode=%s`, opts.SetJournalMode), func(stmt *sqlite.Stmt) error {
86 ret := stmt.ColumnText(0)
87 if ret != opts.SetJournalMode {
88 return UnexpectedJournalMode{ret}
97 // Set the default. Currently it seems the library picks reasonable defaults, especially for
100 //opts.MmapSize = 1 << 24 // 16 MiB
102 if opts.MmapSize >= 0 {
103 err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma mmap_size=%d`, opts.MmapSize), nil)
// setPageSize sets the sqlite page_size pragma and reads the pragma back,
// erroring if the reported size differs from what was requested.
// NOTE(review): this listing is truncated; error checks and the closing
// brace are missing.
111 func setPageSize(conn conn, pageSize int) error {
116 err := sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma page_size=%d`, pageSize), nil)
120 err = sqlitex.ExecTransient(conn, "pragma page_size", func(stmt *sqlite.Stmt) error {
121 retSize = stmt.ColumnInt64(0)
127 if retSize != int64(pageSize) {
128 return fmt.Errorf("requested page size %v but got %v", pageSize, retSize)
// InitSchema sets the page size, then creates the blob, blob_meta and
// setting tables plus the deletable_blob view, and — when triggers is true —
// the triggers that keep blob_meta's 'size' row in sync and delete blobs
// exceeding the configured capacity.
// NOTE(review): this listing is heavily truncated; many lines of the SQL
// scripts and the surrounding error handling are missing. The SQL comment
// at inner lines 159-160 compares sum(length(data)) with itself and is
// likely garbled, but it lives inside the script string so it is left
// untouched here.
133 func InitSchema(conn conn, pageSize int, triggers bool) error {
134 err := setPageSize(conn, pageSize)
136 return fmt.Errorf("setting page size: %w", err)
138 err = sqlitex.ExecScript(conn, `
139 -- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
140 -- can trim the database file size with partial vacuums without having to do a full vacuum, which
142 pragma auto_vacuum=incremental;
144 create table if not exists blob (
146 last_used timestamp default (datetime('now')),
152 create table if not exists blob_meta (
153 key text primary key,
157 create index if not exists blob_last_used on blob(last_used);
159 -- While sqlite *seems* to be faster to get sum(length(data)) instead of
160 -- sum(length(data)), it may still require a large table scan at start-up or with a
161 -- cold-cache. With this we can be assured that it doesn't.
162 insert or ignore into blob_meta values ('size', 0);
164 create table if not exists setting (
165 name primary key on conflict replace,
169 create view if not exists deletable_blob as
170 with recursive excess (
179 (select value from blob_meta where key='size') as usage_with,
183 from blob order by last_used, rowid limit 1
185 where usage_with > (select value from setting where name='capacity')
188 usage_with-data_length as new_usage_with,
192 from excess join blob
193 on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
194 where new_usage_with > (select value from setting where name='capacity')
196 select * from excess;
202 err := sqlitex.ExecScript(conn, `
203 create trigger if not exists after_insert_blob
206 update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
207 delete from blob where rowid in (select blob_rowid from deletable_blob);
210 create trigger if not exists after_update_blob
211 after update of data on blob
213 update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
214 delete from blob where rowid in (select blob_rowid from deletable_blob);
217 create trigger if not exists after_delete_blob
220 update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
// NewPiecesStorageOpts configures NewPiecesStorage; the callback fields let
// callers tweak provider and resource-pieces options before they are used.
// NOTE(review): this listing is truncated; other fields are not visible.
230 type NewPiecesStorageOpts struct {
233 ProvOpts func(*ProviderOpts)
234 StorageOpts func(*storage.ResourcePiecesOpts)
// NOTE(review): the following option types are truncated in this listing;
// only a subset of their fields and comments is visible.
237 type NewPoolOpts struct {
243 type InitDbOpts struct {
246 // If non-zero, overrides the existing setting.
251 // There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
252 // the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the
253 // top-level option type.
254 type PoolConf struct {
260 func UnlimitCapacity(conn conn) error {
261 return sqlitex.Exec(conn, "delete from setting where key='capacity'", nil)
264 // Set the capacity limit to exactly this value.
265 func SetCapacity(conn conn, cap int64) error {
266 return sqlitex.Exec(conn, "insert into setting values ('capacity', ?)", nil, cap)
// NewConnOpts parameterizes how sqlite connections — and the URI they open —
// are created. NOTE(review): this listing is truncated; the Path and Memory
// fields referenced by newOpenUri are not visible here.
269 type NewConnOpts struct {
270 // See https://www.sqlite.org/c3ref/open.html. NB: "If the filename is an empty string, then a
271 // private, temporary on-disk database will be created. This private database will be
272 // automatically deleted as soon as the database connection is closed."
275 // Whether multiple blobs will not be read simultaneously. Enables journal mode other than WAL,
277 NoConcurrentBlobReads bool
// newOpenUri builds the sqlite file: URI for opts, adding cache=shared when
// blob reads won't be concurrent or the database is in memory.
// NOTE(review): lines between the path escaping and the values map are
// missing from this truncated listing (presumably in-memory path handling)
// — confirm against the original source.
280 func newOpenUri(opts NewConnOpts) string {
281 path := url.PathEscape(opts.Path)
285 values := make(url.Values)
286 if opts.NoConcurrentBlobReads || opts.Memory {
287 values.Add("cache", "shared")
289 return fmt.Sprintf("file:%s?%s", path, values.Encode())
// initDatabase optionally initializes the schema and applies a capacity
// setting on a fresh connection. NOTE(review): error checks and closing
// braces are missing from this truncated listing.
292 func initDatabase(conn conn, opts InitDbOpts) (err error) {
293 if !opts.DontInitSchema {
294 err = InitSchema(conn, opts.PageSize, !opts.NoTriggers)
299 if opts.Capacity != 0 {
300 err = SetCapacity(conn, opts.Capacity)
// initPoolDatabase runs initDatabase on a single connection borrowed from
// the pool.
308 func initPoolDatabase(pool ConnPool, opts InitDbOpts) (err error) {
309 withPoolConn(pool, func(c conn) {
310 err = initDatabase(c, opts)
315 // Go fmt, why you so shit?
316 const openConnFlags = 0 |
317 sqlite.SQLITE_OPEN_READWRITE |
318 sqlite.SQLITE_OPEN_CREATE |
319 sqlite.SQLITE_OPEN_URI |
320 sqlite.SQLITE_OPEN_NOMUTEX
322 func newConn(opts NewConnOpts) (conn, error) {
323 return sqlite.OpenConn(newOpenUri(opts), openConnFlags)
// poolWithNumConns wraps a sqlitex pool so it can also report how many
// connections it holds. NOTE(review): struct fields and the method body are
// missing from this truncated listing.
326 type poolWithNumConns struct {
331 func (me poolWithNumConns) NumConns() int {
// NewPool creates a ConnPool: the single-conn emulation for one connection
// (see the &poolFromConn branch), otherwise a sqlitex.Pool sized to
// NumConns, defaulting to runtime.NumCPU(). NOTE(review): the switch's case
// labels are missing from this truncated listing.
335 func NewPool(opts NewPoolOpts) (_ ConnPool, err error) {
336 if opts.NumConns == 0 {
337 opts.NumConns = runtime.NumCPU()
339 switch opts.NumConns {
341 conn, err := newConn(opts.NewConnOpts)
342 return &poolFromConn{conn: conn}, err
344 _pool, err := sqlitex.Open(newOpenUri(opts.NewConnOpts), openConnFlags, opts.NumConns)
345 return poolWithNumConns{_pool, opts.NumConns}, err
349 // Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool.
// NOTE(review): fields and most method bodies are missing from this
// truncated listing.
350 type poolFromConn struct {
// Get hands out the single underlying conn.
355 func (me *poolFromConn) Get(ctx context.Context) conn {
// Put panics when handed a conn other than the one this pool owns.
// NOTE(review): the panic message has a typo ("expected to same conn");
// it is a runtime string, so it is left unchanged here.
360 func (me *poolFromConn) Put(conn conn) {
362 panic("expected to same conn")
// Close closes the single underlying connection.
367 func (me *poolFromConn) Close() error {
368 return me.conn.Close()
371 func (poolFromConn) NumConns() int { return 1 }
// ProviderOpts configures provider behavior. NOTE(review): only BatchWrites
// is referenced in the visible code; other fields are not shown in this
// truncated listing.
373 type ProviderOpts struct {
377 // Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of
378 // the ConnPool (since it has to initialize all the connections anyway).
// NOTE(review): the finalizer body and remaining channel wiring are missing
// from this truncated listing.
379 func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
380 prov := &provider{pool: pool, opts: opts}
381 if opts.BatchWrites {
382 writes := make(chan writeRequest)
384 // This is retained for backwards compatibility. It may not be necessary.
385 runtime.SetFinalizer(prov, func(p *provider) {
388 go providerWriter(writes, prov.pool)
// InitPoolOpts parameterizes pool initialization. NOTE(review): fields are
// not visible in this truncated listing.
393 type InitPoolOpts struct {
// initPoolConns borrows every connection from the pool, applies initConn to
// each, and collects them in conns. NOTE(review): the declaration of conns,
// the return of the borrowed conns, and loop terminators are missing from
// this truncated listing.
398 func initPoolConns(ctx context.Context, pool ConnPool, opts InitConnOpts) (err error) {
401 for _, c := range conns {
405 for range iter.N(pool.NumConns()) {
406 conn := pool.Get(ctx)
410 conns = append(conns, conn)
411 err = initConn(conn, opts)
413 err = fmt.Errorf("initing conn %v: %w", len(conns), err)
// ConnPool abstracts a source of sqlite connections (sqlitex.Pool or the
// single-conn emulation above). NOTE(review): Put/Close/NumConns methods are
// implied by the implementations but not visible in this truncated listing.
420 type ConnPool interface {
421 Get(context.Context) conn
// withPoolConn borrows a conn, runs with against it, and returns it.
427 func withPoolConn(pool ConnPool, with func(conn)) {
428 c := pool.Get(context.TODO())
// provider implements blob-backed resource storage. NOTE(review): most
// fields are missing from this truncated listing; pool, opts, closeMu and
// closeErr are referenced by the methods below.
433 type provider struct {
435 writes chan<- writeRequest
// Compile-time check that provider supports consecutive chunk reads.
442 var _ storage.ConsecutiveChunkReader = (*provider)(nil)
// ReadConsecutiveChunks streams all blobs whose names start with prefix, in
// offset order (the offset parsed from each name's suffix), into a pipe
// writer w, erroring if a chunk's offset doesn't match the bytes written so
// far. NOTE(review): heavily truncated — the pipe construction, the
// goroutine, the SQL text, and several error paths are missing from this
// listing.
444 func (p *provider) ReadConsecutiveChunks(prefix string) (io.ReadCloser, error) {
446 runner, err := p.getReadWithConnRunner()
453 defer p.closeMu.RUnlock()
454 err = runner(func(_ context.Context, conn conn) error {
456 err = sqlitex.Exec(conn, `
459 cast(substr(name, ?+1) as integer) as offset
461 where name like ?||'%'
463 func(stmt *sqlite.Stmt) error {
464 offset := stmt.ColumnInt64(1)
465 if offset != written {
466 return fmt.Errorf("got chunk at offset %v, expected offset %v", offset, written)
468 // TODO: Avoid intermediate buffers here
469 r := stmt.ColumnReader(0)
470 w1, err := io.Copy(w, r)
479 w.CloseWithError(err)
// Close shuts the provider down, closing the pool and caching the resulting
// error. NOTE(review): truncated — the mutex Lock, the already-closed guard,
// and the writes-channel close are missing from this listing.
484 func (me *provider) Close() error {
486 defer me.closeMu.Unlock()
490 if me.writes != nil {
493 me.closeErr = me.pool.Close()
// writeRequest carries a queued write query with its pprof labels.
// NOTE(review): truncated — the query and done fields used by providerWriter
// are not visible in this listing.
498 type writeRequest struct {
501 labels pprof.LabelSet
// expvars exposes batching counters under the "sqliteStorage" map.
504 var expvars = expvar.NewMap("sqliteStorage")
506 func runQueryWithLabels(query withConn, labels pprof.LabelSet, conn conn) (err error) {
507 pprof.Do(context.Background(), labels, func(ctx context.Context) {
508 // We pass in the context in the hope that the CPU profiler might incorporate sqlite
509 // activity the action that triggered it. It doesn't seem that way, since those calls don't
510 // take a context.Context themselves. It may come in useful in the goroutine profiles
511 // though, and doesn't hurt to expose it here for other purposes should things change.
512 err = query(ctx, conn)
517 // Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
518 // stronger typing on the writes channel.
// providerWriter drains the writes channel, batching queued queries into one
// savepoint-wrapped transaction per wakeup and signalling each request's
// done channel only after the transaction's outcome is known.
// NOTE(review): heavily truncated — the outer loop, the inner select's
// default/exit branches, and the transaction commit are missing from this
// listing.
519 func providerWriter(writes <-chan writeRequest, pool ConnPool) {
520 conn := pool.Get(context.TODO())
526 first, ok := <-writes
533 defer sqlitex.Save(conn)(&cantFail)
534 firstErr := runQueryWithLabels(first.query, first.labels, conn)
535 buf = append(buf, func() { first.done <- firstErr })
538 case wr, ok := <-writes:
540 err := runQueryWithLabels(wr.query, wr.labels, conn)
541 buf = append(buf, func() { wr.done <- err })
549 // Not sure what to do if this failed.
551 expvars.Add("batchTransactionErrors", 1)
553 // Signal done after we know the transaction succeeded.
554 for _, done := range buf {
557 expvars.Add("batchTransactions", 1)
558 expvars.Add("batchedQueries", int64(len(buf)))
559 //log.Printf("batched %v write queries", len(buf))
563 func (p *provider) NewInstance(s string) (resource.Instance, error) {
564 return instance{s, p}, nil
// instance identifies one named blob managed by a provider (constructed as
// instance{location, provider} in NewInstance). NOTE(review): field
// declarations are missing from this truncated listing.
567 type instance struct {
// getLabels builds a pprof label set naming the storage action after the
// caller skip frames up from here. NOTE(review): truncated — the pcs
// declaration, frame extraction, and the closure's return are missing from
// this listing.
572 func getLabels(skip int) pprof.LabelSet {
573 return pprof.Labels("sqlite-storage-action", func() string {
575 runtime.Callers(skip+3, pcs[:])
576 fs := runtime.CallersFrames(pcs[:])
578 funcName := f.Func.Name()
579 funcName = funcName[strings.LastIndexByte(funcName, '.')+1:]
580 //log.Printf("func name: %q", funcName)
// withConn routes the query either to the batch writer (when write is set
// and batching is enabled) or runs it directly via a pooled conn. skip
// adjusts pprof label attribution to the original caller.
// NOTE(review): truncated — the closed-provider check, the done-channel
// receive, and the direct-run path's completion are missing from this
// listing.
585 func (p *provider) withConn(with withConn, write bool, skip int) error {
587 // I think we need to check this here because it may not be valid to send to the writes channel
588 // if we're already closed. So don't try to move this check into getReadWithConnRunner.
591 return errors.New("closed")
593 if write && p.opts.BatchWrites {
594 done := make(chan error)
595 p.writes <- writeRequest{
598 labels: getLabels(skip + 1),
603 defer p.closeMu.RUnlock()
604 runner, err := p.getReadWithConnRunner()
612 // Obtains a DB conn and returns a withConn for executing with it. If no error is returned from this
613 // function, the runner *must* be used or the conn is leaked. You should check the provider isn't
614 // closed before using this.
// NOTE(review): truncated — the nil-conn guard's condition and the trailing
// return are missing from this listing.
615 func (p *provider) getReadWithConnRunner() (with func(withConn) error, err error) {
616 conn := p.pool.Get(context.TODO())
618 err = errors.New("couldn't get pool conn")
// The runner returns the conn to the pool exactly once, after executing the
// labelled query.
621 with = func(with withConn) error {
622 defer p.pool.Put(conn)
623 return runQueryWithLabels(with, getLabels(1), conn)
// withConn is a query body executed against a borrowed connection.
628 type withConn func(context.Context, conn) error
630 func (i instance) withConn(with withConn, write bool) error {
631 return i.p.withConn(with, write, 1)
// getConn borrows a raw connection directly from the provider's pool.
634 func (i instance) getConn() *sqlite.Conn {
635 return i.p.pool.Get(context.TODO())
// putConn returns a connection previously obtained with getConn.
// NOTE(review): the body is missing from this truncated listing.
638 func (i instance) putConn(conn *sqlite.Conn) {
// Readdirnames lists blob names under this instance's location with the
// "location/" prefix stripped. NOTE(review): truncated — the bound argument
// for the like pattern is not visible; confirm the query is parameterized
// with the prefix before relying on it.
642 func (i instance) Readdirnames() (names []string, err error) {
643 prefix := i.location + "/"
644 err = i.withConn(func(_ context.Context, conn conn) error {
645 return sqlitex.Exec(conn, "select name from blob where name like ?", func(stmt *sqlite.Stmt) error {
646 names = append(names, stmt.ColumnText(0)[len(prefix):])
650 //log.Printf("readdir %q gave %q", i.location, names)
// getBlobRowid looks up the rowid of this instance's blob by name, erroring
// when no row matched. NOTE(review): truncated — the found flag and closing
// lines are missing from this listing.
654 func (i instance) getBlobRowid(conn conn) (rowid int64, err error) {
656 err = sqlitex.Exec(conn, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
657 rowid = stmt.ColumnInt64(0)
668 err = errors.New("blob not found")
// connBlob couples an open sqlite Blob with a hook run on Close (used by Get
// to return the conn to the pool). NOTE(review): fields and the hook call
// are missing from this truncated listing.
674 type connBlob struct {
679 func (me connBlob) Close() error {
680 err := me.Blob.Close()
// Get opens this instance's blob for reading (touching last_used) and
// returns a ReadCloser whose Close gives the conn back exactly once.
// NOTE(review): truncated — the getConn call, error paths, and the once
// declaration are missing from this listing.
685 func (i instance) Get() (ret io.ReadCloser, err error) {
688 panic("nil sqlite conn")
690 blob, err := i.openBlob(conn, false, true)
696 return connBlob{blob, func() {
697 once.Do(func() { i.putConn(conn) })
// openBlob opens the sqlite incremental blob handle for this instance's row,
// optionally for write and optionally touching last_used first.
// NOTE(review): truncated — error checks and the updateAccess guard around
// the last_used update are missing from this listing.
701 func (i instance) openBlob(conn conn, write, updateAccess bool) (*sqlite.Blob, error) {
702 rowid, err := i.getBlobRowid(conn)
706 // This seems to cause locking issues with in-memory databases. Is it something to do with not
709 err = sqlitex.Exec(conn, "update blob set last_used=datetime('now') where rowid=?", nil, rowid)
711 err = fmt.Errorf("updating last_used: %w", err)
714 if conn.Changes() != 1 {
715 panic(conn.Changes())
718 return conn.OpenBlob("main", "blob", "data", rowid, write)
// PutSized inserts/replaces a zeroblob row of exactly size bytes, then
// streams reader into the opened blob handle. NOTE(review): truncated —
// the size argument binding, error checks, and blob close are missing from
// this listing.
721 func (i instance) PutSized(reader io.Reader, size int64) (err error) {
722 err = i.withConn(func(_ context.Context, conn conn) error {
723 err := sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, zeroblob(?))",
729 blob, err := i.openBlob(conn, true, false)
734 _, err = io.Copy(blob, reader)
// Put buffers reader fully in memory, then either delegates to PutSized or
// inserts the bytes directly, retrying up to 10 times on SQLITE_BUSY with a
// one-second sleep. NOTE(review): truncated — the buf declaration and the
// branch that selects between PutSized and the direct insert are missing
// from this listing.
740 func (i instance) Put(reader io.Reader) (err error) {
742 _, err = io.Copy(&buf, reader)
747 return i.PutSized(&buf, int64(buf.Len()))
749 return i.withConn(func(_ context.Context, conn conn) error {
750 for range iter.N(10) {
751 err = sqlitex.Exec(conn,
752 "insert or replace into blob(name, data) values(?, cast(? as blob))",
754 i.location, buf.Bytes())
755 if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
756 log.Print("sqlite busy")
757 time.Sleep(time.Second)
// fileInfo is a minimal os.FileInfo used by Stat: only Size is meaningful;
// every other method panics. NOTE(review): the size field and Size's body
// are missing from this truncated listing.
767 type fileInfo struct {
771 func (f fileInfo) Name() string {
772 panic("implement me")
775 func (f fileInfo) Size() int64 {
779 func (f fileInfo) Mode() os.FileMode {
780 panic("implement me")
783 func (f fileInfo) ModTime() time.Time {
784 panic("implement me")
787 func (f fileInfo) IsDir() bool {
788 panic("implement me")
791 func (f fileInfo) Sys() interface{} {
792 panic("implement me")
// Stat opens the blob (without touching last_used) just to learn its size,
// returning it wrapped in fileInfo. NOTE(review): truncated — error checks
// and the blob close are missing from this listing.
795 func (i instance) Stat() (ret os.FileInfo, err error) {
796 err = i.withConn(func(_ context.Context, conn conn) error {
797 var blob *sqlite.Blob
798 blob, err = i.openBlob(conn, false, false)
803 ret = fileInfo{blob.Size()}
// ReadAt reads into p at offset off. One path uses an incremental blob
// handle (clipping reads past the blob size); another, partially visible
// path reads via a substr query (note its 1-based offset, off+1).
// NOTE(review): heavily truncated — the branch between the two paths, EOF
// returns, and error handling are missing from this listing.
809 func (i instance) ReadAt(p []byte, off int64) (n int, err error) {
810 err = i.withConn(func(_ context.Context, conn conn) error {
812 var blob *sqlite.Blob
813 blob, err = i.openBlob(conn, false, true)
818 if off >= blob.Size() {
822 if off+int64(len(p)) > blob.Size() {
823 p = p[:blob.Size()-off]
825 n, err = blob.ReadAt(p, off)
830 "select substr(data, ?, ?) from blob where name=?",
831 func(stmt *sqlite.Stmt) error {
833 panic("found multiple matching blobs")
837 n = stmt.ColumnBytes(0, p)
840 off+1, len(p), i.location,
846 err = errors.New("blob not found")
858 func (i instance) WriteAt(bytes []byte, i2 int64) (int, error) {
859 panic("implement me")
// Delete removes this instance's blob row by name.
862 func (i instance) Delete() error {
863 return i.withConn(func(_ context.Context, conn conn) error {
864 return sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)