]> Sergey Matveev's repositories - btrtrc.git/blob - storage/sqlite/sqlite-storage.go
Expose SetSynchronous as an option
[btrtrc.git] / storage / sqlite / sqlite-storage.go
1 package sqliteStorage
2
3 import (
4         "bytes"
5         "context"
6         "errors"
7         "expvar"
8         "fmt"
9         "io"
10         "log"
11         "net/url"
12         "os"
13         "runtime"
14         "runtime/pprof"
15         "strings"
16         "sync"
17         "time"
18
19         "crawshaw.io/sqlite"
20         "crawshaw.io/sqlite/sqlitex"
21         "github.com/anacrolix/missinggo/iter"
22         "github.com/anacrolix/missinggo/v2/resource"
23         "github.com/anacrolix/torrent/storage"
24 )
25
26 type conn = *sqlite.Conn
27
28 type InitConnOpts struct {
29         SetSynchronous string
30         SetJournalMode string
31         MmapSizeOk     bool  // If false, a package-specific default will be used.
32         MmapSize       int64 // If MmapSizeOk is set, use sqlite default if < 0, otherwise this value.
33 }
34
35 func (me InitConnOpts) JournalMode() string {
36         if me.SetJournalMode != "" {
37                 return me.SetJournalMode
38         }
39         return "wal"
40 }
41
42 var UnexpectedJournalMode = errors.New("unexpected journal mode")
43
44 func initConn(conn conn, opts InitConnOpts) (err error) {
45         // Recursive triggers are required because we need to trim the blob_meta size after trimming to
46         // capacity. Hopefully we don't hit the recursion limit, and if we do, there's an error thrown.
47         err = sqlitex.ExecTransient(conn, "pragma recursive_triggers=on", nil)
48         if err != nil {
49                 return err
50         }
51         if opts.SetSynchronous != "" {
52                 err = sqlitex.ExecTransient(conn, `pragma synchronous=`+opts.SetSynchronous, nil)
53                 if err != nil {
54                         return err
55                 }
56         }
57         if opts.SetJournalMode != "" {
58                 err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma journal_mode=%s`, opts.SetJournalMode), func(stmt *sqlite.Stmt) error {
59                         ret := stmt.ColumnText(0)
60                         if ret != opts.SetJournalMode {
61                                 return UnexpectedJournalMode
62                         }
63                         return nil
64                 })
65                 if err != nil {
66                         return err
67                 }
68         }
69         if !opts.MmapSizeOk {
70                 // Set the default. Currently it seems the library picks reasonable defaults, especially for
71                 // wal.
72                 opts.MmapSize = -1
73                 //opts.MmapSize = 1 << 24 // 8 MiB
74         }
75         if opts.MmapSize >= 0 {
76                 err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma mmap_size=%d`, opts.MmapSize), nil)
77                 if err != nil {
78                         return err
79                 }
80         }
81         return nil
82 }
83
84 func InitSchema(conn conn, pageSize int, triggers bool) error {
85         if pageSize != 0 {
86                 err := sqlitex.ExecScript(conn, fmt.Sprintf("pragma page_size=%d", pageSize))
87                 if err != nil {
88                         return err
89                 }
90         }
91         err := sqlitex.ExecScript(conn, `
92                 -- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
93                 -- can trim the database file size with partial vacuums without having to do a full vacuum, which 
94                 -- locks everything.
95                 pragma auto_vacuum=incremental;
96                 
97                 create table if not exists blob (
98                         name text,
99                         last_used timestamp default (datetime('now')),
100                         data blob,
101                         verified bool,
102                         primary key (name)
103                 );
104                 
105                 create table if not exists blob_meta (
106                         key text primary key,
107                         value
108                 );
109
110                 create index if not exists blob_last_used on blob(last_used);
111                 
112                 -- While sqlite *seems* to be faster to get sum(length(data)) instead of 
113                 -- sum(length(data)), it may still require a large table scan at start-up or with a 
114                 -- cold-cache. With this we can be assured that it doesn't.
115                 insert or ignore into blob_meta values ('size', 0);
116                 
117                 create table if not exists setting (
118                         name primary key on conflict replace,
119                         value
120                 );
121         
122                 create view if not exists deletable_blob as
123                 with recursive excess (
124                         usage_with,
125                         last_used,
126                         blob_rowid,
127                         data_length
128                 ) as (
129                         select * 
130                         from (
131                                 select 
132                                         (select value from blob_meta where key='size') as usage_with,
133                                         last_used,
134                                         rowid,
135                                         length(data)
136                                 from blob order by last_used, rowid limit 1
137                         )
138                         where usage_with > (select value from setting where name='capacity')
139                         union all
140                         select 
141                                 usage_with-data_length as new_usage_with,
142                                 blob.last_used,
143                                 blob.rowid,
144                                 length(data)
145                         from excess join blob
146                         on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
147                         where new_usage_with > (select value from setting where name='capacity')
148                 )
149                 select * from excess;
150         `)
151         if err != nil {
152                 return err
153         }
154         if triggers {
155                 err := sqlitex.ExecScript(conn, `
156                         create trigger if not exists after_insert_blob
157                         after insert on blob
158                         begin
159                                 update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
160                                 delete from blob where rowid in (select blob_rowid from deletable_blob);
161                         end;
162                         
163                         create trigger if not exists after_update_blob
164                         after update of data on blob
165                         begin
166                                 update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
167                                 delete from blob where rowid in (select blob_rowid from deletable_blob);
168                         end;
169                         
170                         create trigger if not exists after_delete_blob
171                         after delete on blob
172                         begin
173                                 update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
174                         end;
175                 `)
176                 if err != nil {
177                         return err
178                 }
179         }
180         return nil
181 }
182
183 type NewPiecesStorageOpts struct {
184         NewPoolOpts
185         InitDbOpts
186         ProvOpts    func(*ProviderOpts)
187         StorageOpts func(*storage.ResourcePiecesOpts)
188 }
189
190 // A convenience function that creates a connection pool, resource provider, and a pieces storage
191 // ClientImpl and returns them all with a Close attached.
192 func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, err error) {
193         conns, err := NewPool(opts.NewPoolOpts)
194         if err != nil {
195                 return
196         }
197         err = initPoolDatabase(conns, opts.InitDbOpts)
198         if err != nil {
199                 return
200         }
201         provOpts := ProviderOpts{
202                 BatchWrites: conns.NumConns() > 1,
203         }
204         if f := opts.ProvOpts; f != nil {
205                 f(&provOpts)
206         }
207         prov, err := NewProvider(conns, provOpts)
208         if err != nil {
209                 conns.Close()
210                 return
211         }
212         var (
213                 journalMode string
214         )
215         withPoolConn(conns, func(c conn) {
216                 err = sqlitex.Exec(c, "pragma journal_mode", func(stmt *sqlite.Stmt) error {
217                         journalMode = stmt.ColumnText(0)
218                         return nil
219                 })
220         })
221         if err != nil {
222                 err = fmt.Errorf("getting journal mode: %w", err)
223                 prov.Close()
224                 return
225         }
226         if journalMode == "" {
227                 err = errors.New("didn't get journal mode")
228                 prov.Close()
229                 return
230         }
231         storageOpts := storage.ResourcePiecesOpts{
232                 NoSizedPuts: journalMode != "wal" || conns.NumConns() == 1,
233         }
234         if f := opts.StorageOpts; f != nil {
235                 f(&storageOpts)
236         }
237         store := storage.NewResourcePiecesOpts(prov, storageOpts)
238         return struct {
239                 storage.ClientImpl
240                 io.Closer
241         }{
242                 store,
243                 prov,
244         }, nil
245 }
246
247 type NewPoolOpts struct {
248         NewConnOpts
249         InitConnOpts
250         NumConns int
251 }
252
253 type InitDbOpts struct {
254         DontInitSchema bool
255         PageSize       int
256         // If non-zero, overrides the existing setting.
257         Capacity   int64
258         NoTriggers bool
259 }
260
261 // There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
262 // the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the
263 // top-level option type.
264 type PoolConf struct {
265         NumConns    int
266         JournalMode string
267 }
268
269 // Remove any capacity limits.
270 func UnlimitCapacity(conn conn) error {
271         return sqlitex.Exec(conn, "delete from setting where key='capacity'", nil)
272 }
273
274 // Set the capacity limit to exactly this value.
275 func SetCapacity(conn conn, cap int64) error {
276         return sqlitex.Exec(conn, "insert into setting values ('capacity', ?)", nil, cap)
277 }
278
279 type NewConnOpts struct {
280         // See https://www.sqlite.org/c3ref/open.html. NB: "If the filename is an empty string, then a
281         // private, temporary on-disk database will be created. This private database will be
282         // automatically deleted as soon as the database connection is closed."
283         Path   string
284         Memory bool
285         // Whether multiple blobs will not be read simultaneously. Enables journal mode other than WAL,
286         // and NumConns < 2.
287         NoConcurrentBlobReads bool
288 }
289
290 func newOpenUri(opts NewConnOpts) string {
291         path := url.PathEscape(opts.Path)
292         if opts.Memory {
293                 path = ":memory:"
294         }
295         values := make(url.Values)
296         if opts.NoConcurrentBlobReads || opts.Memory {
297                 values.Add("cache", "shared")
298         }
299         return fmt.Sprintf("file:%s?%s", path, values.Encode())
300 }
301
302 func initDatabase(conn conn, opts InitDbOpts) (err error) {
303         if !opts.DontInitSchema {
304                 if opts.PageSize == 0 {
305                         // There doesn't seem to be an optimal size. I did try with the standard chunk size, but
306                         // the difference is not convincing.
307
308                         //opts.PageSize = 1 << 14
309                 }
310                 err = InitSchema(conn, opts.PageSize, !opts.NoTriggers)
311                 if err != nil {
312                         return
313                 }
314         }
315         if opts.Capacity != 0 {
316                 err = SetCapacity(conn, opts.Capacity)
317                 if err != nil {
318                         return
319                 }
320         }
321         return
322 }
323
324 func initPoolDatabase(pool ConnPool, opts InitDbOpts) (err error) {
325         withPoolConn(pool, func(c conn) {
326                 err = initDatabase(c, opts)
327         })
328         return
329 }
330
331 func newConn(opts NewConnOpts) (conn, error) {
332         return sqlite.OpenConn(newOpenUri(opts), 0)
333 }
334
335 type poolWithNumConns struct {
336         *sqlitex.Pool
337         numConns int
338 }
339
340 func (me poolWithNumConns) NumConns() int {
341         return me.numConns
342 }
343
344 func NewPool(opts NewPoolOpts) (_ ConnPool, err error) {
345         if opts.NumConns == 0 {
346                 opts.NumConns = runtime.NumCPU()
347         }
348         conns, err := func() (ConnPool, error) {
349                 switch opts.NumConns {
350                 case 1:
351                         conn, err := newConn(opts.NewConnOpts)
352                         return &poolFromConn{conn: conn}, err
353                 default:
354                         _pool, err := sqlitex.Open(newOpenUri(opts.NewConnOpts), 0, opts.NumConns)
355                         return poolWithNumConns{_pool, opts.NumConns}, err
356                 }
357         }()
358         if err != nil {
359                 return
360         }
361         defer func() {
362                 if err != nil {
363                         conns.Close()
364                 }
365         }()
366         return conns, initPoolConns(nil, conns, InitPoolOpts{
367                 NumConns:     opts.NumConns,
368                 InitConnOpts: opts.InitConnOpts,
369         })
370 }
371
372 // Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool.
373 type poolFromConn struct {
374         mu   sync.Mutex
375         conn conn
376 }
377
378 func (me *poolFromConn) Get(ctx context.Context) conn {
379         me.mu.Lock()
380         return me.conn
381 }
382
383 func (me *poolFromConn) Put(conn conn) {
384         if conn != me.conn {
385                 panic("expected to same conn")
386         }
387         me.mu.Unlock()
388 }
389
390 func (me *poolFromConn) Close() error {
391         return me.conn.Close()
392 }
393
394 func (poolFromConn) NumConns() int { return 1 }
395
396 type ProviderOpts struct {
397         BatchWrites bool
398 }
399
400 // Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of
401 // the ConnPool (since it has to initialize all the connections anyway).
402 func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
403         prov := &provider{pool: pool, opts: opts}
404         if opts.BatchWrites {
405                 writes := make(chan writeRequest)
406                 prov.writes = writes
407                 // This is retained for backwards compatibility. It may not be necessary.
408                 runtime.SetFinalizer(prov, func(p *provider) {
409                         p.Close()
410                 })
411                 go providerWriter(writes, prov.pool)
412         }
413         return prov, nil
414 }
415
416 type InitPoolOpts struct {
417         NumConns int
418         InitConnOpts
419 }
420
421 func initPoolConns(ctx context.Context, pool ConnPool, opts InitPoolOpts) (err error) {
422         var conns []conn
423         defer func() {
424                 for _, c := range conns {
425                         pool.Put(c)
426                 }
427         }()
428         for range iter.N(opts.NumConns) {
429                 conn := pool.Get(ctx)
430                 if conn == nil {
431                         break
432                 }
433                 conns = append(conns, conn)
434                 err = initConn(conn, opts.InitConnOpts)
435                 if err != nil {
436                         err = fmt.Errorf("initing conn %v: %w", len(conns), err)
437                         return
438                 }
439         }
440         return
441 }
442
443 type ConnPool interface {
444         Get(context.Context) conn
445         Put(conn)
446         Close() error
447         NumConns() int
448 }
449
450 func withPoolConn(pool ConnPool, with func(conn)) {
451         c := pool.Get(nil)
452         defer pool.Put(c)
453         with(c)
454 }
455
456 type provider struct {
457         pool     ConnPool
458         writes   chan<- writeRequest
459         opts     ProviderOpts
460         closeMu  sync.RWMutex
461         closed   bool
462         closeErr error
463 }
464
465 var _ storage.ConsecutiveChunkReader = (*provider)(nil)
466
467 func (p *provider) ReadConsecutiveChunks(prefix string) (io.ReadCloser, error) {
468         p.closeMu.RLock()
469         runner, err := p.getReadWithConnRunner()
470         if err != nil {
471                 p.closeMu.RUnlock()
472                 return nil, err
473         }
474         r, w := io.Pipe()
475         go func() {
476                 defer p.closeMu.RUnlock()
477                 err = runner(func(_ context.Context, conn conn) error {
478                         var written int64
479                         err = sqlitex.Exec(conn, `
480                                 select
481                                         data,
482                                         cast(substr(name, ?+1) as integer) as offset
483                                 from blob
484                                 where name like ?||'%'
485                                 order by offset`,
486                                 func(stmt *sqlite.Stmt) error {
487                                         offset := stmt.ColumnInt64(1)
488                                         if offset != written {
489                                                 return fmt.Errorf("got chunk at offset %v, expected offset %v", offset, written)
490                                         }
491                                         // TODO: Avoid intermediate buffers here
492                                         r := stmt.ColumnReader(0)
493                                         w1, err := io.Copy(w, r)
494                                         written += w1
495                                         return err
496                                 },
497                                 len(prefix),
498                                 prefix,
499                         )
500                         return err
501                 })
502                 w.CloseWithError(err)
503         }()
504         return r, nil
505 }
506
507 func (me *provider) Close() error {
508         me.closeMu.Lock()
509         defer me.closeMu.Unlock()
510         if me.closed {
511                 return me.closeErr
512         }
513         if me.writes != nil {
514                 close(me.writes)
515         }
516         me.closeErr = me.pool.Close()
517         me.closed = true
518         return me.closeErr
519 }
520
521 type writeRequest struct {
522         query  withConn
523         done   chan<- error
524         labels pprof.LabelSet
525 }
526
527 var expvars = expvar.NewMap("sqliteStorage")
528
529 func runQueryWithLabels(query withConn, labels pprof.LabelSet, conn conn) (err error) {
530         pprof.Do(context.Background(), labels, func(ctx context.Context) {
531                 // We pass in the context in the hope that the CPU profiler might incorporate sqlite
532                 // activity the action that triggered it. It doesn't seem that way, since those calls don't
533                 // take a context.Context themselves. It may come in useful in the goroutine profiles
534                 // though, and doesn't hurt to expose it here for other purposes should things change.
535                 err = query(ctx, conn)
536         })
537         return
538 }
539
540 // Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
541 // stronger typing on the writes channel.
542 func providerWriter(writes <-chan writeRequest, pool ConnPool) {
543         conn := pool.Get(context.TODO())
544         if conn == nil {
545                 return
546         }
547         defer pool.Put(conn)
548         for {
549                 first, ok := <-writes
550                 if !ok {
551                         return
552                 }
553                 var buf []func()
554                 var cantFail error
555                 func() {
556                         defer sqlitex.Save(conn)(&cantFail)
557                         firstErr := runQueryWithLabels(first.query, first.labels, conn)
558                         buf = append(buf, func() { first.done <- firstErr })
559                         for {
560                                 select {
561                                 case wr, ok := <-writes:
562                                         if ok {
563                                                 err := runQueryWithLabels(wr.query, wr.labels, conn)
564                                                 buf = append(buf, func() { wr.done <- err })
565                                                 continue
566                                         }
567                                 default:
568                                 }
569                                 break
570                         }
571                 }()
572                 // Not sure what to do if this failed.
573                 if cantFail != nil {
574                         expvars.Add("batchTransactionErrors", 1)
575                 }
576                 // Signal done after we know the transaction succeeded.
577                 for _, done := range buf {
578                         done()
579                 }
580                 expvars.Add("batchTransactions", 1)
581                 expvars.Add("batchedQueries", int64(len(buf)))
582                 //log.Printf("batched %v write queries", len(buf))
583         }
584 }
585
586 func (p *provider) NewInstance(s string) (resource.Instance, error) {
587         return instance{s, p}, nil
588 }
589
590 type instance struct {
591         location string
592         p        *provider
593 }
594
595 func getLabels(skip int) pprof.LabelSet {
596         return pprof.Labels("sqlite-storage-action", func() string {
597                 var pcs [8]uintptr
598                 runtime.Callers(skip+3, pcs[:])
599                 fs := runtime.CallersFrames(pcs[:])
600                 f, _ := fs.Next()
601                 funcName := f.Func.Name()
602                 funcName = funcName[strings.LastIndexByte(funcName, '.')+1:]
603                 //log.Printf("func name: %q", funcName)
604                 return funcName
605         }())
606 }
607
608 func (p *provider) withConn(with withConn, write bool, skip int) error {
609         p.closeMu.RLock()
610         // I think we need to check this here because it may not be valid to send to the writes channel
611         // if we're already closed. So don't try to move this check into getReadWithConnRunner.
612         if p.closed {
613                 p.closeMu.RUnlock()
614                 return errors.New("closed")
615         }
616         if write && p.opts.BatchWrites {
617                 done := make(chan error)
618                 p.writes <- writeRequest{
619                         query:  with,
620                         done:   done,
621                         labels: getLabels(skip + 1),
622                 }
623                 p.closeMu.RUnlock()
624                 return <-done
625         } else {
626                 defer p.closeMu.RUnlock()
627                 runner, err := p.getReadWithConnRunner()
628                 if err != nil {
629                         return err
630                 }
631                 return runner(with)
632         }
633 }
634
635 // Obtains a DB conn and returns a withConn for executing with it. If no error is returned from this
636 // function, the runner *must* be used or the conn is leaked. You should check the provider isn't
637 // closed before using this.
638 func (p *provider) getReadWithConnRunner() (with func(withConn) error, err error) {
639         conn := p.pool.Get(context.TODO())
640         if conn == nil {
641                 err = errors.New("couldn't get pool conn")
642                 return
643         }
644         with = func(with withConn) error {
645                 defer p.pool.Put(conn)
646                 return runQueryWithLabels(with, getLabels(1), conn)
647         }
648         return
649 }
650
651 type withConn func(context.Context, conn) error
652
653 func (i instance) withConn(with withConn, write bool) error {
654         return i.p.withConn(with, write, 1)
655 }
656
657 func (i instance) getConn() *sqlite.Conn {
658         return i.p.pool.Get(context.TODO())
659 }
660
661 func (i instance) putConn(conn *sqlite.Conn) {
662         i.p.pool.Put(conn)
663 }
664
665 func (i instance) Readdirnames() (names []string, err error) {
666         prefix := i.location + "/"
667         err = i.withConn(func(_ context.Context, conn conn) error {
668                 return sqlitex.Exec(conn, "select name from blob where name like ?", func(stmt *sqlite.Stmt) error {
669                         names = append(names, stmt.ColumnText(0)[len(prefix):])
670                         return nil
671                 }, prefix+"%")
672         }, false)
673         //log.Printf("readdir %q gave %q", i.location, names)
674         return
675 }
676
677 func (i instance) getBlobRowid(conn conn) (rowid int64, err error) {
678         rows := 0
679         err = sqlitex.Exec(conn, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
680                 rowid = stmt.ColumnInt64(0)
681                 rows++
682                 return nil
683         }, i.location)
684         if err != nil {
685                 return
686         }
687         if rows == 1 {
688                 return
689         }
690         if rows == 0 {
691                 err = errors.New("blob not found")
692                 return
693         }
694         panic(rows)
695 }
696
697 type connBlob struct {
698         *sqlite.Blob
699         onClose func()
700 }
701
702 func (me connBlob) Close() error {
703         err := me.Blob.Close()
704         me.onClose()
705         return err
706 }
707
708 func (i instance) Get() (ret io.ReadCloser, err error) {
709         conn := i.getConn()
710         if conn == nil {
711                 panic("nil sqlite conn")
712         }
713         blob, err := i.openBlob(conn, false, true)
714         if err != nil {
715                 i.putConn(conn)
716                 return
717         }
718         var once sync.Once
719         return connBlob{blob, func() {
720                 once.Do(func() { i.putConn(conn) })
721         }}, nil
722 }
723
724 func (i instance) openBlob(conn conn, write, updateAccess bool) (*sqlite.Blob, error) {
725         rowid, err := i.getBlobRowid(conn)
726         if err != nil {
727                 return nil, err
728         }
729         // This seems to cause locking issues with in-memory databases. Is it something to do with not
730         // having WAL?
731         if updateAccess {
732                 err = sqlitex.Exec(conn, "update blob set last_used=datetime('now') where rowid=?", nil, rowid)
733                 if err != nil {
734                         err = fmt.Errorf("updating last_used: %w", err)
735                         return nil, err
736                 }
737                 if conn.Changes() != 1 {
738                         panic(conn.Changes())
739                 }
740         }
741         return conn.OpenBlob("main", "blob", "data", rowid, write)
742 }
743
744 func (i instance) PutSized(reader io.Reader, size int64) (err error) {
745         err = i.withConn(func(_ context.Context, conn conn) error {
746                 err := sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, zeroblob(?))",
747                         nil,
748                         i.location, size)
749                 if err != nil {
750                         return err
751                 }
752                 blob, err := i.openBlob(conn, true, false)
753                 if err != nil {
754                         return err
755                 }
756                 defer blob.Close()
757                 _, err = io.Copy(blob, reader)
758                 return err
759         }, true)
760         return
761 }
762
763 func (i instance) Put(reader io.Reader) (err error) {
764         var buf bytes.Buffer
765         _, err = io.Copy(&buf, reader)
766         if err != nil {
767                 return err
768         }
769         if false {
770                 return i.PutSized(&buf, int64(buf.Len()))
771         } else {
772                 return i.withConn(func(_ context.Context, conn conn) error {
773                         for range iter.N(10) {
774                                 err = sqlitex.Exec(conn,
775                                         "insert or replace into blob(name, data) values(?, cast(? as blob))",
776                                         nil,
777                                         i.location, buf.Bytes())
778                                 if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
779                                         log.Print("sqlite busy")
780                                         time.Sleep(time.Second)
781                                         continue
782                                 }
783                                 break
784                         }
785                         return err
786                 }, true)
787         }
788 }
789
790 type fileInfo struct {
791         size int64
792 }
793
794 func (f fileInfo) Name() string {
795         panic("implement me")
796 }
797
798 func (f fileInfo) Size() int64 {
799         return f.size
800 }
801
802 func (f fileInfo) Mode() os.FileMode {
803         panic("implement me")
804 }
805
806 func (f fileInfo) ModTime() time.Time {
807         panic("implement me")
808 }
809
810 func (f fileInfo) IsDir() bool {
811         panic("implement me")
812 }
813
814 func (f fileInfo) Sys() interface{} {
815         panic("implement me")
816 }
817
818 func (i instance) Stat() (ret os.FileInfo, err error) {
819         err = i.withConn(func(_ context.Context, conn conn) error {
820                 var blob *sqlite.Blob
821                 blob, err = i.openBlob(conn, false, false)
822                 if err != nil {
823                         return err
824                 }
825                 defer blob.Close()
826                 ret = fileInfo{blob.Size()}
827                 return nil
828         }, false)
829         return
830 }
831
832 func (i instance) ReadAt(p []byte, off int64) (n int, err error) {
833         err = i.withConn(func(_ context.Context, conn conn) error {
834                 if false {
835                         var blob *sqlite.Blob
836                         blob, err = i.openBlob(conn, false, true)
837                         if err != nil {
838                                 return err
839                         }
840                         defer blob.Close()
841                         if off >= blob.Size() {
842                                 err = io.EOF
843                                 return err
844                         }
845                         if off+int64(len(p)) > blob.Size() {
846                                 p = p[:blob.Size()-off]
847                         }
848                         n, err = blob.ReadAt(p, off)
849                 } else {
850                         gotRow := false
851                         err = sqlitex.Exec(
852                                 conn,
853                                 "select substr(data, ?, ?) from blob where name=?",
854                                 func(stmt *sqlite.Stmt) error {
855                                         if gotRow {
856                                                 panic("found multiple matching blobs")
857                                         } else {
858                                                 gotRow = true
859                                         }
860                                         n = stmt.ColumnBytes(0, p)
861                                         return nil
862                                 },
863                                 off+1, len(p), i.location,
864                         )
865                         if err != nil {
866                                 return err
867                         }
868                         if !gotRow {
869                                 err = errors.New("blob not found")
870                                 return err
871                         }
872                         if n < len(p) {
873                                 err = io.EOF
874                         }
875                 }
876                 return nil
877         }, false)
878         return
879 }
880
881 func (i instance) WriteAt(bytes []byte, i2 int64) (int, error) {
882         panic("implement me")
883 }
884
885 func (i instance) Delete() error {
886         return i.withConn(func(_ context.Context, conn conn) error {
887                 return sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)
888         }, true)
889 }