]> Sergey Matveev's repositories - btrtrc.git/blob - storage/sqlite/sqlite-storage.go
Expose a variety of blob cleanup styles
[btrtrc.git] / storage / sqlite / sqlite-storage.go
1 package sqliteStorage
2
3 import (
4         "bytes"
5         "context"
6         "errors"
7         "expvar"
8         "fmt"
9         "io"
10         "log"
11         "net/url"
12         "os"
13         "runtime"
14         "runtime/pprof"
15         "strings"
16         "sync"
17         "time"
18
19         "crawshaw.io/sqlite"
20         "crawshaw.io/sqlite/sqlitex"
21         "github.com/anacrolix/missinggo/iter"
22         "github.com/anacrolix/missinggo/v2/resource"
23         "github.com/anacrolix/torrent/storage"
24 )
25
26 type conn = *sqlite.Conn
27
28 type InitConnOpts struct {
29         SetJournalMode string
30         MmapSizeOk     bool  // If false, a package-specific default will be used.
31         MmapSize       int64 // If MmapSizeOk is set, use sqlite default if < 0, otherwise this value.
32 }
33
34 func (me InitConnOpts) JournalMode() string {
35         if me.SetJournalMode != "" {
36                 return me.SetJournalMode
37         }
38         return "wal"
39 }
40
41 var UnexpectedJournalMode = errors.New("unexpected journal mode")
42
43 func initConn(conn conn, opts InitConnOpts) error {
44         // Recursive triggers are required because we need to trim the blob_meta size after trimming to
45         // capacity. Hopefully we don't hit the recursion limit, and if we do, there's an error thrown.
46         err := sqlitex.ExecTransient(conn, "pragma recursive_triggers=on", nil)
47         if err != nil {
48                 return err
49         }
50         err = sqlitex.ExecTransient(conn, `pragma synchronous=off`, nil)
51         if err != nil {
52                 return err
53         }
54         if opts.SetJournalMode != "" {
55                 err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma journal_mode=%s`, opts.SetJournalMode), func(stmt *sqlite.Stmt) error {
56                         ret := stmt.ColumnText(0)
57                         if ret != opts.SetJournalMode {
58                                 return UnexpectedJournalMode
59                         }
60                         return nil
61                 })
62                 if err != nil {
63                         return err
64                 }
65         }
66         if !opts.MmapSizeOk {
67                 // Set the default. Currently it seems the library picks reasonable defaults, especially for
68                 // wal.
69                 opts.MmapSize = -1
70                 //opts.MmapSize = 1 << 24 // 8 MiB
71         }
72         if opts.MmapSize >= 0 {
73                 err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma mmap_size=%d`, opts.MmapSize), nil)
74                 if err != nil {
75                         return err
76                 }
77         }
78         return nil
79 }
80
81 func InitSchema(conn conn, pageSize int, triggers bool) error {
82         if pageSize != 0 {
83                 err := sqlitex.ExecScript(conn, fmt.Sprintf("pragma page_size=%d", pageSize))
84                 if err != nil {
85                         return err
86                 }
87         }
88         err := sqlitex.ExecScript(conn, `
89                 -- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
90                 -- can trim the database file size with partial vacuums without having to do a full vacuum, which 
91                 -- locks everything.
92                 pragma auto_vacuum=incremental;
93                 
94                 create table if not exists blob (
95                         name text,
96                         last_used timestamp default (datetime('now')),
97                         data blob,
98                         verified bool,
99                         primary key (name)
100                 );
101                 
102                 create table if not exists blob_meta (
103                         key text primary key,
104                         value
105                 );
106
107                 create index if not exists blob_last_used on blob(last_used);
108                 
109                 -- While sqlite *seems* to be faster to get sum(length(data)) instead of 
110                 -- sum(length(data)), it may still require a large table scan at start-up or with a 
111                 -- cold-cache. With this we can be assured that it doesn't.
112                 insert or ignore into blob_meta values ('size', 0);
113                 
114                 create table if not exists setting (
115                         name primary key on conflict replace,
116                         value
117                 );
118         
119                 create view if not exists deletable_blob as
120                 with recursive excess (
121                         usage_with,
122                         last_used,
123                         blob_rowid,
124                         data_length
125                 ) as (
126                         select * 
127                         from (
128                                 select 
129                                         (select value from blob_meta where key='size') as usage_with,
130                                         last_used,
131                                         rowid,
132                                         length(data)
133                                 from blob order by last_used, rowid limit 1
134                         )
135                         where usage_with > (select value from setting where name='capacity')
136                         union all
137                         select 
138                                 usage_with-data_length as new_usage_with,
139                                 blob.last_used,
140                                 blob.rowid,
141                                 length(data)
142                         from excess join blob
143                         on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
144                         where new_usage_with > (select value from setting where name='capacity')
145                 )
146                 select * from excess;
147         `)
148         if err != nil {
149                 return err
150         }
151         if triggers {
152                 err := sqlitex.ExecScript(conn, `
153                         create trigger if not exists after_insert_blob
154                         after insert on blob
155                         begin
156                                 update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
157                                 delete from blob where rowid in (select blob_rowid from deletable_blob);
158                         end;
159                         
160                         create trigger if not exists after_update_blob
161                         after update of data on blob
162                         begin
163                                 update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
164                                 delete from blob where rowid in (select blob_rowid from deletable_blob);
165                         end;
166                         
167                         create trigger if not exists after_delete_blob
168                         after delete on blob
169                         begin
170                                 update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
171                         end;
172                 `)
173                 if err != nil {
174                         return err
175                 }
176         }
177         return nil
178 }
179
180 type NewPiecesStorageOpts struct {
181         NewPoolOpts
182         InitDbOpts
183         ProvOpts    func(*ProviderOpts)
184         StorageOpts func(*storage.ResourcePiecesOpts)
185 }
186
187 // A convenience function that creates a connection pool, resource provider, and a pieces storage
188 // ClientImpl and returns them all with a Close attached.
189 func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, err error) {
190         conns, err := NewPool(opts.NewPoolOpts)
191         if err != nil {
192                 return
193         }
194         err = initPoolDatabase(conns, opts.InitDbOpts)
195         if err != nil {
196                 return
197         }
198         provOpts := ProviderOpts{
199                 BatchWrites: conns.NumConns() > 1,
200         }
201         if f := opts.ProvOpts; f != nil {
202                 f(&provOpts)
203         }
204         prov, err := NewProvider(conns, provOpts)
205         if err != nil {
206                 conns.Close()
207                 return
208         }
209         var (
210                 journalMode string
211         )
212         withPoolConn(conns, func(c conn) {
213                 err = sqlitex.Exec(c, "pragma journal_mode", func(stmt *sqlite.Stmt) error {
214                         journalMode = stmt.ColumnText(0)
215                         return nil
216                 })
217         })
218         if err != nil {
219                 err = fmt.Errorf("getting journal mode: %w", err)
220                 prov.Close()
221                 return
222         }
223         if journalMode == "" {
224                 err = errors.New("didn't get journal mode")
225                 prov.Close()
226                 return
227         }
228         storageOpts := storage.ResourcePiecesOpts{
229                 NoSizedPuts: journalMode != "wal" || conns.NumConns() == 1,
230         }
231         if f := opts.StorageOpts; f != nil {
232                 f(&storageOpts)
233         }
234         store := storage.NewResourcePiecesOpts(prov, storageOpts)
235         return struct {
236                 storage.ClientImpl
237                 io.Closer
238         }{
239                 store,
240                 prov,
241         }, nil
242 }
243
244 type NewPoolOpts struct {
245         NewConnOpts
246         InitConnOpts
247         NumConns int
248 }
249
250 type InitDbOpts struct {
251         DontInitSchema bool
252         PageSize       int
253         // If non-zero, overrides the existing setting.
254         Capacity   int64
255         NoTriggers bool
256 }
257
258 // There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
259 // the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the
260 // top-level option type.
261 type PoolConf struct {
262         NumConns    int
263         JournalMode string
264 }
265
266 // Remove any capacity limits.
267 func UnlimitCapacity(conn conn) error {
268         return sqlitex.Exec(conn, "delete from setting where key='capacity'", nil)
269 }
270
271 // Set the capacity limit to exactly this value.
272 func SetCapacity(conn conn, cap int64) error {
273         return sqlitex.Exec(conn, "insert into setting values ('capacity', ?)", nil, cap)
274 }
275
276 type NewConnOpts struct {
277         // See https://www.sqlite.org/c3ref/open.html. NB: "If the filename is an empty string, then a
278         // private, temporary on-disk database will be created. This private database will be
279         // automatically deleted as soon as the database connection is closed."
280         Path   string
281         Memory bool
282         // Whether multiple blobs will not be read simultaneously. Enables journal mode other than WAL,
283         // and NumConns < 2.
284         NoConcurrentBlobReads bool
285 }
286
287 func newOpenUri(opts NewConnOpts) string {
288         path := url.PathEscape(opts.Path)
289         if opts.Memory {
290                 path = ":memory:"
291         }
292         values := make(url.Values)
293         if opts.NoConcurrentBlobReads || opts.Memory {
294                 values.Add("cache", "shared")
295         }
296         return fmt.Sprintf("file:%s?%s", path, values.Encode())
297 }
298
299 func initDatabase(conn conn, opts InitDbOpts) (err error) {
300         if !opts.DontInitSchema {
301                 if opts.PageSize == 0 {
302                         opts.PageSize = 1 << 14
303                 }
304                 err = InitSchema(conn, opts.PageSize, !opts.NoTriggers)
305                 if err != nil {
306                         return
307                 }
308         }
309         if opts.Capacity != 0 {
310                 err = SetCapacity(conn, opts.Capacity)
311                 if err != nil {
312                         return
313                 }
314         }
315         return
316 }
317
318 func initPoolDatabase(pool ConnPool, opts InitDbOpts) (err error) {
319         withPoolConn(pool, func(c conn) {
320                 err = initDatabase(c, opts)
321         })
322         return
323 }
324
325 func newConn(opts NewConnOpts) (conn, error) {
326         return sqlite.OpenConn(newOpenUri(opts), 0)
327 }
328
329 type poolWithNumConns struct {
330         *sqlitex.Pool
331         numConns int
332 }
333
334 func (me poolWithNumConns) NumConns() int {
335         return me.numConns
336 }
337
338 func NewPool(opts NewPoolOpts) (_ ConnPool, err error) {
339         if opts.NumConns == 0 {
340                 opts.NumConns = runtime.NumCPU()
341         }
342         conns, err := func() (ConnPool, error) {
343                 switch opts.NumConns {
344                 case 1:
345                         conn, err := newConn(opts.NewConnOpts)
346                         return &poolFromConn{conn: conn}, err
347                 default:
348                         _pool, err := sqlitex.Open(newOpenUri(opts.NewConnOpts), 0, opts.NumConns)
349                         return poolWithNumConns{_pool, opts.NumConns}, err
350                 }
351         }()
352         if err != nil {
353                 return
354         }
355         defer func() {
356                 if err != nil {
357                         conns.Close()
358                 }
359         }()
360         return conns, initPoolConns(nil, conns, InitPoolOpts{
361                 NumConns:     opts.NumConns,
362                 InitConnOpts: opts.InitConnOpts,
363         })
364 }
365
366 // Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool.
367 type poolFromConn struct {
368         mu   sync.Mutex
369         conn conn
370 }
371
372 func (me *poolFromConn) Get(ctx context.Context) conn {
373         me.mu.Lock()
374         return me.conn
375 }
376
377 func (me *poolFromConn) Put(conn conn) {
378         if conn != me.conn {
379                 panic("expected to same conn")
380         }
381         me.mu.Unlock()
382 }
383
384 func (me *poolFromConn) Close() error {
385         return me.conn.Close()
386 }
387
388 func (poolFromConn) NumConns() int { return 1 }
389
390 type ProviderOpts struct {
391         BatchWrites bool
392 }
393
394 // Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of
395 // the ConnPool (since it has to initialize all the connections anyway).
396 func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
397         prov := &provider{pool: pool, opts: opts}
398         if opts.BatchWrites {
399                 writes := make(chan writeRequest)
400                 prov.writes = writes
401                 // This is retained for backwards compatibility. It may not be necessary.
402                 runtime.SetFinalizer(prov, func(p *provider) {
403                         p.Close()
404                 })
405                 go providerWriter(writes, prov.pool)
406         }
407         return prov, nil
408 }
409
410 type InitPoolOpts struct {
411         NumConns int
412         InitConnOpts
413 }
414
415 func initPoolConns(ctx context.Context, pool ConnPool, opts InitPoolOpts) (err error) {
416         var conns []conn
417         defer func() {
418                 for _, c := range conns {
419                         pool.Put(c)
420                 }
421         }()
422         for range iter.N(opts.NumConns) {
423                 conn := pool.Get(ctx)
424                 if conn == nil {
425                         break
426                 }
427                 conns = append(conns, conn)
428                 err = initConn(conn, opts.InitConnOpts)
429                 if err != nil {
430                         err = fmt.Errorf("initing conn %v: %w", len(conns), err)
431                         return
432                 }
433         }
434         return
435 }
436
437 type ConnPool interface {
438         Get(context.Context) conn
439         Put(conn)
440         Close() error
441         NumConns() int
442 }
443
444 func withPoolConn(pool ConnPool, with func(conn)) {
445         c := pool.Get(nil)
446         defer pool.Put(c)
447         with(c)
448 }
449
450 type provider struct {
451         pool     ConnPool
452         writes   chan<- writeRequest
453         opts     ProviderOpts
454         closeMu  sync.RWMutex
455         closed   bool
456         closeErr error
457 }
458
459 var _ storage.ConsecutiveChunkReader = (*provider)(nil)
460
461 func (p *provider) ReadConsecutiveChunks(prefix string) (io.ReadCloser, error) {
462         p.closeMu.RLock()
463         runner, err := p.getReadWithConnRunner()
464         if err != nil {
465                 p.closeMu.RUnlock()
466                 return nil, err
467         }
468         r, w := io.Pipe()
469         go func() {
470                 defer p.closeMu.RUnlock()
471                 err = runner(func(_ context.Context, conn conn) error {
472                         var written int64
473                         err = sqlitex.Exec(conn, `
474                                 select
475                                         data,
476                                         cast(substr(name, ?+1) as integer) as offset
477                                 from blob
478                                 where name like ?||'%'
479                                 order by offset`,
480                                 func(stmt *sqlite.Stmt) error {
481                                         offset := stmt.ColumnInt64(1)
482                                         if offset != written {
483                                                 return fmt.Errorf("got chunk at offset %v, expected offset %v", offset, written)
484                                         }
485                                         // TODO: Avoid intermediate buffers here
486                                         r := stmt.ColumnReader(0)
487                                         w1, err := io.Copy(w, r)
488                                         written += w1
489                                         return err
490                                 },
491                                 len(prefix),
492                                 prefix,
493                         )
494                         return err
495                 })
496                 w.CloseWithError(err)
497         }()
498         return r, nil
499 }
500
501 func (me *provider) Close() error {
502         me.closeMu.Lock()
503         defer me.closeMu.Unlock()
504         if me.closed {
505                 return me.closeErr
506         }
507         if me.writes != nil {
508                 close(me.writes)
509         }
510         me.closeErr = me.pool.Close()
511         me.closed = true
512         return me.closeErr
513 }
514
515 type writeRequest struct {
516         query  withConn
517         done   chan<- error
518         labels pprof.LabelSet
519 }
520
521 var expvars = expvar.NewMap("sqliteStorage")
522
523 func runQueryWithLabels(query withConn, labels pprof.LabelSet, conn conn) (err error) {
524         pprof.Do(context.Background(), labels, func(ctx context.Context) {
525                 // We pass in the context in the hope that the CPU profiler might incorporate sqlite
526                 // activity the action that triggered it. It doesn't seem that way, since those calls don't
527                 // take a context.Context themselves. It may come in useful in the goroutine profiles
528                 // though, and doesn't hurt to expose it here for other purposes should things change.
529                 err = query(ctx, conn)
530         })
531         return
532 }
533
534 // Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
535 // stronger typing on the writes channel.
536 func providerWriter(writes <-chan writeRequest, pool ConnPool) {
537         conn := pool.Get(context.TODO())
538         if conn == nil {
539                 return
540         }
541         defer pool.Put(conn)
542         for {
543                 first, ok := <-writes
544                 if !ok {
545                         return
546                 }
547                 var buf []func()
548                 var cantFail error
549                 func() {
550                         defer sqlitex.Save(conn)(&cantFail)
551                         firstErr := runQueryWithLabels(first.query, first.labels, conn)
552                         buf = append(buf, func() { first.done <- firstErr })
553                         for {
554                                 select {
555                                 case wr, ok := <-writes:
556                                         if ok {
557                                                 err := runQueryWithLabels(wr.query, wr.labels, conn)
558                                                 buf = append(buf, func() { wr.done <- err })
559                                                 continue
560                                         }
561                                 default:
562                                 }
563                                 break
564                         }
565                 }()
566                 // Not sure what to do if this failed.
567                 if cantFail != nil {
568                         expvars.Add("batchTransactionErrors", 1)
569                 }
570                 // Signal done after we know the transaction succeeded.
571                 for _, done := range buf {
572                         done()
573                 }
574                 expvars.Add("batchTransactions", 1)
575                 expvars.Add("batchedQueries", int64(len(buf)))
576                 //log.Printf("batched %v write queries", len(buf))
577         }
578 }
579
580 func (p *provider) NewInstance(s string) (resource.Instance, error) {
581         return instance{s, p}, nil
582 }
583
584 type instance struct {
585         location string
586         p        *provider
587 }
588
589 func getLabels(skip int) pprof.LabelSet {
590         return pprof.Labels("sqlite-storage-action", func() string {
591                 var pcs [8]uintptr
592                 runtime.Callers(skip+3, pcs[:])
593                 fs := runtime.CallersFrames(pcs[:])
594                 f, _ := fs.Next()
595                 funcName := f.Func.Name()
596                 funcName = funcName[strings.LastIndexByte(funcName, '.')+1:]
597                 //log.Printf("func name: %q", funcName)
598                 return funcName
599         }())
600 }
601
602 func (p *provider) withConn(with withConn, write bool, skip int) error {
603         p.closeMu.RLock()
604         // I think we need to check this here because it may not be valid to send to the writes channel
605         // if we're already closed. So don't try to move this check into getReadWithConnRunner.
606         if p.closed {
607                 p.closeMu.RUnlock()
608                 return errors.New("closed")
609         }
610         if write && p.opts.BatchWrites {
611                 done := make(chan error)
612                 p.writes <- writeRequest{
613                         query:  with,
614                         done:   done,
615                         labels: getLabels(skip + 1),
616                 }
617                 p.closeMu.RUnlock()
618                 return <-done
619         } else {
620                 defer p.closeMu.RUnlock()
621                 runner, err := p.getReadWithConnRunner()
622                 if err != nil {
623                         return err
624                 }
625                 return runner(with)
626         }
627 }
628
629 // Obtains a DB conn and returns a withConn for executing with it. If no error is returned from this
630 // function, the runner *must* be used or the conn is leaked. You should check the provider isn't
631 // closed before using this.
632 func (p *provider) getReadWithConnRunner() (with func(withConn) error, err error) {
633         conn := p.pool.Get(context.TODO())
634         if conn == nil {
635                 err = errors.New("couldn't get pool conn")
636                 return
637         }
638         with = func(with withConn) error {
639                 defer p.pool.Put(conn)
640                 return runQueryWithLabels(with, getLabels(1), conn)
641         }
642         return
643 }
644
645 type withConn func(context.Context, conn) error
646
647 func (i instance) withConn(with withConn, write bool) error {
648         return i.p.withConn(with, write, 1)
649 }
650
651 func (i instance) getConn() *sqlite.Conn {
652         return i.p.pool.Get(context.TODO())
653 }
654
655 func (i instance) putConn(conn *sqlite.Conn) {
656         i.p.pool.Put(conn)
657 }
658
659 func (i instance) Readdirnames() (names []string, err error) {
660         prefix := i.location + "/"
661         err = i.withConn(func(_ context.Context, conn conn) error {
662                 return sqlitex.Exec(conn, "select name from blob where name like ?", func(stmt *sqlite.Stmt) error {
663                         names = append(names, stmt.ColumnText(0)[len(prefix):])
664                         return nil
665                 }, prefix+"%")
666         }, false)
667         //log.Printf("readdir %q gave %q", i.location, names)
668         return
669 }
670
671 func (i instance) getBlobRowid(conn conn) (rowid int64, err error) {
672         rows := 0
673         err = sqlitex.Exec(conn, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
674                 rowid = stmt.ColumnInt64(0)
675                 rows++
676                 return nil
677         }, i.location)
678         if err != nil {
679                 return
680         }
681         if rows == 1 {
682                 return
683         }
684         if rows == 0 {
685                 err = errors.New("blob not found")
686                 return
687         }
688         panic(rows)
689 }
690
691 type connBlob struct {
692         *sqlite.Blob
693         onClose func()
694 }
695
696 func (me connBlob) Close() error {
697         err := me.Blob.Close()
698         me.onClose()
699         return err
700 }
701
702 func (i instance) Get() (ret io.ReadCloser, err error) {
703         conn := i.getConn()
704         if conn == nil {
705                 panic("nil sqlite conn")
706         }
707         blob, err := i.openBlob(conn, false, true)
708         if err != nil {
709                 i.putConn(conn)
710                 return
711         }
712         var once sync.Once
713         return connBlob{blob, func() {
714                 once.Do(func() { i.putConn(conn) })
715         }}, nil
716 }
717
718 func (i instance) openBlob(conn conn, write, updateAccess bool) (*sqlite.Blob, error) {
719         rowid, err := i.getBlobRowid(conn)
720         if err != nil {
721                 return nil, err
722         }
723         // This seems to cause locking issues with in-memory databases. Is it something to do with not
724         // having WAL?
725         if updateAccess {
726                 err = sqlitex.Exec(conn, "update blob set last_used=datetime('now') where rowid=?", nil, rowid)
727                 if err != nil {
728                         err = fmt.Errorf("updating last_used: %w", err)
729                         return nil, err
730                 }
731                 if conn.Changes() != 1 {
732                         panic(conn.Changes())
733                 }
734         }
735         return conn.OpenBlob("main", "blob", "data", rowid, write)
736 }
737
738 func (i instance) PutSized(reader io.Reader, size int64) (err error) {
739         err = i.withConn(func(_ context.Context, conn conn) error {
740                 err := sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, zeroblob(?))",
741                         nil,
742                         i.location, size)
743                 if err != nil {
744                         return err
745                 }
746                 blob, err := i.openBlob(conn, true, false)
747                 if err != nil {
748                         return err
749                 }
750                 defer blob.Close()
751                 _, err = io.Copy(blob, reader)
752                 return err
753         }, true)
754         return
755 }
756
757 func (i instance) Put(reader io.Reader) (err error) {
758         var buf bytes.Buffer
759         _, err = io.Copy(&buf, reader)
760         if err != nil {
761                 return err
762         }
763         if false {
764                 return i.PutSized(&buf, int64(buf.Len()))
765         } else {
766                 return i.withConn(func(_ context.Context, conn conn) error {
767                         for range iter.N(10) {
768                                 err = sqlitex.Exec(conn,
769                                         "insert or replace into blob(name, data) values(?, cast(? as blob))",
770                                         nil,
771                                         i.location, buf.Bytes())
772                                 if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
773                                         log.Print("sqlite busy")
774                                         time.Sleep(time.Second)
775                                         continue
776                                 }
777                                 break
778                         }
779                         return err
780                 }, true)
781         }
782 }
783
784 type fileInfo struct {
785         size int64
786 }
787
788 func (f fileInfo) Name() string {
789         panic("implement me")
790 }
791
792 func (f fileInfo) Size() int64 {
793         return f.size
794 }
795
796 func (f fileInfo) Mode() os.FileMode {
797         panic("implement me")
798 }
799
800 func (f fileInfo) ModTime() time.Time {
801         panic("implement me")
802 }
803
804 func (f fileInfo) IsDir() bool {
805         panic("implement me")
806 }
807
808 func (f fileInfo) Sys() interface{} {
809         panic("implement me")
810 }
811
812 func (i instance) Stat() (ret os.FileInfo, err error) {
813         err = i.withConn(func(_ context.Context, conn conn) error {
814                 var blob *sqlite.Blob
815                 blob, err = i.openBlob(conn, false, false)
816                 if err != nil {
817                         return err
818                 }
819                 defer blob.Close()
820                 ret = fileInfo{blob.Size()}
821                 return nil
822         }, false)
823         return
824 }
825
826 func (i instance) ReadAt(p []byte, off int64) (n int, err error) {
827         err = i.withConn(func(_ context.Context, conn conn) error {
828                 if false {
829                         var blob *sqlite.Blob
830                         blob, err = i.openBlob(conn, false, true)
831                         if err != nil {
832                                 return err
833                         }
834                         defer blob.Close()
835                         if off >= blob.Size() {
836                                 err = io.EOF
837                                 return err
838                         }
839                         if off+int64(len(p)) > blob.Size() {
840                                 p = p[:blob.Size()-off]
841                         }
842                         n, err = blob.ReadAt(p, off)
843                 } else {
844                         gotRow := false
845                         err = sqlitex.Exec(
846                                 conn,
847                                 "select substr(data, ?, ?) from blob where name=?",
848                                 func(stmt *sqlite.Stmt) error {
849                                         if gotRow {
850                                                 panic("found multiple matching blobs")
851                                         } else {
852                                                 gotRow = true
853                                         }
854                                         n = stmt.ColumnBytes(0, p)
855                                         return nil
856                                 },
857                                 off+1, len(p), i.location,
858                         )
859                         if err != nil {
860                                 return err
861                         }
862                         if !gotRow {
863                                 err = errors.New("blob not found")
864                                 return err
865                         }
866                         if n < len(p) {
867                                 err = io.EOF
868                         }
869                 }
870                 return nil
871         }, false)
872         return
873 }
874
875 func (i instance) WriteAt(bytes []byte, i2 int64) (int, error) {
876         panic("implement me")
877 }
878
879 func (i instance) Delete() error {
880         return i.withConn(func(_ context.Context, conn conn) error {
881                 return sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)
882         }, true)
883 }