]> Sergey Matveev's repositories - btrtrc.git/blob - storage/sqlite/sqlite-storage.go
Move storage piece benchmarks to storage/test and add a lot more dials
[btrtrc.git] / storage / sqlite / sqlite-storage.go
1 package sqliteStorage
2
3 import (
4         "bytes"
5         "context"
6         "errors"
7         "expvar"
8         "fmt"
9         "io"
10         "log"
11         "net/url"
12         "os"
13         "runtime"
14         "runtime/pprof"
15         "strings"
16         "sync"
17         "time"
18
19         "crawshaw.io/sqlite"
20         "crawshaw.io/sqlite/sqlitex"
21         "github.com/anacrolix/missinggo/iter"
22         "github.com/anacrolix/missinggo/v2/resource"
23         "github.com/anacrolix/torrent/storage"
24 )
25
26 type conn = *sqlite.Conn
27
28 func initConn(conn conn, wal bool) error {
29         // Recursive triggers are required because we need to trim the blob_meta size after trimming to
30         // capacity. Hopefully we don't hit the recursion limit, and if we do, there's an error thrown.
31         err := sqlitex.ExecTransient(conn, "pragma recursive_triggers=on", nil)
32         if err != nil {
33                 return err
34         }
35         err = sqlitex.ExecTransient(conn, `pragma synchronous=off`, nil)
36         if err != nil {
37                 return err
38         }
39         if !wal {
40                 err = sqlitex.ExecTransient(conn, `pragma journal_mode=off`, nil)
41                 if err != nil {
42                         return err
43                 }
44         }
45         err = sqlitex.ExecTransient(conn, `pragma mmap_size=1000000000000`, nil)
46         if err != nil {
47                 return err
48         }
49         return nil
50 }
51
52 func initSchema(conn conn, pageSize int, triggers bool) error {
53         if pageSize != 0 {
54                 err := sqlitex.ExecScript(conn, fmt.Sprintf("pragma page_size=%d", pageSize))
55                 if err != nil {
56                         return err
57                 }
58         }
59         err := sqlitex.ExecScript(conn, `
60                 -- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
61                 -- can trim the database file size with partial vacuums without having to do a full vacuum, which 
62                 -- locks everything.
63                 pragma auto_vacuum=incremental;
64                 
65                 create table if not exists blob (
66                         name text,
67                         last_used timestamp default (datetime('now')),
68                         data blob,
69                         primary key (name)
70                 );
71                 
72                 create table if not exists blob_meta (
73                         key text primary key,
74                         value
75                 );
76                 
77                 -- While sqlite *seems* to be faster to get sum(length(data)) instead of 
78                 -- sum(length(cast(data as blob))), it may still require a large table scan at start-up or with a 
79                 -- cold-cache. With this we can be assured that it doesn't.
80                 insert or ignore into blob_meta values ('size', 0);
81                 
82                 create table if not exists setting (
83                         name primary key on conflict replace,
84                         value
85                 );
86         
87                 create view if not exists deletable_blob as
88                 with recursive excess (
89                         usage_with,
90                         last_used,
91                         blob_rowid,
92                         data_length
93                 ) as (
94                         select * 
95                         from (
96                                 select 
97                                         (select value from blob_meta where key='size') as usage_with,
98                                         last_used,
99                                         rowid,
100                                         length(cast(data as blob))
101                                 from blob order by last_used, rowid limit 1
102                         )
103                         where usage_with > (select value from setting where name='capacity')
104                         union all
105                         select 
106                                 usage_with-data_length as new_usage_with,
107                                 blob.last_used,
108                                 blob.rowid,
109                                 length(cast(data as blob))
110                         from excess join blob
111                         on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
112                         where new_usage_with > (select value from setting where name='capacity')
113                 )
114                 select * from excess;
115         `)
116         if err != nil {
117                 return err
118         }
119         if triggers {
120                 err := sqlitex.ExecScript(conn, `
121                         create trigger if not exists after_insert_blob
122                         after insert on blob
123                         begin
124                                 update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
125                                 delete from blob where rowid in (select blob_rowid from deletable_blob);
126                         end;
127                         
128                         create trigger if not exists after_update_blob
129                         after update of data on blob
130                         begin
131                                 update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
132                                 delete from blob where rowid in (select blob_rowid from deletable_blob);
133                         end;
134                         
135                         create trigger if not exists after_delete_blob
136                         after delete on blob
137                         begin
138                                 update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
139                         end;
140                 `)
141                 if err != nil {
142                         return err
143                 }
144         }
145         return nil
146 }
147
148 type NewPiecesStorageOpts struct {
149         NewPoolOpts
150         ProvOpts func(*ProviderOpts)
151         storage.ResourcePiecesOpts
152 }
153
154 // A convenience function that creates a connection pool, resource provider, and a pieces storage
155 // ClientImpl and returns them all with a Close attached.
156 func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, err error) {
157         conns, provOpts, err := NewPool(opts.NewPoolOpts)
158         if err != nil {
159                 return
160         }
161         if f := opts.ProvOpts; f != nil {
162                 f(&provOpts)
163         }
164         prov, err := NewProvider(conns, provOpts)
165         if err != nil {
166                 conns.Close()
167                 return
168         }
169         store := storage.NewResourcePiecesOpts(prov, opts.ResourcePiecesOpts)
170         return struct {
171                 storage.ClientImpl
172                 io.Closer
173         }{
174                 store,
175                 prov,
176         }, nil
177 }
178
179 type NewPoolOpts struct {
180         Path     string
181         Memory   bool
182         NumConns int
183         // Forces WAL, disables shared caching.
184         ConcurrentBlobReads bool
185         DontInitSchema      bool
186         PageSize            int
187         // If non-zero, overrides the existing setting.
188         Capacity int64
189 }
190
191 // There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
192 // the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the
193 // top-level option type.
194 type ProviderOpts struct {
195         NumConns int
196         // Concurrent blob reads require WAL.
197         ConcurrentBlobRead bool
198         BatchWrites        bool
199 }
200
201 // Remove any capacity limits.
202 func UnlimitCapacity(conn conn) error {
203         return sqlitex.Exec(conn, "delete from setting where key='capacity'", nil)
204 }
205
206 // Set the capacity limit to exactly this value.
207 func SetCapacity(conn conn, cap int64) error {
208         return sqlitex.Exec(conn, "insert into setting values ('capacity', ?)", nil, cap)
209 }
210
211 func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
212         if opts.NumConns == 0 {
213                 opts.NumConns = runtime.NumCPU()
214         }
215         if opts.Memory {
216                 opts.Path = ":memory:"
217         }
218         values := make(url.Values)
219         if !opts.ConcurrentBlobReads {
220                 values.Add("cache", "shared")
221         }
222         path := fmt.Sprintf("file:%s?%s", opts.Path, values.Encode())
223         conns, err := func() (ConnPool, error) {
224                 switch opts.NumConns {
225                 case 1:
226                         conn, err := sqlite.OpenConn(path, 0)
227                         return &poolFromConn{conn: conn}, err
228                 default:
229                         return sqlitex.Open(path, 0, opts.NumConns)
230                 }
231         }()
232         if err != nil {
233                 return
234         }
235         defer func() {
236                 if err != nil {
237                         conns.Close()
238                 }
239         }()
240         conn := conns.Get(context.TODO())
241         defer conns.Put(conn)
242         if !opts.DontInitSchema {
243                 if opts.PageSize == 0 {
244                         opts.PageSize = 1 << 14
245                 }
246                 err = initSchema(conn, opts.PageSize, true)
247                 if err != nil {
248                         return
249                 }
250         }
251         if opts.Capacity != 0 {
252                 err = SetCapacity(conn, opts.Capacity)
253                 if err != nil {
254                         return
255                 }
256         }
257         return conns, ProviderOpts{
258                 NumConns:           opts.NumConns,
259                 ConcurrentBlobRead: opts.ConcurrentBlobReads,
260                 BatchWrites:        true,
261         }, nil
262 }
263
264 // Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool.
265 type poolFromConn struct {
266         mu   sync.Mutex
267         conn conn
268 }
269
270 func (me *poolFromConn) Get(ctx context.Context) conn {
271         me.mu.Lock()
272         return me.conn
273 }
274
275 func (me *poolFromConn) Put(conn conn) {
276         if conn != me.conn {
277                 panic("expected to same conn")
278         }
279         me.mu.Unlock()
280 }
281
282 func (me *poolFromConn) Close() error {
283         return me.conn.Close()
284 }
285
286 // Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of
287 // the ConnPool (since it has to initialize all the connections anyway).
288 func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
289         _, err = initPoolConns(context.TODO(), pool, opts.NumConns, true)
290         if err != nil {
291                 return
292         }
293         prov := &provider{pool: pool, opts: opts}
294         if opts.BatchWrites {
295                 if opts.NumConns < 2 {
296                         err = errors.New("batch writes requires more than 1 conn")
297                         return
298                 }
299                 writes := make(chan writeRequest)
300                 prov.writes = writes
301                 // This is retained for backwards compatibility. It may not be necessary.
302                 runtime.SetFinalizer(prov, func(p *provider) {
303                         p.Close()
304                 })
305                 go providerWriter(writes, prov.pool)
306         }
307         return prov, nil
308 }
309
310 func initPoolConns(ctx context.Context, pool ConnPool, numConn int, wal bool) (numInited int, err error) {
311         var conns []conn
312         defer func() {
313                 for _, c := range conns {
314                         pool.Put(c)
315                 }
316         }()
317         for range iter.N(numConn) {
318                 conn := pool.Get(ctx)
319                 if conn == nil {
320                         break
321                 }
322                 conns = append(conns, conn)
323                 err = initConn(conn, wal)
324                 if err != nil {
325                         err = fmt.Errorf("initing conn %v: %w", len(conns), err)
326                         return
327                 }
328                 numInited++
329         }
330         return
331 }
332
333 type ConnPool interface {
334         Get(context.Context) conn
335         Put(conn)
336         Close() error
337 }
338
339 type provider struct {
340         pool     ConnPool
341         writes   chan<- writeRequest
342         opts     ProviderOpts
343         closed   sync.Once
344         closeErr error
345 }
346
347 var _ storage.ConsecutiveChunkWriter = (*provider)(nil)
348
349 func (p *provider) WriteConsecutiveChunks(prefix string, w io.Writer) (written int64, err error) {
350         err = p.withConn(func(conn conn) error {
351                 err = io.EOF
352                 err = sqlitex.Exec(conn, `
353                                 select
354                                         cast(data as blob),
355                                         cast(substr(name, ?+1) as integer) as offset
356                                 from blob
357                                 where name like ?||'%'
358                                 order by offset`,
359                         func(stmt *sqlite.Stmt) error {
360                                 offset := stmt.ColumnInt64(1)
361                                 if offset != written {
362                                         return fmt.Errorf("got chunk at offset %v, expected offset %v", offset, written)
363                                 }
364                                 r := stmt.ColumnReader(0)
365                                 w1, err := io.Copy(w, r)
366                                 written += w1
367                                 return err
368                         },
369                         len(prefix),
370                         prefix,
371                 )
372                 return err
373         }, false, 0)
374         return
375 }
376
377 func (me *provider) Close() error {
378         me.closed.Do(func() {
379                 if me.writes != nil {
380                         close(me.writes)
381                 }
382                 me.closeErr = me.pool.Close()
383         })
384         return me.closeErr
385 }
386
387 type writeRequest struct {
388         query  withConn
389         done   chan<- error
390         labels pprof.LabelSet
391 }
392
393 var expvars = expvar.NewMap("sqliteStorage")
394
395 func runQueryWithLabels(query withConn, labels pprof.LabelSet, conn conn) (err error) {
396         pprof.Do(context.Background(), labels, func(context.Context) {
397                 err = query(conn)
398         })
399         return
400 }
401
402 // Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
403 // stronger typing on the writes channel.
404 func providerWriter(writes <-chan writeRequest, pool ConnPool) {
405         conn := pool.Get(context.TODO())
406         if conn == nil {
407                 return
408         }
409         defer pool.Put(conn)
410         for {
411                 first, ok := <-writes
412                 if !ok {
413                         return
414                 }
415                 var buf []func()
416                 var cantFail error
417                 func() {
418                         defer sqlitex.Save(conn)(&cantFail)
419                         firstErr := runQueryWithLabels(first.query, first.labels, conn)
420                         buf = append(buf, func() { first.done <- firstErr })
421                         for {
422                                 select {
423                                 case wr, ok := <-writes:
424                                         if ok {
425                                                 err := runQueryWithLabels(wr.query, wr.labels, conn)
426                                                 buf = append(buf, func() { wr.done <- err })
427                                                 continue
428                                         }
429                                 default:
430                                 }
431                                 break
432                         }
433                 }()
434                 // Not sure what to do if this failed.
435                 if cantFail != nil {
436                         expvars.Add("batchTransactionErrors", 1)
437                 }
438                 // Signal done after we know the transaction succeeded.
439                 for _, done := range buf {
440                         done()
441                 }
442                 expvars.Add("batchTransactions", 1)
443                 expvars.Add("batchedQueries", int64(len(buf)))
444                 //log.Printf("batched %v write queries", len(buf))
445         }
446 }
447
448 func (p *provider) NewInstance(s string) (resource.Instance, error) {
449         return instance{s, p}, nil
450 }
451
452 type instance struct {
453         location string
454         p        *provider
455 }
456
457 func getLabels(skip int) pprof.LabelSet {
458         return pprof.Labels("f", func() string {
459                 var pcs [8]uintptr
460                 runtime.Callers(skip+3, pcs[:])
461                 fs := runtime.CallersFrames(pcs[:])
462                 f, _ := fs.Next()
463                 funcName := f.Func.Name()
464                 funcName = funcName[strings.LastIndexByte(funcName, '.')+1:]
465                 //log.Printf("func name: %q", funcName)
466                 return funcName
467         }())
468 }
469
470 func (p *provider) withConn(with withConn, write bool, skip int) error {
471         if write && p.opts.BatchWrites {
472                 done := make(chan error)
473                 p.writes <- writeRequest{
474                         query:  with,
475                         done:   done,
476                         labels: getLabels(skip + 1),
477                 }
478                 return <-done
479         } else {
480                 conn := p.pool.Get(context.TODO())
481                 if conn == nil {
482                         return errors.New("couldn't get pool conn")
483                 }
484                 defer p.pool.Put(conn)
485                 return runQueryWithLabels(with, getLabels(skip+1), conn)
486         }
487 }
488
489 type withConn func(conn) error
490
491 func (i instance) withConn(with withConn, write bool) error {
492         return i.p.withConn(with, write, 1)
493 }
494
495 func (i instance) getConn() *sqlite.Conn {
496         return i.p.pool.Get(context.TODO())
497 }
498
499 func (i instance) putConn(conn *sqlite.Conn) {
500         i.p.pool.Put(conn)
501 }
502
503 func (i instance) Readdirnames() (names []string, err error) {
504         prefix := i.location + "/"
505         err = i.withConn(func(conn conn) error {
506                 return sqlitex.Exec(conn, "select name from blob where name like ?", func(stmt *sqlite.Stmt) error {
507                         names = append(names, stmt.ColumnText(0)[len(prefix):])
508                         return nil
509                 }, prefix+"%")
510         }, false)
511         //log.Printf("readdir %q gave %q", i.location, names)
512         return
513 }
514
515 func (i instance) getBlobRowid(conn conn) (rowid int64, err error) {
516         rows := 0
517         err = sqlitex.Exec(conn, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
518                 rowid = stmt.ColumnInt64(0)
519                 rows++
520                 return nil
521         }, i.location)
522         if err != nil {
523                 return
524         }
525         if rows == 1 {
526                 return
527         }
528         if rows == 0 {
529                 err = errors.New("blob not found")
530                 return
531         }
532         panic(rows)
533 }
534
535 type connBlob struct {
536         *sqlite.Blob
537         onClose func()
538 }
539
540 func (me connBlob) Close() error {
541         err := me.Blob.Close()
542         me.onClose()
543         return err
544 }
545
546 func (i instance) Get() (ret io.ReadCloser, err error) {
547         conn := i.getConn()
548         if conn == nil {
549                 panic("nil sqlite conn")
550         }
551         blob, err := i.openBlob(conn, false, true)
552         if err != nil {
553                 i.putConn(conn)
554                 return
555         }
556         var once sync.Once
557         return connBlob{blob, func() {
558                 once.Do(func() { i.putConn(conn) })
559         }}, nil
560 }
561
562 func (i instance) openBlob(conn conn, write, updateAccess bool) (*sqlite.Blob, error) {
563         rowid, err := i.getBlobRowid(conn)
564         if err != nil {
565                 return nil, err
566         }
567         // This seems to cause locking issues with in-memory databases. Is it something to do with not
568         // having WAL?
569         if updateAccess {
570                 err = sqlitex.Exec(conn, "update blob set last_used=datetime('now') where rowid=?", nil, rowid)
571                 if err != nil {
572                         err = fmt.Errorf("updating last_used: %w", err)
573                         return nil, err
574                 }
575                 if conn.Changes() != 1 {
576                         panic(conn.Changes())
577                 }
578         }
579         return conn.OpenBlob("main", "blob", "data", rowid, write)
580 }
581
582 func (i instance) PutSized(reader io.Reader, size int64) (err error) {
583         err = i.withConn(func(conn conn) error {
584                 err := sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, zeroblob(?))",
585                         nil,
586                         i.location, size)
587                 if err != nil {
588                         return err
589                 }
590                 blob, err := i.openBlob(conn, true, false)
591                 if err != nil {
592                         return err
593                 }
594                 defer blob.Close()
595                 _, err = io.Copy(blob, reader)
596                 return err
597         }, true)
598         return
599 }
600
601 func (i instance) Put(reader io.Reader) (err error) {
602         var buf bytes.Buffer
603         _, err = io.Copy(&buf, reader)
604         if err != nil {
605                 return err
606         }
607         if false {
608                 return i.PutSized(&buf, int64(buf.Len()))
609         } else {
610                 return i.withConn(func(conn conn) error {
611                         for range iter.N(10) {
612                                 err = sqlitex.Exec(conn,
613                                         "insert or replace into blob(name, data) values(?, cast(? as blob))",
614                                         nil,
615                                         i.location, buf.Bytes())
616                                 if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
617                                         log.Print("sqlite busy")
618                                         time.Sleep(time.Second)
619                                         continue
620                                 }
621                                 break
622                         }
623                         return err
624                 }, true)
625         }
626 }
627
628 type fileInfo struct {
629         size int64
630 }
631
632 func (f fileInfo) Name() string {
633         panic("implement me")
634 }
635
636 func (f fileInfo) Size() int64 {
637         return f.size
638 }
639
640 func (f fileInfo) Mode() os.FileMode {
641         panic("implement me")
642 }
643
644 func (f fileInfo) ModTime() time.Time {
645         panic("implement me")
646 }
647
648 func (f fileInfo) IsDir() bool {
649         panic("implement me")
650 }
651
652 func (f fileInfo) Sys() interface{} {
653         panic("implement me")
654 }
655
656 func (i instance) Stat() (ret os.FileInfo, err error) {
657         err = i.withConn(func(conn conn) error {
658                 var blob *sqlite.Blob
659                 blob, err = i.openBlob(conn, false, false)
660                 if err != nil {
661                         return err
662                 }
663                 defer blob.Close()
664                 ret = fileInfo{blob.Size()}
665                 return nil
666         }, false)
667         return
668 }
669
670 func (i instance) ReadAt(p []byte, off int64) (n int, err error) {
671         err = i.withConn(func(conn conn) error {
672                 if false {
673                         var blob *sqlite.Blob
674                         blob, err = i.openBlob(conn, false, true)
675                         if err != nil {
676                                 return err
677                         }
678                         defer blob.Close()
679                         if off >= blob.Size() {
680                                 err = io.EOF
681                                 return err
682                         }
683                         if off+int64(len(p)) > blob.Size() {
684                                 p = p[:blob.Size()-off]
685                         }
686                         n, err = blob.ReadAt(p, off)
687                 } else {
688                         gotRow := false
689                         err = sqlitex.Exec(
690                                 conn,
691                                 "select substr(cast(data as blob), ?, ?) from blob where name=?",
692                                 func(stmt *sqlite.Stmt) error {
693                                         if gotRow {
694                                                 panic("found multiple matching blobs")
695                                         } else {
696                                                 gotRow = true
697                                         }
698                                         n = stmt.ColumnBytes(0, p)
699                                         return nil
700                                 },
701                                 off+1, len(p), i.location,
702                         )
703                         if err != nil {
704                                 return err
705                         }
706                         if !gotRow {
707                                 err = errors.New("blob not found")
708                                 return err
709                         }
710                         if n < len(p) {
711                                 err = io.EOF
712                         }
713                 }
714                 return nil
715         }, false)
716         return
717 }
718
719 func (i instance) WriteAt(bytes []byte, i2 int64) (int, error) {
720         panic("implement me")
721 }
722
723 func (i instance) Delete() error {
724         return i.withConn(func(conn conn) error {
725                 return sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)
726         }, true)
727 }