]> Sergey Matveev's repositories - btrtrc.git/blob - storage/sqlite/sqlite-storage.go
Rework lots of option handling
[btrtrc.git] / storage / sqlite / sqlite-storage.go
1 package sqliteStorage
2
3 import (
4         "bytes"
5         "context"
6         "errors"
7         "expvar"
8         "fmt"
9         "io"
10         "log"
11         "net/url"
12         "os"
13         "runtime"
14         "runtime/pprof"
15         "strings"
16         "sync"
17         "time"
18
19         "crawshaw.io/sqlite"
20         "crawshaw.io/sqlite/sqlitex"
21         "github.com/anacrolix/missinggo/iter"
22         "github.com/anacrolix/missinggo/v2/resource"
23         "github.com/anacrolix/torrent/storage"
24 )
25
26 type conn = *sqlite.Conn
27
28 type InitConnOpts struct {
29         SetJournalMode string
30         MmapSizeOk     bool  // If false, a package-specific default will be used.
31         MmapSize       int64 // If MmapSizeOk is set, use sqlite default if < 0, otherwise this value.
32 }
33
34 func (me InitConnOpts) JournalMode() string {
35         if me.SetJournalMode != "" {
36                 return me.SetJournalMode
37         }
38         return "wal"
39 }
40
41 func initConn(conn conn, opts InitConnOpts) error {
42         // Recursive triggers are required because we need to trim the blob_meta size after trimming to
43         // capacity. Hopefully we don't hit the recursion limit, and if we do, there's an error thrown.
44         err := sqlitex.ExecTransient(conn, "pragma recursive_triggers=on", nil)
45         if err != nil {
46                 return err
47         }
48         err = sqlitex.ExecTransient(conn, `pragma synchronous=off`, nil)
49         if err != nil {
50                 return err
51         }
52         if opts.SetJournalMode != "" {
53                 err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma journal_mode=%s`, opts.SetJournalMode), func(stmt *sqlite.Stmt) error {
54                         ret := stmt.ColumnText(0)
55                         if ret != opts.SetJournalMode {
56                                 panic(ret)
57                         }
58                         return nil
59                 })
60                 if err != nil {
61                         return err
62                 }
63         }
64         if !opts.MmapSizeOk {
65                 opts.MmapSize = 1 << 24 // 8 MiB
66         }
67         if opts.MmapSize >= 0 {
68                 err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma mmap_size=%d`, opts.MmapSize), nil)
69                 if err != nil {
70                         return err
71                 }
72         }
73         return nil
74 }
75
76 func InitSchema(conn conn, pageSize int, triggers bool) error {
77         if pageSize != 0 {
78                 err := sqlitex.ExecScript(conn, fmt.Sprintf("pragma page_size=%d", pageSize))
79                 if err != nil {
80                         return err
81                 }
82         }
83         err := sqlitex.ExecScript(conn, `
84                 -- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
85                 -- can trim the database file size with partial vacuums without having to do a full vacuum, which 
86                 -- locks everything.
87                 pragma auto_vacuum=incremental;
88                 
89                 create table if not exists blob (
90                         name text,
91                         last_used timestamp default (datetime('now')),
92                         data blob,
93                         verified bool,
94                         primary key (name)
95                 );
96                 
97                 create table if not exists blob_meta (
98                         key text primary key,
99                         value
100                 );
101
102                 create index if not exists blob_last_used on blob(last_used);
103                 
104                 -- While sqlite *seems* to be faster to get sum(length(data)) instead of 
105                 -- sum(length(data)), it may still require a large table scan at start-up or with a 
106                 -- cold-cache. With this we can be assured that it doesn't.
107                 insert or ignore into blob_meta values ('size', 0);
108                 
109                 create table if not exists setting (
110                         name primary key on conflict replace,
111                         value
112                 );
113         
114                 create view if not exists deletable_blob as
115                 with recursive excess (
116                         usage_with,
117                         last_used,
118                         blob_rowid,
119                         data_length
120                 ) as (
121                         select * 
122                         from (
123                                 select 
124                                         (select value from blob_meta where key='size') as usage_with,
125                                         last_used,
126                                         rowid,
127                                         length(data)
128                                 from blob order by last_used, rowid limit 1
129                         )
130                         where usage_with > (select value from setting where name='capacity')
131                         union all
132                         select 
133                                 usage_with-data_length as new_usage_with,
134                                 blob.last_used,
135                                 blob.rowid,
136                                 length(data)
137                         from excess join blob
138                         on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
139                         where new_usage_with > (select value from setting where name='capacity')
140                 )
141                 select * from excess;
142         `)
143         if err != nil {
144                 return err
145         }
146         if triggers {
147                 err := sqlitex.ExecScript(conn, `
148                         create trigger if not exists after_insert_blob
149                         after insert on blob
150                         begin
151                                 update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
152                                 delete from blob where rowid in (select blob_rowid from deletable_blob);
153                         end;
154                         
155                         create trigger if not exists after_update_blob
156                         after update of data on blob
157                         begin
158                                 update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
159                                 delete from blob where rowid in (select blob_rowid from deletable_blob);
160                         end;
161                         
162                         create trigger if not exists after_delete_blob
163                         after delete on blob
164                         begin
165                                 update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
166                         end;
167                 `)
168                 if err != nil {
169                         return err
170                 }
171         }
172         return nil
173 }
174
175 type NewPiecesStorageOpts struct {
176         NewPoolOpts
177         InitDbOpts
178         ProvOpts    func(*ProviderOpts)
179         StorageOpts func(*storage.ResourcePiecesOpts)
180 }
181
182 // A convenience function that creates a connection pool, resource provider, and a pieces storage
183 // ClientImpl and returns them all with a Close attached.
184 func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, err error) {
185         conns, err := NewPool(opts.NewPoolOpts)
186         if err != nil {
187                 return
188         }
189         err = initPoolDatabase(conns, opts.InitDbOpts)
190         if err != nil {
191                 return
192         }
193         provOpts := ProviderOpts{
194                 BatchWrites: conns.NumConns() > 1,
195         }
196         if f := opts.ProvOpts; f != nil {
197                 f(&provOpts)
198         }
199         prov, err := NewProvider(conns, provOpts)
200         if err != nil {
201                 conns.Close()
202                 return
203         }
204         var (
205                 journalMode string
206         )
207         withPoolConn(conns, func(c conn) {
208                 err = sqlitex.Exec(c, "pragma journal_mode", func(stmt *sqlite.Stmt) error {
209                         journalMode = stmt.ColumnText(0)
210                         return nil
211                 })
212         })
213         if err != nil {
214                 err = fmt.Errorf("getting journal mode: %w", err)
215                 prov.Close()
216                 return
217         }
218         if journalMode == "" {
219                 err = errors.New("didn't get journal mode")
220                 prov.Close()
221                 return
222         }
223         storageOpts := storage.ResourcePiecesOpts{
224                 NoSizedPuts: journalMode != "wal" || conns.NumConns() == 1,
225         }
226         if f := opts.StorageOpts; f != nil {
227                 f(&storageOpts)
228         }
229         store := storage.NewResourcePiecesOpts(prov, storageOpts)
230         return struct {
231                 storage.ClientImpl
232                 io.Closer
233         }{
234                 store,
235                 prov,
236         }, nil
237 }
238
239 type NewPoolOpts struct {
240         NewConnOpts
241         InitConnOpts
242         NumConns int
243 }
244
245 type InitDbOpts struct {
246         DontInitSchema bool
247         PageSize       int
248         // If non-zero, overrides the existing setting.
249         Capacity int64
250 }
251
252 // There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
253 // the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the
254 // top-level option type.
255 type PoolConf struct {
256         NumConns    int
257         JournalMode string
258 }
259
260 // Remove any capacity limits.
261 func UnlimitCapacity(conn conn) error {
262         return sqlitex.Exec(conn, "delete from setting where key='capacity'", nil)
263 }
264
265 // Set the capacity limit to exactly this value.
266 func SetCapacity(conn conn, cap int64) error {
267         return sqlitex.Exec(conn, "insert into setting values ('capacity', ?)", nil, cap)
268 }
269
270 type NewConnOpts struct {
271         // See https://www.sqlite.org/c3ref/open.html. NB: "If the filename is an empty string, then a
272         // private, temporary on-disk database will be created. This private database will be
273         // automatically deleted as soon as the database connection is closed."
274         Path   string
275         Memory bool
276         // Whether multiple blobs will not be read simultaneously. Enables journal mode other than WAL,
277         // and NumConns < 2.
278         NoConcurrentBlobReads bool
279 }
280
281 func newOpenUri(opts NewConnOpts) string {
282         path := url.PathEscape(opts.Path)
283         if opts.Memory {
284                 path = ":memory:"
285         }
286         values := make(url.Values)
287         if opts.NoConcurrentBlobReads || opts.Memory {
288                 values.Add("cache", "shared")
289         }
290         return fmt.Sprintf("file:%s?%s", path, values.Encode())
291 }
292
293 func initDatabase(conn conn, opts InitDbOpts) (err error) {
294         if !opts.DontInitSchema {
295                 if opts.PageSize == 0 {
296                         opts.PageSize = 1 << 14
297                 }
298                 err = InitSchema(conn, opts.PageSize, true)
299                 if err != nil {
300                         return
301                 }
302         }
303         if opts.Capacity != 0 {
304                 err = SetCapacity(conn, opts.Capacity)
305                 if err != nil {
306                         return
307                 }
308         }
309         return
310 }
311
312 func initPoolDatabase(pool ConnPool, opts InitDbOpts) (err error) {
313         withPoolConn(pool, func(c conn) {
314                 err = initDatabase(c, opts)
315         })
316         return
317 }
318
319 func newConn(opts NewConnOpts) (conn, error) {
320         return sqlite.OpenConn(newOpenUri(opts), 0)
321 }
322
323 type poolWithNumConns struct {
324         *sqlitex.Pool
325         numConns int
326 }
327
328 func (me poolWithNumConns) NumConns() int {
329         return me.numConns
330 }
331
332 func NewPool(opts NewPoolOpts) (_ ConnPool, err error) {
333         if opts.NumConns == 0 {
334                 opts.NumConns = runtime.NumCPU()
335         }
336         conns, err := func() (ConnPool, error) {
337                 switch opts.NumConns {
338                 case 1:
339                         conn, err := newConn(opts.NewConnOpts)
340                         return &poolFromConn{conn: conn}, err
341                 default:
342                         _pool, err := sqlitex.Open(newOpenUri(opts.NewConnOpts), 0, opts.NumConns)
343                         return poolWithNumConns{_pool, opts.NumConns}, err
344                 }
345         }()
346         if err != nil {
347                 return
348         }
349         defer func() {
350                 if err != nil {
351                         conns.Close()
352                 }
353         }()
354         return conns, initPoolConns(nil, conns, InitPoolOpts{
355                 NumConns:     opts.NumConns,
356                 InitConnOpts: opts.InitConnOpts,
357         })
358 }
359
360 // Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool.
361 type poolFromConn struct {
362         mu   sync.Mutex
363         conn conn
364 }
365
366 func (me *poolFromConn) Get(ctx context.Context) conn {
367         me.mu.Lock()
368         return me.conn
369 }
370
371 func (me *poolFromConn) Put(conn conn) {
372         if conn != me.conn {
373                 panic("expected to same conn")
374         }
375         me.mu.Unlock()
376 }
377
378 func (me *poolFromConn) Close() error {
379         return me.conn.Close()
380 }
381
382 func (poolFromConn) NumConns() int { return 1 }
383
384 type ProviderOpts struct {
385         BatchWrites bool
386 }
387
388 // Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of
389 // the ConnPool (since it has to initialize all the connections anyway).
390 func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
391         prov := &provider{pool: pool, opts: opts}
392         if opts.BatchWrites {
393                 writes := make(chan writeRequest)
394                 prov.writes = writes
395                 // This is retained for backwards compatibility. It may not be necessary.
396                 runtime.SetFinalizer(prov, func(p *provider) {
397                         p.Close()
398                 })
399                 go providerWriter(writes, prov.pool)
400         }
401         return prov, nil
402 }
403
404 type InitPoolOpts struct {
405         NumConns int
406         InitConnOpts
407 }
408
409 func initPoolConns(ctx context.Context, pool ConnPool, opts InitPoolOpts) (err error) {
410         var conns []conn
411         defer func() {
412                 for _, c := range conns {
413                         pool.Put(c)
414                 }
415         }()
416         for range iter.N(opts.NumConns) {
417                 conn := pool.Get(ctx)
418                 if conn == nil {
419                         break
420                 }
421                 conns = append(conns, conn)
422                 err = initConn(conn, opts.InitConnOpts)
423                 if err != nil {
424                         err = fmt.Errorf("initing conn %v: %w", len(conns), err)
425                         return
426                 }
427         }
428         return
429 }
430
431 type ConnPool interface {
432         Get(context.Context) conn
433         Put(conn)
434         Close() error
435         NumConns() int
436 }
437
438 func withPoolConn(pool ConnPool, with func(conn)) {
439         c := pool.Get(nil)
440         defer pool.Put(c)
441         with(c)
442 }
443
444 type provider struct {
445         pool     ConnPool
446         writes   chan<- writeRequest
447         opts     ProviderOpts
448         closeMu  sync.RWMutex
449         closed   bool
450         closeErr error
451 }
452
453 var _ storage.ConsecutiveChunkReader = (*provider)(nil)
454
455 func (p *provider) ReadConsecutiveChunks(prefix string) (io.ReadCloser, error) {
456         p.closeMu.RLock()
457         runner, err := p.getReadWithConnRunner()
458         if err != nil {
459                 p.closeMu.RUnlock()
460                 return nil, err
461         }
462         r, w := io.Pipe()
463         go func() {
464                 defer p.closeMu.RUnlock()
465                 err = runner(func(_ context.Context, conn conn) error {
466                         var written int64
467                         err = sqlitex.Exec(conn, `
468                                 select
469                                         data,
470                                         cast(substr(name, ?+1) as integer) as offset
471                                 from blob
472                                 where name like ?||'%'
473                                 order by offset`,
474                                 func(stmt *sqlite.Stmt) error {
475                                         offset := stmt.ColumnInt64(1)
476                                         if offset != written {
477                                                 return fmt.Errorf("got chunk at offset %v, expected offset %v", offset, written)
478                                         }
479                                         // TODO: Avoid intermediate buffers here
480                                         r := stmt.ColumnReader(0)
481                                         w1, err := io.Copy(w, r)
482                                         written += w1
483                                         return err
484                                 },
485                                 len(prefix),
486                                 prefix,
487                         )
488                         return err
489                 })
490                 w.CloseWithError(err)
491         }()
492         return r, nil
493 }
494
495 func (me *provider) Close() error {
496         me.closeMu.Lock()
497         defer me.closeMu.Unlock()
498         if me.closed {
499                 return me.closeErr
500         }
501         if me.writes != nil {
502                 close(me.writes)
503         }
504         me.closeErr = me.pool.Close()
505         me.closed = true
506         return me.closeErr
507 }
508
509 type writeRequest struct {
510         query  withConn
511         done   chan<- error
512         labels pprof.LabelSet
513 }
514
515 var expvars = expvar.NewMap("sqliteStorage")
516
517 func runQueryWithLabels(query withConn, labels pprof.LabelSet, conn conn) (err error) {
518         pprof.Do(context.Background(), labels, func(ctx context.Context) {
519                 // We pass in the context in the hope that the CPU profiler might incorporate sqlite
520                 // activity the action that triggered it. It doesn't seem that way, since those calls don't
521                 // take a context.Context themselves. It may come in useful in the goroutine profiles
522                 // though, and doesn't hurt to expose it here for other purposes should things change.
523                 err = query(ctx, conn)
524         })
525         return
526 }
527
528 // Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
529 // stronger typing on the writes channel.
530 func providerWriter(writes <-chan writeRequest, pool ConnPool) {
531         conn := pool.Get(context.TODO())
532         if conn == nil {
533                 return
534         }
535         defer pool.Put(conn)
536         for {
537                 first, ok := <-writes
538                 if !ok {
539                         return
540                 }
541                 var buf []func()
542                 var cantFail error
543                 func() {
544                         defer sqlitex.Save(conn)(&cantFail)
545                         firstErr := runQueryWithLabels(first.query, first.labels, conn)
546                         buf = append(buf, func() { first.done <- firstErr })
547                         for {
548                                 select {
549                                 case wr, ok := <-writes:
550                                         if ok {
551                                                 err := runQueryWithLabels(wr.query, wr.labels, conn)
552                                                 buf = append(buf, func() { wr.done <- err })
553                                                 continue
554                                         }
555                                 default:
556                                 }
557                                 break
558                         }
559                 }()
560                 // Not sure what to do if this failed.
561                 if cantFail != nil {
562                         expvars.Add("batchTransactionErrors", 1)
563                 }
564                 // Signal done after we know the transaction succeeded.
565                 for _, done := range buf {
566                         done()
567                 }
568                 expvars.Add("batchTransactions", 1)
569                 expvars.Add("batchedQueries", int64(len(buf)))
570                 //log.Printf("batched %v write queries", len(buf))
571         }
572 }
573
574 func (p *provider) NewInstance(s string) (resource.Instance, error) {
575         return instance{s, p}, nil
576 }
577
578 type instance struct {
579         location string
580         p        *provider
581 }
582
583 func getLabels(skip int) pprof.LabelSet {
584         return pprof.Labels("sqlite-storage-action", func() string {
585                 var pcs [8]uintptr
586                 runtime.Callers(skip+3, pcs[:])
587                 fs := runtime.CallersFrames(pcs[:])
588                 f, _ := fs.Next()
589                 funcName := f.Func.Name()
590                 funcName = funcName[strings.LastIndexByte(funcName, '.')+1:]
591                 //log.Printf("func name: %q", funcName)
592                 return funcName
593         }())
594 }
595
596 func (p *provider) withConn(with withConn, write bool, skip int) error {
597         p.closeMu.RLock()
598         // I think we need to check this here because it may not be valid to send to the writes channel
599         // if we're already closed. So don't try to move this check into getReadWithConnRunner.
600         if p.closed {
601                 p.closeMu.RUnlock()
602                 return errors.New("closed")
603         }
604         if write && p.opts.BatchWrites {
605                 done := make(chan error)
606                 p.writes <- writeRequest{
607                         query:  with,
608                         done:   done,
609                         labels: getLabels(skip + 1),
610                 }
611                 p.closeMu.RUnlock()
612                 return <-done
613         } else {
614                 defer p.closeMu.RUnlock()
615                 runner, err := p.getReadWithConnRunner()
616                 if err != nil {
617                         return err
618                 }
619                 return runner(with)
620         }
621 }
622
623 // Obtains a DB conn and returns a withConn for executing with it. If no error is returned from this
624 // function, the runner *must* be used or the conn is leaked. You should check the provider isn't
625 // closed before using this.
626 func (p *provider) getReadWithConnRunner() (with func(withConn) error, err error) {
627         conn := p.pool.Get(context.TODO())
628         if conn == nil {
629                 err = errors.New("couldn't get pool conn")
630                 return
631         }
632         with = func(with withConn) error {
633                 defer p.pool.Put(conn)
634                 return runQueryWithLabels(with, getLabels(1), conn)
635         }
636         return
637 }
638
639 type withConn func(context.Context, conn) error
640
641 func (i instance) withConn(with withConn, write bool) error {
642         return i.p.withConn(with, write, 1)
643 }
644
645 func (i instance) getConn() *sqlite.Conn {
646         return i.p.pool.Get(context.TODO())
647 }
648
649 func (i instance) putConn(conn *sqlite.Conn) {
650         i.p.pool.Put(conn)
651 }
652
653 func (i instance) Readdirnames() (names []string, err error) {
654         prefix := i.location + "/"
655         err = i.withConn(func(_ context.Context, conn conn) error {
656                 return sqlitex.Exec(conn, "select name from blob where name like ?", func(stmt *sqlite.Stmt) error {
657                         names = append(names, stmt.ColumnText(0)[len(prefix):])
658                         return nil
659                 }, prefix+"%")
660         }, false)
661         //log.Printf("readdir %q gave %q", i.location, names)
662         return
663 }
664
665 func (i instance) getBlobRowid(conn conn) (rowid int64, err error) {
666         rows := 0
667         err = sqlitex.Exec(conn, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
668                 rowid = stmt.ColumnInt64(0)
669                 rows++
670                 return nil
671         }, i.location)
672         if err != nil {
673                 return
674         }
675         if rows == 1 {
676                 return
677         }
678         if rows == 0 {
679                 err = errors.New("blob not found")
680                 return
681         }
682         panic(rows)
683 }
684
685 type connBlob struct {
686         *sqlite.Blob
687         onClose func()
688 }
689
690 func (me connBlob) Close() error {
691         err := me.Blob.Close()
692         me.onClose()
693         return err
694 }
695
696 func (i instance) Get() (ret io.ReadCloser, err error) {
697         conn := i.getConn()
698         if conn == nil {
699                 panic("nil sqlite conn")
700         }
701         blob, err := i.openBlob(conn, false, true)
702         if err != nil {
703                 i.putConn(conn)
704                 return
705         }
706         var once sync.Once
707         return connBlob{blob, func() {
708                 once.Do(func() { i.putConn(conn) })
709         }}, nil
710 }
711
712 func (i instance) openBlob(conn conn, write, updateAccess bool) (*sqlite.Blob, error) {
713         rowid, err := i.getBlobRowid(conn)
714         if err != nil {
715                 return nil, err
716         }
717         // This seems to cause locking issues with in-memory databases. Is it something to do with not
718         // having WAL?
719         if updateAccess {
720                 err = sqlitex.Exec(conn, "update blob set last_used=datetime('now') where rowid=?", nil, rowid)
721                 if err != nil {
722                         err = fmt.Errorf("updating last_used: %w", err)
723                         return nil, err
724                 }
725                 if conn.Changes() != 1 {
726                         panic(conn.Changes())
727                 }
728         }
729         return conn.OpenBlob("main", "blob", "data", rowid, write)
730 }
731
732 func (i instance) PutSized(reader io.Reader, size int64) (err error) {
733         err = i.withConn(func(_ context.Context, conn conn) error {
734                 err := sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, zeroblob(?))",
735                         nil,
736                         i.location, size)
737                 if err != nil {
738                         return err
739                 }
740                 blob, err := i.openBlob(conn, true, false)
741                 if err != nil {
742                         return err
743                 }
744                 defer blob.Close()
745                 _, err = io.Copy(blob, reader)
746                 return err
747         }, true)
748         return
749 }
750
751 func (i instance) Put(reader io.Reader) (err error) {
752         var buf bytes.Buffer
753         _, err = io.Copy(&buf, reader)
754         if err != nil {
755                 return err
756         }
757         if false {
758                 return i.PutSized(&buf, int64(buf.Len()))
759         } else {
760                 return i.withConn(func(_ context.Context, conn conn) error {
761                         for range iter.N(10) {
762                                 err = sqlitex.Exec(conn,
763                                         "insert or replace into blob(name, data) values(?, cast(? as blob))",
764                                         nil,
765                                         i.location, buf.Bytes())
766                                 if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
767                                         log.Print("sqlite busy")
768                                         time.Sleep(time.Second)
769                                         continue
770                                 }
771                                 break
772                         }
773                         return err
774                 }, true)
775         }
776 }
777
778 type fileInfo struct {
779         size int64
780 }
781
782 func (f fileInfo) Name() string {
783         panic("implement me")
784 }
785
786 func (f fileInfo) Size() int64 {
787         return f.size
788 }
789
790 func (f fileInfo) Mode() os.FileMode {
791         panic("implement me")
792 }
793
794 func (f fileInfo) ModTime() time.Time {
795         panic("implement me")
796 }
797
798 func (f fileInfo) IsDir() bool {
799         panic("implement me")
800 }
801
802 func (f fileInfo) Sys() interface{} {
803         panic("implement me")
804 }
805
806 func (i instance) Stat() (ret os.FileInfo, err error) {
807         err = i.withConn(func(_ context.Context, conn conn) error {
808                 var blob *sqlite.Blob
809                 blob, err = i.openBlob(conn, false, false)
810                 if err != nil {
811                         return err
812                 }
813                 defer blob.Close()
814                 ret = fileInfo{blob.Size()}
815                 return nil
816         }, false)
817         return
818 }
819
820 func (i instance) ReadAt(p []byte, off int64) (n int, err error) {
821         err = i.withConn(func(_ context.Context, conn conn) error {
822                 if false {
823                         var blob *sqlite.Blob
824                         blob, err = i.openBlob(conn, false, true)
825                         if err != nil {
826                                 return err
827                         }
828                         defer blob.Close()
829                         if off >= blob.Size() {
830                                 err = io.EOF
831                                 return err
832                         }
833                         if off+int64(len(p)) > blob.Size() {
834                                 p = p[:blob.Size()-off]
835                         }
836                         n, err = blob.ReadAt(p, off)
837                 } else {
838                         gotRow := false
839                         err = sqlitex.Exec(
840                                 conn,
841                                 "select substr(data, ?, ?) from blob where name=?",
842                                 func(stmt *sqlite.Stmt) error {
843                                         if gotRow {
844                                                 panic("found multiple matching blobs")
845                                         } else {
846                                                 gotRow = true
847                                         }
848                                         n = stmt.ColumnBytes(0, p)
849                                         return nil
850                                 },
851                                 off+1, len(p), i.location,
852                         )
853                         if err != nil {
854                                 return err
855                         }
856                         if !gotRow {
857                                 err = errors.New("blob not found")
858                                 return err
859                         }
860                         if n < len(p) {
861                                 err = io.EOF
862                         }
863                 }
864                 return nil
865         }, false)
866         return
867 }
868
869 func (i instance) WriteAt(bytes []byte, i2 int64) (int, error) {
870         panic("implement me")
871 }
872
873 func (i instance) Delete() error {
874         return i.withConn(func(_ context.Context, conn conn) error {
875                 return sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)
876         }, true)
877 }