]> Sergey Matveev's repositories - btrtrc.git/blob - storage/sqlite/sqlite-storage.go
Don't set the page size by default
[btrtrc.git] / storage / sqlite / sqlite-storage.go
1 package sqliteStorage
2
3 import (
4         "bytes"
5         "context"
6         "errors"
7         "expvar"
8         "fmt"
9         "io"
10         "log"
11         "net/url"
12         "os"
13         "runtime"
14         "runtime/pprof"
15         "strings"
16         "sync"
17         "time"
18
19         "crawshaw.io/sqlite"
20         "crawshaw.io/sqlite/sqlitex"
21         "github.com/anacrolix/missinggo/iter"
22         "github.com/anacrolix/missinggo/v2/resource"
23         "github.com/anacrolix/torrent/storage"
24 )
25
26 type conn = *sqlite.Conn
27
28 type InitConnOpts struct {
29         SetJournalMode string
30         MmapSizeOk     bool  // If false, a package-specific default will be used.
31         MmapSize       int64 // If MmapSizeOk is set, use sqlite default if < 0, otherwise this value.
32 }
33
34 func (me InitConnOpts) JournalMode() string {
35         if me.SetJournalMode != "" {
36                 return me.SetJournalMode
37         }
38         return "wal"
39 }
40
41 var UnexpectedJournalMode = errors.New("unexpected journal mode")
42
43 func initConn(conn conn, opts InitConnOpts) error {
44         // Recursive triggers are required because we need to trim the blob_meta size after trimming to
45         // capacity. Hopefully we don't hit the recursion limit, and if we do, there's an error thrown.
46         err := sqlitex.ExecTransient(conn, "pragma recursive_triggers=on", nil)
47         if err != nil {
48                 return err
49         }
50         err = sqlitex.ExecTransient(conn, `pragma synchronous=off`, nil)
51         if err != nil {
52                 return err
53         }
54         if opts.SetJournalMode != "" {
55                 err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma journal_mode=%s`, opts.SetJournalMode), func(stmt *sqlite.Stmt) error {
56                         ret := stmt.ColumnText(0)
57                         if ret != opts.SetJournalMode {
58                                 return UnexpectedJournalMode
59                         }
60                         return nil
61                 })
62                 if err != nil {
63                         return err
64                 }
65         }
66         if !opts.MmapSizeOk {
67                 // Set the default. Currently it seems the library picks reasonable defaults, especially for
68                 // wal.
69                 opts.MmapSize = -1
70                 //opts.MmapSize = 1 << 24 // 8 MiB
71         }
72         if opts.MmapSize >= 0 {
73                 err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma mmap_size=%d`, opts.MmapSize), nil)
74                 if err != nil {
75                         return err
76                 }
77         }
78         return nil
79 }
80
81 func InitSchema(conn conn, pageSize int, triggers bool) error {
82         if pageSize != 0 {
83                 err := sqlitex.ExecScript(conn, fmt.Sprintf("pragma page_size=%d", pageSize))
84                 if err != nil {
85                         return err
86                 }
87         }
88         err := sqlitex.ExecScript(conn, `
89                 -- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
90                 -- can trim the database file size with partial vacuums without having to do a full vacuum, which 
91                 -- locks everything.
92                 pragma auto_vacuum=incremental;
93                 
94                 create table if not exists blob (
95                         name text,
96                         last_used timestamp default (datetime('now')),
97                         data blob,
98                         verified bool,
99                         primary key (name)
100                 );
101                 
102                 create table if not exists blob_meta (
103                         key text primary key,
104                         value
105                 );
106
107                 create index if not exists blob_last_used on blob(last_used);
108                 
109                 -- While sqlite *seems* to be faster to get sum(length(data)) instead of 
110                 -- sum(length(data)), it may still require a large table scan at start-up or with a 
111                 -- cold-cache. With this we can be assured that it doesn't.
112                 insert or ignore into blob_meta values ('size', 0);
113                 
114                 create table if not exists setting (
115                         name primary key on conflict replace,
116                         value
117                 );
118         
119                 create view if not exists deletable_blob as
120                 with recursive excess (
121                         usage_with,
122                         last_used,
123                         blob_rowid,
124                         data_length
125                 ) as (
126                         select * 
127                         from (
128                                 select 
129                                         (select value from blob_meta where key='size') as usage_with,
130                                         last_used,
131                                         rowid,
132                                         length(data)
133                                 from blob order by last_used, rowid limit 1
134                         )
135                         where usage_with > (select value from setting where name='capacity')
136                         union all
137                         select 
138                                 usage_with-data_length as new_usage_with,
139                                 blob.last_used,
140                                 blob.rowid,
141                                 length(data)
142                         from excess join blob
143                         on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
144                         where new_usage_with > (select value from setting where name='capacity')
145                 )
146                 select * from excess;
147         `)
148         if err != nil {
149                 return err
150         }
151         if triggers {
152                 err := sqlitex.ExecScript(conn, `
153                         create trigger if not exists after_insert_blob
154                         after insert on blob
155                         begin
156                                 update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
157                                 delete from blob where rowid in (select blob_rowid from deletable_blob);
158                         end;
159                         
160                         create trigger if not exists after_update_blob
161                         after update of data on blob
162                         begin
163                                 update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
164                                 delete from blob where rowid in (select blob_rowid from deletable_blob);
165                         end;
166                         
167                         create trigger if not exists after_delete_blob
168                         after delete on blob
169                         begin
170                                 update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
171                         end;
172                 `)
173                 if err != nil {
174                         return err
175                 }
176         }
177         return nil
178 }
179
180 type NewPiecesStorageOpts struct {
181         NewPoolOpts
182         InitDbOpts
183         ProvOpts    func(*ProviderOpts)
184         StorageOpts func(*storage.ResourcePiecesOpts)
185 }
186
187 // A convenience function that creates a connection pool, resource provider, and a pieces storage
188 // ClientImpl and returns them all with a Close attached.
189 func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, err error) {
190         conns, err := NewPool(opts.NewPoolOpts)
191         if err != nil {
192                 return
193         }
194         err = initPoolDatabase(conns, opts.InitDbOpts)
195         if err != nil {
196                 return
197         }
198         provOpts := ProviderOpts{
199                 BatchWrites: conns.NumConns() > 1,
200         }
201         if f := opts.ProvOpts; f != nil {
202                 f(&provOpts)
203         }
204         prov, err := NewProvider(conns, provOpts)
205         if err != nil {
206                 conns.Close()
207                 return
208         }
209         var (
210                 journalMode string
211         )
212         withPoolConn(conns, func(c conn) {
213                 err = sqlitex.Exec(c, "pragma journal_mode", func(stmt *sqlite.Stmt) error {
214                         journalMode = stmt.ColumnText(0)
215                         return nil
216                 })
217         })
218         if err != nil {
219                 err = fmt.Errorf("getting journal mode: %w", err)
220                 prov.Close()
221                 return
222         }
223         if journalMode == "" {
224                 err = errors.New("didn't get journal mode")
225                 prov.Close()
226                 return
227         }
228         storageOpts := storage.ResourcePiecesOpts{
229                 NoSizedPuts: journalMode != "wal" || conns.NumConns() == 1,
230         }
231         if f := opts.StorageOpts; f != nil {
232                 f(&storageOpts)
233         }
234         store := storage.NewResourcePiecesOpts(prov, storageOpts)
235         return struct {
236                 storage.ClientImpl
237                 io.Closer
238         }{
239                 store,
240                 prov,
241         }, nil
242 }
243
244 type NewPoolOpts struct {
245         NewConnOpts
246         InitConnOpts
247         NumConns int
248 }
249
250 type InitDbOpts struct {
251         DontInitSchema bool
252         PageSize       int
253         // If non-zero, overrides the existing setting.
254         Capacity   int64
255         NoTriggers bool
256 }
257
258 // There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
259 // the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the
260 // top-level option type.
261 type PoolConf struct {
262         NumConns    int
263         JournalMode string
264 }
265
266 // Remove any capacity limits.
267 func UnlimitCapacity(conn conn) error {
268         return sqlitex.Exec(conn, "delete from setting where key='capacity'", nil)
269 }
270
271 // Set the capacity limit to exactly this value.
272 func SetCapacity(conn conn, cap int64) error {
273         return sqlitex.Exec(conn, "insert into setting values ('capacity', ?)", nil, cap)
274 }
275
276 type NewConnOpts struct {
277         // See https://www.sqlite.org/c3ref/open.html. NB: "If the filename is an empty string, then a
278         // private, temporary on-disk database will be created. This private database will be
279         // automatically deleted as soon as the database connection is closed."
280         Path   string
281         Memory bool
282         // Whether multiple blobs will not be read simultaneously. Enables journal mode other than WAL,
283         // and NumConns < 2.
284         NoConcurrentBlobReads bool
285 }
286
287 func newOpenUri(opts NewConnOpts) string {
288         path := url.PathEscape(opts.Path)
289         if opts.Memory {
290                 path = ":memory:"
291         }
292         values := make(url.Values)
293         if opts.NoConcurrentBlobReads || opts.Memory {
294                 values.Add("cache", "shared")
295         }
296         return fmt.Sprintf("file:%s?%s", path, values.Encode())
297 }
298
299 func initDatabase(conn conn, opts InitDbOpts) (err error) {
300         if !opts.DontInitSchema {
301                 if opts.PageSize == 0 {
302                         // There doesn't seem to be an optimal size. I did try with the standard chunk size, but
303                         // the difference is not convincing.
304
305                         //opts.PageSize = 1 << 14
306                 }
307                 err = InitSchema(conn, opts.PageSize, !opts.NoTriggers)
308                 if err != nil {
309                         return
310                 }
311         }
312         if opts.Capacity != 0 {
313                 err = SetCapacity(conn, opts.Capacity)
314                 if err != nil {
315                         return
316                 }
317         }
318         return
319 }
320
321 func initPoolDatabase(pool ConnPool, opts InitDbOpts) (err error) {
322         withPoolConn(pool, func(c conn) {
323                 err = initDatabase(c, opts)
324         })
325         return
326 }
327
328 func newConn(opts NewConnOpts) (conn, error) {
329         return sqlite.OpenConn(newOpenUri(opts), 0)
330 }
331
332 type poolWithNumConns struct {
333         *sqlitex.Pool
334         numConns int
335 }
336
337 func (me poolWithNumConns) NumConns() int {
338         return me.numConns
339 }
340
341 func NewPool(opts NewPoolOpts) (_ ConnPool, err error) {
342         if opts.NumConns == 0 {
343                 opts.NumConns = runtime.NumCPU()
344         }
345         conns, err := func() (ConnPool, error) {
346                 switch opts.NumConns {
347                 case 1:
348                         conn, err := newConn(opts.NewConnOpts)
349                         return &poolFromConn{conn: conn}, err
350                 default:
351                         _pool, err := sqlitex.Open(newOpenUri(opts.NewConnOpts), 0, opts.NumConns)
352                         return poolWithNumConns{_pool, opts.NumConns}, err
353                 }
354         }()
355         if err != nil {
356                 return
357         }
358         defer func() {
359                 if err != nil {
360                         conns.Close()
361                 }
362         }()
363         return conns, initPoolConns(nil, conns, InitPoolOpts{
364                 NumConns:     opts.NumConns,
365                 InitConnOpts: opts.InitConnOpts,
366         })
367 }
368
369 // Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool.
370 type poolFromConn struct {
371         mu   sync.Mutex
372         conn conn
373 }
374
375 func (me *poolFromConn) Get(ctx context.Context) conn {
376         me.mu.Lock()
377         return me.conn
378 }
379
380 func (me *poolFromConn) Put(conn conn) {
381         if conn != me.conn {
382                 panic("expected to same conn")
383         }
384         me.mu.Unlock()
385 }
386
387 func (me *poolFromConn) Close() error {
388         return me.conn.Close()
389 }
390
391 func (poolFromConn) NumConns() int { return 1 }
392
393 type ProviderOpts struct {
394         BatchWrites bool
395 }
396
397 // Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of
398 // the ConnPool (since it has to initialize all the connections anyway).
399 func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
400         prov := &provider{pool: pool, opts: opts}
401         if opts.BatchWrites {
402                 writes := make(chan writeRequest)
403                 prov.writes = writes
404                 // This is retained for backwards compatibility. It may not be necessary.
405                 runtime.SetFinalizer(prov, func(p *provider) {
406                         p.Close()
407                 })
408                 go providerWriter(writes, prov.pool)
409         }
410         return prov, nil
411 }
412
413 type InitPoolOpts struct {
414         NumConns int
415         InitConnOpts
416 }
417
418 func initPoolConns(ctx context.Context, pool ConnPool, opts InitPoolOpts) (err error) {
419         var conns []conn
420         defer func() {
421                 for _, c := range conns {
422                         pool.Put(c)
423                 }
424         }()
425         for range iter.N(opts.NumConns) {
426                 conn := pool.Get(ctx)
427                 if conn == nil {
428                         break
429                 }
430                 conns = append(conns, conn)
431                 err = initConn(conn, opts.InitConnOpts)
432                 if err != nil {
433                         err = fmt.Errorf("initing conn %v: %w", len(conns), err)
434                         return
435                 }
436         }
437         return
438 }
439
440 type ConnPool interface {
441         Get(context.Context) conn
442         Put(conn)
443         Close() error
444         NumConns() int
445 }
446
447 func withPoolConn(pool ConnPool, with func(conn)) {
448         c := pool.Get(nil)
449         defer pool.Put(c)
450         with(c)
451 }
452
453 type provider struct {
454         pool     ConnPool
455         writes   chan<- writeRequest
456         opts     ProviderOpts
457         closeMu  sync.RWMutex
458         closed   bool
459         closeErr error
460 }
461
462 var _ storage.ConsecutiveChunkReader = (*provider)(nil)
463
464 func (p *provider) ReadConsecutiveChunks(prefix string) (io.ReadCloser, error) {
465         p.closeMu.RLock()
466         runner, err := p.getReadWithConnRunner()
467         if err != nil {
468                 p.closeMu.RUnlock()
469                 return nil, err
470         }
471         r, w := io.Pipe()
472         go func() {
473                 defer p.closeMu.RUnlock()
474                 err = runner(func(_ context.Context, conn conn) error {
475                         var written int64
476                         err = sqlitex.Exec(conn, `
477                                 select
478                                         data,
479                                         cast(substr(name, ?+1) as integer) as offset
480                                 from blob
481                                 where name like ?||'%'
482                                 order by offset`,
483                                 func(stmt *sqlite.Stmt) error {
484                                         offset := stmt.ColumnInt64(1)
485                                         if offset != written {
486                                                 return fmt.Errorf("got chunk at offset %v, expected offset %v", offset, written)
487                                         }
488                                         // TODO: Avoid intermediate buffers here
489                                         r := stmt.ColumnReader(0)
490                                         w1, err := io.Copy(w, r)
491                                         written += w1
492                                         return err
493                                 },
494                                 len(prefix),
495                                 prefix,
496                         )
497                         return err
498                 })
499                 w.CloseWithError(err)
500         }()
501         return r, nil
502 }
503
504 func (me *provider) Close() error {
505         me.closeMu.Lock()
506         defer me.closeMu.Unlock()
507         if me.closed {
508                 return me.closeErr
509         }
510         if me.writes != nil {
511                 close(me.writes)
512         }
513         me.closeErr = me.pool.Close()
514         me.closed = true
515         return me.closeErr
516 }
517
518 type writeRequest struct {
519         query  withConn
520         done   chan<- error
521         labels pprof.LabelSet
522 }
523
524 var expvars = expvar.NewMap("sqliteStorage")
525
526 func runQueryWithLabels(query withConn, labels pprof.LabelSet, conn conn) (err error) {
527         pprof.Do(context.Background(), labels, func(ctx context.Context) {
528                 // We pass in the context in the hope that the CPU profiler might incorporate sqlite
529                 // activity the action that triggered it. It doesn't seem that way, since those calls don't
530                 // take a context.Context themselves. It may come in useful in the goroutine profiles
531                 // though, and doesn't hurt to expose it here for other purposes should things change.
532                 err = query(ctx, conn)
533         })
534         return
535 }
536
537 // Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
538 // stronger typing on the writes channel.
539 func providerWriter(writes <-chan writeRequest, pool ConnPool) {
540         conn := pool.Get(context.TODO())
541         if conn == nil {
542                 return
543         }
544         defer pool.Put(conn)
545         for {
546                 first, ok := <-writes
547                 if !ok {
548                         return
549                 }
550                 var buf []func()
551                 var cantFail error
552                 func() {
553                         defer sqlitex.Save(conn)(&cantFail)
554                         firstErr := runQueryWithLabels(first.query, first.labels, conn)
555                         buf = append(buf, func() { first.done <- firstErr })
556                         for {
557                                 select {
558                                 case wr, ok := <-writes:
559                                         if ok {
560                                                 err := runQueryWithLabels(wr.query, wr.labels, conn)
561                                                 buf = append(buf, func() { wr.done <- err })
562                                                 continue
563                                         }
564                                 default:
565                                 }
566                                 break
567                         }
568                 }()
569                 // Not sure what to do if this failed.
570                 if cantFail != nil {
571                         expvars.Add("batchTransactionErrors", 1)
572                 }
573                 // Signal done after we know the transaction succeeded.
574                 for _, done := range buf {
575                         done()
576                 }
577                 expvars.Add("batchTransactions", 1)
578                 expvars.Add("batchedQueries", int64(len(buf)))
579                 //log.Printf("batched %v write queries", len(buf))
580         }
581 }
582
583 func (p *provider) NewInstance(s string) (resource.Instance, error) {
584         return instance{s, p}, nil
585 }
586
587 type instance struct {
588         location string
589         p        *provider
590 }
591
592 func getLabels(skip int) pprof.LabelSet {
593         return pprof.Labels("sqlite-storage-action", func() string {
594                 var pcs [8]uintptr
595                 runtime.Callers(skip+3, pcs[:])
596                 fs := runtime.CallersFrames(pcs[:])
597                 f, _ := fs.Next()
598                 funcName := f.Func.Name()
599                 funcName = funcName[strings.LastIndexByte(funcName, '.')+1:]
600                 //log.Printf("func name: %q", funcName)
601                 return funcName
602         }())
603 }
604
605 func (p *provider) withConn(with withConn, write bool, skip int) error {
606         p.closeMu.RLock()
607         // I think we need to check this here because it may not be valid to send to the writes channel
608         // if we're already closed. So don't try to move this check into getReadWithConnRunner.
609         if p.closed {
610                 p.closeMu.RUnlock()
611                 return errors.New("closed")
612         }
613         if write && p.opts.BatchWrites {
614                 done := make(chan error)
615                 p.writes <- writeRequest{
616                         query:  with,
617                         done:   done,
618                         labels: getLabels(skip + 1),
619                 }
620                 p.closeMu.RUnlock()
621                 return <-done
622         } else {
623                 defer p.closeMu.RUnlock()
624                 runner, err := p.getReadWithConnRunner()
625                 if err != nil {
626                         return err
627                 }
628                 return runner(with)
629         }
630 }
631
632 // Obtains a DB conn and returns a withConn for executing with it. If no error is returned from this
633 // function, the runner *must* be used or the conn is leaked. You should check the provider isn't
634 // closed before using this.
635 func (p *provider) getReadWithConnRunner() (with func(withConn) error, err error) {
636         conn := p.pool.Get(context.TODO())
637         if conn == nil {
638                 err = errors.New("couldn't get pool conn")
639                 return
640         }
641         with = func(with withConn) error {
642                 defer p.pool.Put(conn)
643                 return runQueryWithLabels(with, getLabels(1), conn)
644         }
645         return
646 }
647
648 type withConn func(context.Context, conn) error
649
650 func (i instance) withConn(with withConn, write bool) error {
651         return i.p.withConn(with, write, 1)
652 }
653
654 func (i instance) getConn() *sqlite.Conn {
655         return i.p.pool.Get(context.TODO())
656 }
657
658 func (i instance) putConn(conn *sqlite.Conn) {
659         i.p.pool.Put(conn)
660 }
661
662 func (i instance) Readdirnames() (names []string, err error) {
663         prefix := i.location + "/"
664         err = i.withConn(func(_ context.Context, conn conn) error {
665                 return sqlitex.Exec(conn, "select name from blob where name like ?", func(stmt *sqlite.Stmt) error {
666                         names = append(names, stmt.ColumnText(0)[len(prefix):])
667                         return nil
668                 }, prefix+"%")
669         }, false)
670         //log.Printf("readdir %q gave %q", i.location, names)
671         return
672 }
673
674 func (i instance) getBlobRowid(conn conn) (rowid int64, err error) {
675         rows := 0
676         err = sqlitex.Exec(conn, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
677                 rowid = stmt.ColumnInt64(0)
678                 rows++
679                 return nil
680         }, i.location)
681         if err != nil {
682                 return
683         }
684         if rows == 1 {
685                 return
686         }
687         if rows == 0 {
688                 err = errors.New("blob not found")
689                 return
690         }
691         panic(rows)
692 }
693
694 type connBlob struct {
695         *sqlite.Blob
696         onClose func()
697 }
698
699 func (me connBlob) Close() error {
700         err := me.Blob.Close()
701         me.onClose()
702         return err
703 }
704
705 func (i instance) Get() (ret io.ReadCloser, err error) {
706         conn := i.getConn()
707         if conn == nil {
708                 panic("nil sqlite conn")
709         }
710         blob, err := i.openBlob(conn, false, true)
711         if err != nil {
712                 i.putConn(conn)
713                 return
714         }
715         var once sync.Once
716         return connBlob{blob, func() {
717                 once.Do(func() { i.putConn(conn) })
718         }}, nil
719 }
720
721 func (i instance) openBlob(conn conn, write, updateAccess bool) (*sqlite.Blob, error) {
722         rowid, err := i.getBlobRowid(conn)
723         if err != nil {
724                 return nil, err
725         }
726         // This seems to cause locking issues with in-memory databases. Is it something to do with not
727         // having WAL?
728         if updateAccess {
729                 err = sqlitex.Exec(conn, "update blob set last_used=datetime('now') where rowid=?", nil, rowid)
730                 if err != nil {
731                         err = fmt.Errorf("updating last_used: %w", err)
732                         return nil, err
733                 }
734                 if conn.Changes() != 1 {
735                         panic(conn.Changes())
736                 }
737         }
738         return conn.OpenBlob("main", "blob", "data", rowid, write)
739 }
740
741 func (i instance) PutSized(reader io.Reader, size int64) (err error) {
742         err = i.withConn(func(_ context.Context, conn conn) error {
743                 err := sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, zeroblob(?))",
744                         nil,
745                         i.location, size)
746                 if err != nil {
747                         return err
748                 }
749                 blob, err := i.openBlob(conn, true, false)
750                 if err != nil {
751                         return err
752                 }
753                 defer blob.Close()
754                 _, err = io.Copy(blob, reader)
755                 return err
756         }, true)
757         return
758 }
759
760 func (i instance) Put(reader io.Reader) (err error) {
761         var buf bytes.Buffer
762         _, err = io.Copy(&buf, reader)
763         if err != nil {
764                 return err
765         }
766         if false {
767                 return i.PutSized(&buf, int64(buf.Len()))
768         } else {
769                 return i.withConn(func(_ context.Context, conn conn) error {
770                         for range iter.N(10) {
771                                 err = sqlitex.Exec(conn,
772                                         "insert or replace into blob(name, data) values(?, cast(? as blob))",
773                                         nil,
774                                         i.location, buf.Bytes())
775                                 if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
776                                         log.Print("sqlite busy")
777                                         time.Sleep(time.Second)
778                                         continue
779                                 }
780                                 break
781                         }
782                         return err
783                 }, true)
784         }
785 }
786
787 type fileInfo struct {
788         size int64
789 }
790
791 func (f fileInfo) Name() string {
792         panic("implement me")
793 }
794
795 func (f fileInfo) Size() int64 {
796         return f.size
797 }
798
799 func (f fileInfo) Mode() os.FileMode {
800         panic("implement me")
801 }
802
803 func (f fileInfo) ModTime() time.Time {
804         panic("implement me")
805 }
806
807 func (f fileInfo) IsDir() bool {
808         panic("implement me")
809 }
810
811 func (f fileInfo) Sys() interface{} {
812         panic("implement me")
813 }
814
815 func (i instance) Stat() (ret os.FileInfo, err error) {
816         err = i.withConn(func(_ context.Context, conn conn) error {
817                 var blob *sqlite.Blob
818                 blob, err = i.openBlob(conn, false, false)
819                 if err != nil {
820                         return err
821                 }
822                 defer blob.Close()
823                 ret = fileInfo{blob.Size()}
824                 return nil
825         }, false)
826         return
827 }
828
829 func (i instance) ReadAt(p []byte, off int64) (n int, err error) {
830         err = i.withConn(func(_ context.Context, conn conn) error {
831                 if false {
832                         var blob *sqlite.Blob
833                         blob, err = i.openBlob(conn, false, true)
834                         if err != nil {
835                                 return err
836                         }
837                         defer blob.Close()
838                         if off >= blob.Size() {
839                                 err = io.EOF
840                                 return err
841                         }
842                         if off+int64(len(p)) > blob.Size() {
843                                 p = p[:blob.Size()-off]
844                         }
845                         n, err = blob.ReadAt(p, off)
846                 } else {
847                         gotRow := false
848                         err = sqlitex.Exec(
849                                 conn,
850                                 "select substr(data, ?, ?) from blob where name=?",
851                                 func(stmt *sqlite.Stmt) error {
852                                         if gotRow {
853                                                 panic("found multiple matching blobs")
854                                         } else {
855                                                 gotRow = true
856                                         }
857                                         n = stmt.ColumnBytes(0, p)
858                                         return nil
859                                 },
860                                 off+1, len(p), i.location,
861                         )
862                         if err != nil {
863                                 return err
864                         }
865                         if !gotRow {
866                                 err = errors.New("blob not found")
867                                 return err
868                         }
869                         if n < len(p) {
870                                 err = io.EOF
871                         }
872                 }
873                 return nil
874         }, false)
875         return
876 }
877
878 func (i instance) WriteAt(bytes []byte, i2 int64) (int, error) {
879         panic("implement me")
880 }
881
882 func (i instance) Delete() error {
883         return i.withConn(func(_ context.Context, conn conn) error {
884                 return sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)
885         }, true)
886 }