20 "crawshaw.io/sqlite/sqlitex"
21 "github.com/anacrolix/missinggo/iter"
22 "github.com/anacrolix/missinggo/v2/resource"
23 "github.com/anacrolix/torrent/storage"
// conn aliases the crawshaw.io sqlite connection pointer used throughout this file.
type conn = *sqlite.Conn
// initConn applies per-connection pragmas (recursive triggers, synchronous,
// journal mode, mmap). NOTE(review): how the wal parameter is consumed is not
// visible in this excerpt — presumably it selects the journal mode; confirm
// against the full source.
func initConn(conn conn, wal bool) error {
	// Recursive triggers are required because we need to trim the blob_meta size after trimming to
	// capacity. Hopefully we don't hit the recursion limit, and if we do, there's an error thrown.
	err := sqlitex.ExecTransient(conn, "pragma recursive_triggers=on", nil)
	// Durability is deliberately traded for speed: a crash can lose recent writes.
	err = sqlitex.ExecTransient(conn, `pragma synchronous=off`, nil)
	err = sqlitex.ExecTransient(conn, `pragma journal_mode=off`, nil)
	// Request a very large mmap window (~1 TB); sqlite clamps this to its
	// compile-time maximum.
	err = sqlitex.ExecTransient(conn, `pragma mmap_size=1000000000000`, nil)
// initSchema sets the database page size and creates the cache schema: the
// blob and blob_meta tables, the setting table (holding 'capacity'), the
// deletable_blob view that walks blobs in last_used order until usage fits
// capacity, and — when triggers is true — the triggers that keep blob_meta's
// 'size' row in sync with blob data and trim after each write.
func initSchema(conn conn, pageSize int, triggers bool) error {
	// page_size must be applied before the first table exists to take effect.
	err := sqlitex.ExecScript(conn, fmt.Sprintf("pragma page_size=%d", pageSize))
	err := sqlitex.ExecScript(conn, `
-- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
-- can trim the database file size with partial vacuums without having to do a full vacuum, which
pragma auto_vacuum=incremental;
create table if not exists blob (
	last_used timestamp default (datetime('now')),
create table if not exists blob_meta (
-- While sqlite *seems* to be faster to get sum(length(data)) instead of
-- sum(length(cast(data as blob))), it may still require a large table scan at start-up or with a
-- cold-cache. With this we can be assured that it doesn't.
insert or ignore into blob_meta values ('size', 0);
create table if not exists setting (
	name primary key on conflict replace,
create view if not exists deletable_blob as
with recursive excess (
	(select value from blob_meta where key='size') as usage_with,
	length(cast(data as blob))
	from blob order by last_used, rowid limit 1
	where usage_with > (select value from setting where name='capacity')
	usage_with-data_length as new_usage_with,
	length(cast(data as blob))
	from excess join blob
	on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
	where new_usage_with > (select value from setting where name='capacity')
select * from excess;
	err := sqlitex.ExecScript(conn, `
create trigger if not exists after_insert_blob
	update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
	delete from blob where rowid in (select blob_rowid from deletable_blob);
create trigger if not exists after_update_blob
after update of data on blob
	update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
	delete from blob where rowid in (select blob_rowid from deletable_blob);
create trigger if not exists after_delete_blob
	update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
// NewPiecesStorageOpts bundles the options for NewPiecesStorage: pool options,
// an optional hook to tweak the derived provider options, and the
// resource-pieces options passed through to the torrent storage layer.
type NewPiecesStorageOpts struct {
	// ProvOpts, if non-nil, is invoked to mutate the ProviderOpts derived
	// from the pool before the provider is created.
	ProvOpts func(*ProviderOpts)
	storage.ResourcePiecesOpts
// A convenience function that creates a connection pool, resource provider, and a pieces storage
// ClientImpl and returns them all with a Close attached.
func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, err error) {
	conns, provOpts, err := NewPool(opts.NewPoolOpts)
	// Give the caller a chance to adjust the provider options derived from
	// the pool before they are used.
	if f := opts.ProvOpts; f != nil {
	prov, err := NewProvider(conns, provOpts)
	store := storage.NewResourcePiecesOpts(prov, opts.ResourcePiecesOpts)
// NewPoolOpts configures NewPool. Only a subset of fields is visible in this
// excerpt (Path, NumConns, DontInitSchema, PageSize, Capacity are referenced
// by NewPool below).
type NewPoolOpts struct {
	// Forces WAL, disables shared caching.
	ConcurrentBlobReads bool
	// If non-zero, overrides the existing setting.
// There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
// the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the
// top-level option type.
type ProviderOpts struct {
	// Concurrent blob reads require WAL.
	// NOTE(review): singular name here vs NewPoolOpts.ConcurrentBlobReads
	// (plural); renaming would break callers, so only flagging it.
	ConcurrentBlobRead bool
201 // Remove any capacity limits.
202 func UnlimitCapacity(conn conn) error {
203 return sqlitex.Exec(conn, "delete from setting where key='capacity'", nil)
// Set the capacity limit to exactly this value.
// Relies on the setting table's `name primary key on conflict replace` (see
// initSchema) so repeated calls overwrite the previous capacity row.
func SetCapacity(conn conn, cap int64) error {
	return sqlitex.Exec(conn, "insert into setting values ('capacity', ?)", nil, cap)
// NewPool opens a ConnPool for opts.Path (defaulting to an in-memory
// database), optionally initializes the schema and capacity setting on one
// connection, and returns the ProviderOpts to hand to NewProvider.
func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
	if opts.NumConns == 0 {
		opts.NumConns = runtime.NumCPU()
	opts.Path = ":memory:"
	values := make(url.Values)
	// Shared cache is incompatible with concurrent blob reads (see
	// NewPoolOpts.ConcurrentBlobReads), so only enable it otherwise.
	if !opts.ConcurrentBlobReads {
		values.Add("cache", "shared")
	path := fmt.Sprintf("file:%s?%s", opts.Path, values.Encode())
	conns, err := func() (ConnPool, error) {
		switch opts.NumConns {
			// Single-connection case: wrap the raw conn directly rather than
			// paying for a sqlitex.Pool.
			conn, err := sqlite.OpenConn(path, 0)
			return &poolFromConn{conn: conn}, err
			return sqlitex.Open(path, 0, opts.NumConns)
	conn := conns.Get(context.TODO())
	defer conns.Put(conn)
	if !opts.DontInitSchema {
		if opts.PageSize == 0 {
			// Default to 16 KiB pages.
			opts.PageSize = 1 << 14
		err = initSchema(conn, opts.PageSize, true)
	if opts.Capacity != 0 {
		err = SetCapacity(conn, opts.Capacity)
	return conns, ProviderOpts{
		NumConns: opts.NumConns,
		ConcurrentBlobRead: opts.ConcurrentBlobReads,
// Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool.
// NOTE(review): the struct's fields (the conn and, presumably, a mutex
// serializing Get/Put) are not visible in this excerpt.
type poolFromConn struct {
// Get hands out the single underlying conn; presumably it locks until Put is
// called — body not visible in this excerpt.
func (me *poolFromConn) Get(ctx context.Context) conn {
// Put returns the conn; it panics if given a conn other than the one the pool
// owns. NOTE(review): the panic message has a typo ("expected to same conn" —
// should read "expected the same conn"); left as-is here since this is a
// doc-only pass.
func (me *poolFromConn) Put(conn conn) {
	panic("expected to same conn")
// Close closes the single underlying connection.
func (me *poolFromConn) Close() error {
	return me.conn.Close()
// Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of
// the ConnPool (since it has to initialize all the connections anyway).
func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
	_, err = initPoolConns(context.TODO(), pool, opts.NumConns, true)
	prov := &provider{pool: pool, opts: opts}
	if opts.BatchWrites {
		// Batching needs at least one conn dedicated to the writer goroutine
		// while others serve reads.
		if opts.NumConns < 2 {
			err = errors.New("batch writes requires more than 1 conn")
		writes := make(chan writeRequest)
		// This is retained for backwards compatibility. It may not be necessary.
		runtime.SetFinalizer(prov, func(p *provider) {
		go providerWriter(writes, prov.pool)
// initPoolConns draws numConn connections from the pool, runs initConn on
// each, and (per the visible deferred loop) returns them all to the pool
// before returning. Returns how many were initialized and the first error.
func initPoolConns(ctx context.Context, pool ConnPool, numConn int, wal bool) (numInited int, err error) {
	for _, c := range conns {
	for range iter.N(numConn) {
		conn := pool.Get(ctx)
		conns = append(conns, conn)
		err = initConn(conn, wal)
		err = fmt.Errorf("initing conn %v: %w", len(conns), err)
// ConnPool abstracts a source of sqlite connections (either a sqlitex.Pool or
// the single-conn poolFromConn above). Only Get is visible in this excerpt;
// Put and Close are used elsewhere in the file.
type ConnPool interface {
	Get(context.Context) conn
// provider implements resource.Provider (and ConsecutiveChunkWriter) on top
// of a ConnPool. writes is non-nil only when batch writes are enabled.
type provider struct {
	writes chan<- writeRequest
// Compile-time check that *provider satisfies storage.ConsecutiveChunkWriter.
var _ storage.ConsecutiveChunkWriter = (*provider)(nil)
// WriteConsecutiveChunks streams every blob whose name starts with prefix to
// w, in offset order, where the offset is parsed from the name suffix after
// the prefix. It errors if the chunks are not contiguous from offset 0.
func (p *provider) WriteConsecutiveChunks(prefix string, w io.Writer) (written int64, err error) {
	err = p.withConn(func(conn conn) error {
		err = sqlitex.Exec(conn, `
				cast(substr(name, ?+1) as integer) as offset
			where name like ?||'%'
			func(stmt *sqlite.Stmt) error {
				offset := stmt.ColumnInt64(1)
				// Enforce contiguity: each chunk must start exactly where the
				// previous one ended.
				if offset != written {
					return fmt.Errorf("got chunk at offset %v, expected offset %v", offset, written)
				r := stmt.ColumnReader(0)
				w1, err := io.Copy(w, r)
// Close shuts the provider down exactly once (guarded by me.closed), closing
// the writes channel when batching is enabled and then closing the pool,
// recording the pool's error in me.closeErr.
func (me *provider) Close() error {
	me.closed.Do(func() {
		if me.writes != nil {
		me.closeErr = me.pool.Close()
// writeRequest is a unit of work sent to the batching writer goroutine; labels
// carries pprof labels identifying the originating caller.
type writeRequest struct {
	labels pprof.LabelSet
// expvars publishes batching counters under the "sqliteStorage" expvar map.
var expvars = expvar.NewMap("sqliteStorage")
// runQueryWithLabels runs query against conn while the given pprof labels are
// active, so profiles attribute the work to the original caller.
func runQueryWithLabels(query withConn, labels pprof.LabelSet, conn conn) (err error) {
	pprof.Do(context.Background(), labels, func(context.Context) {
// Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
// stronger typing on the writes channel.
func providerWriter(writes <-chan writeRequest, pool ConnPool) {
	conn := pool.Get(context.TODO())
	// Block for the first request, then greedily batch any further pending
	// requests into the same transaction.
	first, ok := <-writes
	defer sqlitex.Save(conn)(&cantFail)
	firstErr := runQueryWithLabels(first.query, first.labels, conn)
	// Completion signals are buffered and only delivered after the enclosing
	// transaction is known to have committed (see below).
	buf = append(buf, func() { first.done <- firstErr })
	case wr, ok := <-writes:
	err := runQueryWithLabels(wr.query, wr.labels, conn)
	buf = append(buf, func() { wr.done <- err })
	// Not sure what to do if this failed.
	expvars.Add("batchTransactionErrors", 1)
	// Signal done after we know the transaction succeeded.
	for _, done := range buf {
	expvars.Add("batchTransactions", 1)
	expvars.Add("batchedQueries", int64(len(buf)))
	//log.Printf("batched %v write queries", len(buf))
// NewInstance returns a resource.Instance addressing the blob named s.
func (p *provider) NewInstance(s string) (resource.Instance, error) {
	return instance{s, p}, nil
// instance is a resource.Instance bound to a blob location and its provider.
// Fields (location string, p *provider — per the literal in NewInstance) are
// elided in this excerpt.
type instance struct {
// getLabels builds a pprof label set whose "f" label is the bare name of the
// caller skip frames up (plus this function's own frames). NOTE(review):
// f.Func can be nil for inlined frames; whether that is handled is not
// visible in this excerpt.
func getLabels(skip int) pprof.LabelSet {
	return pprof.Labels("f", func() string {
		runtime.Callers(skip+3, pcs[:])
		fs := runtime.CallersFrames(pcs[:])
		funcName := f.Func.Name()
		// Trim the package path/receiver prefix, keeping the bare func name.
		funcName = funcName[strings.LastIndexByte(funcName, '.')+1:]
		//log.Printf("func name: %q", funcName)
// withConn runs with on a connection. Writes are routed through the batching
// writer goroutine when enabled; otherwise (and for reads) a conn is borrowed
// from the pool, labeled with the caller's name for profiling.
func (p *provider) withConn(with withConn, write bool, skip int) error {
	if write && p.opts.BatchWrites {
		done := make(chan error)
		p.writes <- writeRequest{
			labels: getLabels(skip + 1),
	conn := p.pool.Get(context.TODO())
		return errors.New("couldn't get pool conn")
	defer p.pool.Put(conn)
	return runQueryWithLabels(with, getLabels(skip+1), conn)
// withConn is a unit of work executed against a borrowed connection.
type withConn func(conn) error
// withConn forwards to the provider's withConn with one extra caller frame
// skipped so pprof labels name the instance method's caller.
func (i instance) withConn(with withConn, write bool) error {
	return i.p.withConn(with, write, 1)
// getConn borrows a raw connection from the provider's pool; pair with
// putConn.
func (i instance) getConn() *sqlite.Conn {
	return i.p.pool.Get(context.TODO())
// putConn returns a connection obtained via getConn to the pool (body elided
// in this excerpt).
func (i instance) putConn(conn *sqlite.Conn) {
// Readdirnames lists blob names under this instance's location, returning
// each name with the "location/" prefix stripped. NOTE(review): the argument
// bound to the `like ?` placeholder is elided here; presumably prefix+"%" —
// confirm against the full source.
func (i instance) Readdirnames() (names []string, err error) {
	prefix := i.location + "/"
	err = i.withConn(func(conn conn) error {
		return sqlitex.Exec(conn, "select name from blob where name like ?", func(stmt *sqlite.Stmt) error {
			names = append(names, stmt.ColumnText(0)[len(prefix):])
	//log.Printf("readdir %q gave %q", i.location, names)
// getBlobRowid looks up the rowid of the blob at this instance's location,
// returning a "blob not found" error when no row matches.
func (i instance) getBlobRowid(conn conn) (rowid int64, err error) {
	err = sqlitex.Exec(conn, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
		rowid = stmt.ColumnInt64(0)
	err = errors.New("blob not found")
// connBlob pairs an open sqlite Blob with a cleanup callback (used by Get to
// return the borrowed conn when the reader is closed).
type connBlob struct {
// Close closes the wrapped blob and then runs the attached cleanup.
func (me connBlob) Close() error {
	err := me.Blob.Close()
// Get opens the blob for reading (updating its last_used access time) and
// returns a ReadCloser that releases the borrowed conn exactly once on close.
func (i instance) Get() (ret io.ReadCloser, err error) {
		panic("nil sqlite conn")
	blob, err := i.openBlob(conn, false, true)
	// The conn is held for the life of the reader; the once guards against
	// double-Put on repeated Close.
	return connBlob{blob, func() {
		once.Do(func() { i.putConn(conn) })
// openBlob resolves this instance's rowid and opens its data column as a
// sqlite Blob. When updateAccess is true it first touches last_used (the
// trigger/trim machinery orders by last_used), panicking if anything other
// than exactly one row was updated.
func (i instance) openBlob(conn conn, write, updateAccess bool) (*sqlite.Blob, error) {
	rowid, err := i.getBlobRowid(conn)
	// This seems to cause locking issues with in-memory databases. Is it something to do with not
	err = sqlitex.Exec(conn, "update blob set last_used=datetime('now') where rowid=?", nil, rowid)
	err = fmt.Errorf("updating last_used: %w", err)
	if conn.Changes() != 1 {
		panic(conn.Changes())
	return conn.OpenBlob("main", "blob", "data", rowid, write)
// PutSized stores exactly size bytes from reader: it first inserts/replaces a
// zeroblob of the final size, then streams the data into the opened blob,
// avoiding buffering the whole payload in memory.
func (i instance) PutSized(reader io.Reader, size int64) (err error) {
	err = i.withConn(func(conn conn) error {
		err := sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, zeroblob(?))",
		blob, err := i.openBlob(conn, true, false)
		_, err = io.Copy(blob, reader)
// Put stores the reader's entire contents. The visible path buffers into
// memory; one branch (condition elided in this excerpt) delegates to
// PutSized, the other inserts the buffered bytes directly, retrying up to 10
// times on SQLITE_BUSY with a 1s sleep between attempts.
func (i instance) Put(reader io.Reader) (err error) {
	_, err = io.Copy(&buf, reader)
	return i.PutSized(&buf, int64(buf.Len()))
	return i.withConn(func(conn conn) error {
		for range iter.N(10) {
			err = sqlitex.Exec(conn,
				"insert or replace into blob(name, data) values(?, cast(? as blob))",
				i.location, buf.Bytes())
			if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
				log.Print("sqlite busy")
				time.Sleep(time.Second)
// fileInfo is a minimal os.FileInfo over a blob; per Stat below only the size
// is populated, and all other methods panic as unimplemented.
type fileInfo struct {
// Name is unimplemented; blobs are addressed by location, not exposed here.
func (f fileInfo) Name() string {
	panic("implement me")
// Size returns the blob's size in bytes (body elided in this excerpt).
func (f fileInfo) Size() int64 {
// Mode is unimplemented.
func (f fileInfo) Mode() os.FileMode {
	panic("implement me")
// ModTime is unimplemented.
func (f fileInfo) ModTime() time.Time {
	panic("implement me")
// IsDir is unimplemented.
func (f fileInfo) IsDir() bool {
	panic("implement me")
// Sys is unimplemented.
func (f fileInfo) Sys() interface{} {
	panic("implement me")
// Stat opens the blob (without touching last_used) just to read its size and
// returns it wrapped in fileInfo.
func (i instance) Stat() (ret os.FileInfo, err error) {
	err = i.withConn(func(conn conn) error {
		var blob *sqlite.Blob
		blob, err = i.openBlob(conn, false, false)
		ret = fileInfo{blob.Size()}
// ReadAt reads len(p) bytes at offset off. Two strategies are visible: the
// blob handle path (open, clamp p to the blob size, Blob.ReadAt) and a
// substr-based SQL path (note sqlite substr is 1-based, hence off+1); which
// is selected is decided by code elided from this excerpt.
func (i instance) ReadAt(p []byte, off int64) (n int, err error) {
	err = i.withConn(func(conn conn) error {
		var blob *sqlite.Blob
		blob, err = i.openBlob(conn, false, true)
		// Reads entirely past the end produce no data.
		if off >= blob.Size() {
		// Clamp so we never read past the blob's end.
		if off+int64(len(p)) > blob.Size() {
			p = p[:blob.Size()-off]
		n, err = blob.ReadAt(p, off)
			"select substr(cast(data as blob), ?, ?) from blob where name=?",
			func(stmt *sqlite.Stmt) error {
					panic("found multiple matching blobs")
				n = stmt.ColumnBytes(0, p)
			off+1, len(p), i.location,
		err = errors.New("blob not found")
// WriteAt is unimplemented; writes go through Put/PutSized instead.
func (i instance) WriteAt(bytes []byte, i2 int64) (int, error) {
	panic("implement me")
// Delete removes this instance's blob row; the after-delete trigger (see
// initSchema) keeps blob_meta's 'size' row in sync.
func (i instance) Delete() error {
	return i.withConn(func(conn conn) error {
		return sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)