]> Sergey Matveev's repositories - btrtrc.git/blob - storage/sqlite/sqlite-storage.go
38f734fe5096f4d54a04d475620279dbf004800c
[btrtrc.git] / storage / sqlite / sqlite-storage.go
1 //go:build cgo
2 // +build cgo
3
4 package sqliteStorage
5
6 import (
7         "bytes"
8         "context"
9         "errors"
10         "expvar"
11         "fmt"
12         "io"
13         "log"
14         "net/url"
15         "os"
16         "runtime"
17         "runtime/pprof"
18         "strings"
19         "sync"
20         "time"
21
22         "crawshaw.io/sqlite"
23         "crawshaw.io/sqlite/sqlitex"
24         "github.com/anacrolix/missinggo/iter"
25         "github.com/anacrolix/missinggo/v2/resource"
26
27         "github.com/anacrolix/torrent/storage"
28 )
29
30 type conn = *sqlite.Conn
31
32 type InitConnOpts struct {
33         SetSynchronous int
34         SetJournalMode string
35         MmapSizeOk     bool  // If false, a package-specific default will be used.
36         MmapSize       int64 // If MmapSizeOk is set, use sqlite default if < 0, otherwise this value.
37 }
38
39 type UnexpectedJournalMode struct {
40         JournalMode string
41 }
42
43 func (me UnexpectedJournalMode) Error() string {
44         return fmt.Sprintf("unexpected journal mode: %q", me.JournalMode)
45 }
46
47 func setSynchronous(conn conn, syncInt int) (err error) {
48         err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma synchronous=%v`, syncInt), nil)
49         if err != nil {
50                 return err
51         }
52         var (
53                 actual   int
54                 actualOk bool
55         )
56         err = sqlitex.ExecTransient(conn, `pragma synchronous`, func(stmt *sqlite.Stmt) error {
57                 actual = stmt.ColumnInt(0)
58                 actualOk = true
59                 return nil
60         })
61         if err != nil {
62                 return
63         }
64         if !actualOk {
65                 return errors.New("synchronous setting query didn't return anything")
66         }
67         if actual != syncInt {
68                 return fmt.Errorf("set synchronous %q, got %q", syncInt, actual)
69         }
70         return nil
71 }
72
73 func initConn(conn conn, opts InitConnOpts) (err error) {
74         err = setSynchronous(conn, opts.SetSynchronous)
75         if err != nil {
76                 return
77         }
78         // Recursive triggers are required because we need to trim the blob_meta size after trimming to
79         // capacity. Hopefully we don't hit the recursion limit, and if we do, there's an error thrown.
80         err = sqlitex.ExecTransient(conn, "pragma recursive_triggers=on", nil)
81         if err != nil {
82                 return err
83         }
84         if opts.SetJournalMode != "" {
85                 err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma journal_mode=%s`, opts.SetJournalMode), func(stmt *sqlite.Stmt) error {
86                         ret := stmt.ColumnText(0)
87                         if ret != opts.SetJournalMode {
88                                 return UnexpectedJournalMode{ret}
89                         }
90                         return nil
91                 })
92                 if err != nil {
93                         return err
94                 }
95         }
96         if !opts.MmapSizeOk {
97                 // Set the default. Currently it seems the library picks reasonable defaults, especially for
98                 // wal.
99                 opts.MmapSize = -1
100                 //opts.MmapSize = 1 << 24 // 8 MiB
101         }
102         if opts.MmapSize >= 0 {
103                 err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma mmap_size=%d`, opts.MmapSize), nil)
104                 if err != nil {
105                         return err
106                 }
107         }
108         return nil
109 }
110
111 func setPageSize(conn conn, pageSize int) error {
112         if pageSize == 0 {
113                 return nil
114         }
115         var retSize int64
116         err := sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma page_size=%d`, pageSize), nil)
117         if err != nil {
118                 return err
119         }
120         err = sqlitex.ExecTransient(conn, "pragma page_size", func(stmt *sqlite.Stmt) error {
121                 retSize = stmt.ColumnInt64(0)
122                 return nil
123         })
124         if err != nil {
125                 return err
126         }
127         if retSize != int64(pageSize) {
128                 return fmt.Errorf("requested page size %v but got %v", pageSize, retSize)
129         }
130         return nil
131 }
132
133 func InitSchema(conn conn, pageSize int, triggers bool) error {
134         err := setPageSize(conn, pageSize)
135         if err != nil {
136                 return fmt.Errorf("setting page size: %w", err)
137         }
138         err = sqlitex.ExecScript(conn, `
139                 -- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
140                 -- can trim the database file size with partial vacuums without having to do a full vacuum, which 
141                 -- locks everything.
142                 pragma auto_vacuum=incremental;
143                 
144                 create table if not exists blob (
145                         name text,
146                         last_used timestamp default (datetime('now')),
147                         data blob,
148                         verified bool,
149                         primary key (name)
150                 );
151                 
152                 create table if not exists blob_meta (
153                         key text primary key,
154                         value
155                 );
156
157                 create index if not exists blob_last_used on blob(last_used);
158                 
159                 -- While sqlite *seems* to be faster to get sum(length(data)) instead of 
160                 -- sum(length(data)), it may still require a large table scan at start-up or with a 
161                 -- cold-cache. With this we can be assured that it doesn't.
162                 insert or ignore into blob_meta values ('size', 0);
163                 
164                 create table if not exists setting (
165                         name primary key on conflict replace,
166                         value
167                 );
168         
169                 create view if not exists deletable_blob as
170                 with recursive excess (
171                         usage_with,
172                         last_used,
173                         blob_rowid,
174                         data_length
175                 ) as (
176                         select * 
177                         from (
178                                 select 
179                                         (select value from blob_meta where key='size') as usage_with,
180                                         last_used,
181                                         rowid,
182                                         length(data)
183                                 from blob order by last_used, rowid limit 1
184                         )
185                         where usage_with > (select value from setting where name='capacity')
186                         union all
187                         select 
188                                 usage_with-data_length as new_usage_with,
189                                 blob.last_used,
190                                 blob.rowid,
191                                 length(data)
192                         from excess join blob
193                         on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
194                         where new_usage_with > (select value from setting where name='capacity')
195                 )
196                 select * from excess;
197         `)
198         if err != nil {
199                 return err
200         }
201         if triggers {
202                 err := sqlitex.ExecScript(conn, `
203                         create trigger if not exists after_insert_blob
204                         after insert on blob
205                         begin
206                                 update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
207                                 delete from blob where rowid in (select blob_rowid from deletable_blob);
208                         end;
209                         
210                         create trigger if not exists after_update_blob
211                         after update of data on blob
212                         begin
213                                 update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
214                                 delete from blob where rowid in (select blob_rowid from deletable_blob);
215                         end;
216                         
217                         create trigger if not exists after_delete_blob
218                         after delete on blob
219                         begin
220                                 update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
221                         end;
222                 `)
223                 if err != nil {
224                         return err
225                 }
226         }
227         return nil
228 }
229
230 type NewPiecesStorageOpts struct {
231         NewPoolOpts
232         InitDbOpts
233         ProvOpts    func(*ProviderOpts)
234         StorageOpts func(*storage.ResourcePiecesOpts)
235 }
236
237 // A convenience function that creates a connection pool, resource provider, and a pieces storage
238 // ClientImpl and returns them all with a Close attached.
239 func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, err error) {
240         conns, err := NewPool(opts.NewPoolOpts)
241         if err != nil {
242                 return
243         }
244         if opts.PageSize == 0 {
245                 opts.PageSize = 1 << 14
246         }
247         err = initPoolDatabase(conns, opts.InitDbOpts)
248         if err != nil {
249                 conns.Close()
250                 return
251         }
252         if opts.SetJournalMode == "" && !opts.Memory {
253                 opts.SetJournalMode = "wal"
254         }
255         err = initPoolConns(nil, conns, opts.InitConnOpts)
256         if err != nil {
257                 conns.Close()
258                 return
259         }
260         provOpts := ProviderOpts{
261                 BatchWrites: conns.NumConns() > 1,
262         }
263         if f := opts.ProvOpts; f != nil {
264                 f(&provOpts)
265         }
266         prov, err := NewProvider(conns, provOpts)
267         if err != nil {
268                 conns.Close()
269                 return
270         }
271         var (
272                 journalMode string
273         )
274         withPoolConn(conns, func(c conn) {
275                 err = sqlitex.Exec(c, "pragma journal_mode", func(stmt *sqlite.Stmt) error {
276                         journalMode = stmt.ColumnText(0)
277                         return nil
278                 })
279         })
280         if err != nil {
281                 err = fmt.Errorf("getting journal mode: %w", err)
282                 prov.Close()
283                 return
284         }
285         if journalMode == "" {
286                 err = errors.New("didn't get journal mode")
287                 prov.Close()
288                 return
289         }
290         storageOpts := storage.ResourcePiecesOpts{
291                 NoSizedPuts: journalMode != "wal" || conns.NumConns() == 1,
292         }
293         if f := opts.StorageOpts; f != nil {
294                 f(&storageOpts)
295         }
296         store := storage.NewResourcePiecesOpts(prov, storageOpts)
297         return struct {
298                 storage.ClientImpl
299                 io.Closer
300         }{
301                 store,
302                 prov,
303         }, nil
304 }
305
306 type NewPoolOpts struct {
307         NewConnOpts
308         InitConnOpts
309         NumConns int
310 }
311
312 type InitDbOpts struct {
313         DontInitSchema bool
314         PageSize       int
315         // If non-zero, overrides the existing setting.
316         Capacity   int64
317         NoTriggers bool
318 }
319
320 // There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
321 // the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the
322 // top-level option type.
323 type PoolConf struct {
324         NumConns    int
325         JournalMode string
326 }
327
328 // Remove any capacity limits.
329 func UnlimitCapacity(conn conn) error {
330         return sqlitex.Exec(conn, "delete from setting where key='capacity'", nil)
331 }
332
333 // Set the capacity limit to exactly this value.
334 func SetCapacity(conn conn, cap int64) error {
335         return sqlitex.Exec(conn, "insert into setting values ('capacity', ?)", nil, cap)
336 }
337
338 type NewConnOpts struct {
339         // See https://www.sqlite.org/c3ref/open.html. NB: "If the filename is an empty string, then a
340         // private, temporary on-disk database will be created. This private database will be
341         // automatically deleted as soon as the database connection is closed."
342         Path   string
343         Memory bool
344         // Whether multiple blobs will not be read simultaneously. Enables journal mode other than WAL,
345         // and NumConns < 2.
346         NoConcurrentBlobReads bool
347 }
348
349 func newOpenUri(opts NewConnOpts) string {
350         path := url.PathEscape(opts.Path)
351         if opts.Memory {
352                 path = ":memory:"
353         }
354         values := make(url.Values)
355         if opts.NoConcurrentBlobReads || opts.Memory {
356                 values.Add("cache", "shared")
357         }
358         return fmt.Sprintf("file:%s?%s", path, values.Encode())
359 }
360
361 func initDatabase(conn conn, opts InitDbOpts) (err error) {
362         if !opts.DontInitSchema {
363                 err = InitSchema(conn, opts.PageSize, !opts.NoTriggers)
364                 if err != nil {
365                         return
366                 }
367         }
368         if opts.Capacity != 0 {
369                 err = SetCapacity(conn, opts.Capacity)
370                 if err != nil {
371                         return
372                 }
373         }
374         return
375 }
376
377 func initPoolDatabase(pool ConnPool, opts InitDbOpts) (err error) {
378         withPoolConn(pool, func(c conn) {
379                 err = initDatabase(c, opts)
380         })
381         return
382 }
383
384 // Go fmt, why you so shit?
385 const openConnFlags = 0 |
386         sqlite.SQLITE_OPEN_READWRITE |
387         sqlite.SQLITE_OPEN_CREATE |
388         sqlite.SQLITE_OPEN_URI |
389         sqlite.SQLITE_OPEN_NOMUTEX
390
391 func newConn(opts NewConnOpts) (conn, error) {
392         return sqlite.OpenConn(newOpenUri(opts), openConnFlags)
393 }
394
395 type poolWithNumConns struct {
396         *sqlitex.Pool
397         numConns int
398 }
399
400 func (me poolWithNumConns) NumConns() int {
401         return me.numConns
402 }
403
404 func NewPool(opts NewPoolOpts) (_ ConnPool, err error) {
405         if opts.NumConns == 0 {
406                 opts.NumConns = runtime.NumCPU()
407         }
408         switch opts.NumConns {
409         case 1:
410                 conn, err := newConn(opts.NewConnOpts)
411                 return &poolFromConn{conn: conn}, err
412         default:
413                 _pool, err := sqlitex.Open(newOpenUri(opts.NewConnOpts), openConnFlags, opts.NumConns)
414                 return poolWithNumConns{_pool, opts.NumConns}, err
415         }
416 }
417
418 // Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool.
419 type poolFromConn struct {
420         mu   sync.Mutex
421         conn conn
422 }
423
424 func (me *poolFromConn) Get(ctx context.Context) conn {
425         me.mu.Lock()
426         return me.conn
427 }
428
429 func (me *poolFromConn) Put(conn conn) {
430         if conn != me.conn {
431                 panic("expected to same conn")
432         }
433         me.mu.Unlock()
434 }
435
436 func (me *poolFromConn) Close() error {
437         return me.conn.Close()
438 }
439
440 func (poolFromConn) NumConns() int { return 1 }
441
442 type ProviderOpts struct {
443         BatchWrites bool
444 }
445
446 // Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of
447 // the ConnPool (since it has to initialize all the connections anyway).
448 func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
449         prov := &provider{pool: pool, opts: opts}
450         if opts.BatchWrites {
451                 writes := make(chan writeRequest)
452                 prov.writes = writes
453                 // This is retained for backwards compatibility. It may not be necessary.
454                 runtime.SetFinalizer(prov, func(p *provider) {
455                         p.Close()
456                 })
457                 go providerWriter(writes, prov.pool)
458         }
459         return prov, nil
460 }
461
462 type InitPoolOpts struct {
463         NumConns int
464         InitConnOpts
465 }
466
467 func initPoolConns(ctx context.Context, pool ConnPool, opts InitConnOpts) (err error) {
468         var conns []conn
469         defer func() {
470                 for _, c := range conns {
471                         pool.Put(c)
472                 }
473         }()
474         for range iter.N(pool.NumConns()) {
475                 conn := pool.Get(ctx)
476                 if conn == nil {
477                         break
478                 }
479                 conns = append(conns, conn)
480                 err = initConn(conn, opts)
481                 if err != nil {
482                         err = fmt.Errorf("initing conn %v: %w", len(conns), err)
483                         return
484                 }
485         }
486         return
487 }
488
489 type ConnPool interface {
490         Get(context.Context) conn
491         Put(conn)
492         Close() error
493         NumConns() int
494 }
495
496 func withPoolConn(pool ConnPool, with func(conn)) {
497         c := pool.Get(nil)
498         defer pool.Put(c)
499         with(c)
500 }
501
502 type provider struct {
503         pool     ConnPool
504         writes   chan<- writeRequest
505         opts     ProviderOpts
506         closeMu  sync.RWMutex
507         closed   bool
508         closeErr error
509 }
510
511 var _ storage.ConsecutiveChunkReader = (*provider)(nil)
512
513 func (p *provider) ReadConsecutiveChunks(prefix string) (io.ReadCloser, error) {
514         p.closeMu.RLock()
515         runner, err := p.getReadWithConnRunner()
516         if err != nil {
517                 p.closeMu.RUnlock()
518                 return nil, err
519         }
520         r, w := io.Pipe()
521         go func() {
522                 defer p.closeMu.RUnlock()
523                 err = runner(func(_ context.Context, conn conn) error {
524                         var written int64
525                         err = sqlitex.Exec(conn, `
526                                 select
527                                         data,
528                                         cast(substr(name, ?+1) as integer) as offset
529                                 from blob
530                                 where name like ?||'%'
531                                 order by offset`,
532                                 func(stmt *sqlite.Stmt) error {
533                                         offset := stmt.ColumnInt64(1)
534                                         if offset != written {
535                                                 return fmt.Errorf("got chunk at offset %v, expected offset %v", offset, written)
536                                         }
537                                         // TODO: Avoid intermediate buffers here
538                                         r := stmt.ColumnReader(0)
539                                         w1, err := io.Copy(w, r)
540                                         written += w1
541                                         return err
542                                 },
543                                 len(prefix),
544                                 prefix,
545                         )
546                         return err
547                 })
548                 w.CloseWithError(err)
549         }()
550         return r, nil
551 }
552
553 func (me *provider) Close() error {
554         me.closeMu.Lock()
555         defer me.closeMu.Unlock()
556         if me.closed {
557                 return me.closeErr
558         }
559         if me.writes != nil {
560                 close(me.writes)
561         }
562         me.closeErr = me.pool.Close()
563         me.closed = true
564         return me.closeErr
565 }
566
567 type writeRequest struct {
568         query  withConn
569         done   chan<- error
570         labels pprof.LabelSet
571 }
572
573 var expvars = expvar.NewMap("sqliteStorage")
574
575 func runQueryWithLabels(query withConn, labels pprof.LabelSet, conn conn) (err error) {
576         pprof.Do(context.Background(), labels, func(ctx context.Context) {
577                 // We pass in the context in the hope that the CPU profiler might incorporate sqlite
578                 // activity the action that triggered it. It doesn't seem that way, since those calls don't
579                 // take a context.Context themselves. It may come in useful in the goroutine profiles
580                 // though, and doesn't hurt to expose it here for other purposes should things change.
581                 err = query(ctx, conn)
582         })
583         return
584 }
585
586 // Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
587 // stronger typing on the writes channel.
588 func providerWriter(writes <-chan writeRequest, pool ConnPool) {
589         conn := pool.Get(context.TODO())
590         if conn == nil {
591                 return
592         }
593         defer pool.Put(conn)
594         for {
595                 first, ok := <-writes
596                 if !ok {
597                         return
598                 }
599                 var buf []func()
600                 var cantFail error
601                 func() {
602                         defer sqlitex.Save(conn)(&cantFail)
603                         firstErr := runQueryWithLabels(first.query, first.labels, conn)
604                         buf = append(buf, func() { first.done <- firstErr })
605                         for {
606                                 select {
607                                 case wr, ok := <-writes:
608                                         if ok {
609                                                 err := runQueryWithLabels(wr.query, wr.labels, conn)
610                                                 buf = append(buf, func() { wr.done <- err })
611                                                 continue
612                                         }
613                                 default:
614                                 }
615                                 break
616                         }
617                 }()
618                 // Not sure what to do if this failed.
619                 if cantFail != nil {
620                         expvars.Add("batchTransactionErrors", 1)
621                 }
622                 // Signal done after we know the transaction succeeded.
623                 for _, done := range buf {
624                         done()
625                 }
626                 expvars.Add("batchTransactions", 1)
627                 expvars.Add("batchedQueries", int64(len(buf)))
628                 //log.Printf("batched %v write queries", len(buf))
629         }
630 }
631
632 func (p *provider) NewInstance(s string) (resource.Instance, error) {
633         return instance{s, p}, nil
634 }
635
636 type instance struct {
637         location string
638         p        *provider
639 }
640
641 func getLabels(skip int) pprof.LabelSet {
642         return pprof.Labels("sqlite-storage-action", func() string {
643                 var pcs [8]uintptr
644                 runtime.Callers(skip+3, pcs[:])
645                 fs := runtime.CallersFrames(pcs[:])
646                 f, _ := fs.Next()
647                 funcName := f.Func.Name()
648                 funcName = funcName[strings.LastIndexByte(funcName, '.')+1:]
649                 //log.Printf("func name: %q", funcName)
650                 return funcName
651         }())
652 }
653
654 func (p *provider) withConn(with withConn, write bool, skip int) error {
655         p.closeMu.RLock()
656         // I think we need to check this here because it may not be valid to send to the writes channel
657         // if we're already closed. So don't try to move this check into getReadWithConnRunner.
658         if p.closed {
659                 p.closeMu.RUnlock()
660                 return errors.New("closed")
661         }
662         if write && p.opts.BatchWrites {
663                 done := make(chan error)
664                 p.writes <- writeRequest{
665                         query:  with,
666                         done:   done,
667                         labels: getLabels(skip + 1),
668                 }
669                 p.closeMu.RUnlock()
670                 return <-done
671         } else {
672                 defer p.closeMu.RUnlock()
673                 runner, err := p.getReadWithConnRunner()
674                 if err != nil {
675                         return err
676                 }
677                 return runner(with)
678         }
679 }
680
681 // Obtains a DB conn and returns a withConn for executing with it. If no error is returned from this
682 // function, the runner *must* be used or the conn is leaked. You should check the provider isn't
683 // closed before using this.
684 func (p *provider) getReadWithConnRunner() (with func(withConn) error, err error) {
685         conn := p.pool.Get(context.TODO())
686         if conn == nil {
687                 err = errors.New("couldn't get pool conn")
688                 return
689         }
690         with = func(with withConn) error {
691                 defer p.pool.Put(conn)
692                 return runQueryWithLabels(with, getLabels(1), conn)
693         }
694         return
695 }
696
697 type withConn func(context.Context, conn) error
698
699 func (i instance) withConn(with withConn, write bool) error {
700         return i.p.withConn(with, write, 1)
701 }
702
703 func (i instance) getConn() *sqlite.Conn {
704         return i.p.pool.Get(context.TODO())
705 }
706
707 func (i instance) putConn(conn *sqlite.Conn) {
708         i.p.pool.Put(conn)
709 }
710
711 func (i instance) Readdirnames() (names []string, err error) {
712         prefix := i.location + "/"
713         err = i.withConn(func(_ context.Context, conn conn) error {
714                 return sqlitex.Exec(conn, "select name from blob where name like ?", func(stmt *sqlite.Stmt) error {
715                         names = append(names, stmt.ColumnText(0)[len(prefix):])
716                         return nil
717                 }, prefix+"%")
718         }, false)
719         //log.Printf("readdir %q gave %q", i.location, names)
720         return
721 }
722
723 func (i instance) getBlobRowid(conn conn) (rowid int64, err error) {
724         rows := 0
725         err = sqlitex.Exec(conn, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
726                 rowid = stmt.ColumnInt64(0)
727                 rows++
728                 return nil
729         }, i.location)
730         if err != nil {
731                 return
732         }
733         if rows == 1 {
734                 return
735         }
736         if rows == 0 {
737                 err = errors.New("blob not found")
738                 return
739         }
740         panic(rows)
741 }
742
743 type connBlob struct {
744         *sqlite.Blob
745         onClose func()
746 }
747
748 func (me connBlob) Close() error {
749         err := me.Blob.Close()
750         me.onClose()
751         return err
752 }
753
754 func (i instance) Get() (ret io.ReadCloser, err error) {
755         conn := i.getConn()
756         if conn == nil {
757                 panic("nil sqlite conn")
758         }
759         blob, err := i.openBlob(conn, false, true)
760         if err != nil {
761                 i.putConn(conn)
762                 return
763         }
764         var once sync.Once
765         return connBlob{blob, func() {
766                 once.Do(func() { i.putConn(conn) })
767         }}, nil
768 }
769
770 func (i instance) openBlob(conn conn, write, updateAccess bool) (*sqlite.Blob, error) {
771         rowid, err := i.getBlobRowid(conn)
772         if err != nil {
773                 return nil, err
774         }
775         // This seems to cause locking issues with in-memory databases. Is it something to do with not
776         // having WAL?
777         if updateAccess {
778                 err = sqlitex.Exec(conn, "update blob set last_used=datetime('now') where rowid=?", nil, rowid)
779                 if err != nil {
780                         err = fmt.Errorf("updating last_used: %w", err)
781                         return nil, err
782                 }
783                 if conn.Changes() != 1 {
784                         panic(conn.Changes())
785                 }
786         }
787         return conn.OpenBlob("main", "blob", "data", rowid, write)
788 }
789
790 func (i instance) PutSized(reader io.Reader, size int64) (err error) {
791         err = i.withConn(func(_ context.Context, conn conn) error {
792                 err := sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, zeroblob(?))",
793                         nil,
794                         i.location, size)
795                 if err != nil {
796                         return err
797                 }
798                 blob, err := i.openBlob(conn, true, false)
799                 if err != nil {
800                         return err
801                 }
802                 defer blob.Close()
803                 _, err = io.Copy(blob, reader)
804                 return err
805         }, true)
806         return
807 }
808
809 func (i instance) Put(reader io.Reader) (err error) {
810         var buf bytes.Buffer
811         _, err = io.Copy(&buf, reader)
812         if err != nil {
813                 return err
814         }
815         if false {
816                 return i.PutSized(&buf, int64(buf.Len()))
817         } else {
818                 return i.withConn(func(_ context.Context, conn conn) error {
819                         for range iter.N(10) {
820                                 err = sqlitex.Exec(conn,
821                                         "insert or replace into blob(name, data) values(?, cast(? as blob))",
822                                         nil,
823                                         i.location, buf.Bytes())
824                                 if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
825                                         log.Print("sqlite busy")
826                                         time.Sleep(time.Second)
827                                         continue
828                                 }
829                                 break
830                         }
831                         return err
832                 }, true)
833         }
834 }
835
836 type fileInfo struct {
837         size int64
838 }
839
840 func (f fileInfo) Name() string {
841         panic("implement me")
842 }
843
844 func (f fileInfo) Size() int64 {
845         return f.size
846 }
847
848 func (f fileInfo) Mode() os.FileMode {
849         panic("implement me")
850 }
851
852 func (f fileInfo) ModTime() time.Time {
853         panic("implement me")
854 }
855
856 func (f fileInfo) IsDir() bool {
857         panic("implement me")
858 }
859
860 func (f fileInfo) Sys() interface{} {
861         panic("implement me")
862 }
863
864 func (i instance) Stat() (ret os.FileInfo, err error) {
865         err = i.withConn(func(_ context.Context, conn conn) error {
866                 var blob *sqlite.Blob
867                 blob, err = i.openBlob(conn, false, false)
868                 if err != nil {
869                         return err
870                 }
871                 defer blob.Close()
872                 ret = fileInfo{blob.Size()}
873                 return nil
874         }, false)
875         return
876 }
877
878 func (i instance) ReadAt(p []byte, off int64) (n int, err error) {
879         err = i.withConn(func(_ context.Context, conn conn) error {
880                 if false {
881                         var blob *sqlite.Blob
882                         blob, err = i.openBlob(conn, false, true)
883                         if err != nil {
884                                 return err
885                         }
886                         defer blob.Close()
887                         if off >= blob.Size() {
888                                 err = io.EOF
889                                 return err
890                         }
891                         if off+int64(len(p)) > blob.Size() {
892                                 p = p[:blob.Size()-off]
893                         }
894                         n, err = blob.ReadAt(p, off)
895                 } else {
896                         gotRow := false
897                         err = sqlitex.Exec(
898                                 conn,
899                                 "select substr(data, ?, ?) from blob where name=?",
900                                 func(stmt *sqlite.Stmt) error {
901                                         if gotRow {
902                                                 panic("found multiple matching blobs")
903                                         } else {
904                                                 gotRow = true
905                                         }
906                                         n = stmt.ColumnBytes(0, p)
907                                         return nil
908                                 },
909                                 off+1, len(p), i.location,
910                         )
911                         if err != nil {
912                                 return err
913                         }
914                         if !gotRow {
915                                 err = errors.New("blob not found")
916                                 return err
917                         }
918                         if n < len(p) {
919                                 err = io.EOF
920                         }
921                 }
922                 return nil
923         }, false)
924         return
925 }
926
927 func (i instance) WriteAt(bytes []byte, i2 int64) (int, error) {
928         panic("implement me")
929 }
930
931 func (i instance) Delete() error {
932         return i.withConn(func(_ context.Context, conn conn) error {
933                 return sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)
934         }, true)
935 }