From 2ddd3169efcfa01cc308a417f967b34db551bf93 Mon Sep 17 00:00:00 2001
From: Matt Joiner
Date: Tue, 19 Jan 2021 09:28:09 +1100
Subject: [PATCH] Move storage piece benchmarks to storage/test and add a lot
 more dials

---
 storage/bench-resource-pieces.go      |  39 -----
 storage/sqlite/sqlite-storage.go      | 239 ++++++++++++++++----------
 storage/sqlite/sqlite-storage_test.go |  55 +++---
 storage/test/bench-resource-pieces.go |  73 ++++++++
 test/transfer_test.go                 |   5 +-
 5 files changed, 243 insertions(+), 168 deletions(-)
 delete mode 100644 storage/bench-resource-pieces.go
 create mode 100644 storage/test/bench-resource-pieces.go

diff --git a/storage/bench-resource-pieces.go b/storage/bench-resource-pieces.go
deleted file mode 100644
index 753b914a..00000000
--- a/storage/bench-resource-pieces.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package storage
-
-import (
-	"bytes"
-	"io"
-	"io/ioutil"
-	"sync"
-	"testing"
-
-	qt "github.com/frankban/quicktest"
-)
-
-func BenchmarkPieceMarkComplete(tb testing.TB, pi PieceImpl, data []byte) {
-	c := qt.New(tb)
-	var wg sync.WaitGroup
-	for off := int64(0); off < int64(len(data)); off += chunkSize {
-		wg.Add(1)
-		go func(off int64) {
-			defer wg.Done()
-			n, err := pi.WriteAt(data[off:off+chunkSize], off)
-			if err != nil {
-				panic(err)
-			}
-			if n != chunkSize {
-				panic(n)
-			}
-		}(off)
-	}
-	wg.Wait()
-	//pi.MarkNotComplete()
-	// This might not apply if users of this benchmark don't cache with the expected capacity.
-	c.Assert(pi.Completion(), qt.Equals, Completion{Complete: false, Ok: true})
-	c.Assert(pi.MarkComplete(), qt.IsNil)
-	c.Assert(pi.Completion(), qt.Equals, Completion{true, true})
-	readData, err := ioutil.ReadAll(io.NewSectionReader(pi, 0, int64(len(data))))
-	c.Assert(err, qt.IsNil)
-	c.Assert(len(readData), qt.Equals, len(data))
-	c.Assert(bytes.Equal(readData, data), qt.IsTrue)
-}
diff --git a/storage/sqlite/sqlite-storage.go b/storage/sqlite/sqlite-storage.go
index 0f8dc136..2700204a 100644
--- a/storage/sqlite/sqlite-storage.go
+++ b/storage/sqlite/sqlite-storage.go
@@ -11,6 +11,8 @@ import (
 	"net/url"
 	"os"
 	"runtime"
+	"runtime/pprof"
+	"strings"
 	"sync"
 	"time"
 
@@ -47,103 +49,124 @@ func initConn(conn conn, wal bool) error {
 	return nil
 }
 
-func initSchema(conn conn) error {
-	return sqlitex.ExecScript(conn, `
--- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
--- can trim the database file size with partial vacuums without having to do a full vacuum, which
--- locks everything.
-pragma page_size=16384;
-pragma auto_vacuum=incremental;
-
-create table if not exists blob (
-	name text,
-	last_used timestamp default (datetime('now')),
-	data blob,
-	primary key (name)
-);
-
-create table if not exists blob_meta (
-	key text primary key,
-	value
-);
-
--- While sqlite *seems* to be faster to get sum(length(data)) instead of
--- sum(length(cast(data as blob))), it may still require a large table scan at start-up or with a
--- cold-cache. With this we can be assured that it doesn't.
-insert or ignore into blob_meta values ('size', 0);
-
-create table if not exists setting (
-	name primary key on conflict replace,
-	value
-);
-
-create view if not exists deletable_blob as
-with recursive excess (
-	usage_with,
-	last_used,
-	blob_rowid,
-	data_length
-) as (
-	select *
-	from (
-		select
-			(select value from blob_meta where key='size') as usage_with,
+func initSchema(conn conn, pageSize int, triggers bool) error {
+	if pageSize != 0 {
+		err := sqlitex.ExecScript(conn, fmt.Sprintf("pragma page_size=%d", pageSize))
+		if err != nil {
+			return err
+		}
+	}
+	err := sqlitex.ExecScript(conn, `
+		-- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
+		-- can trim the database file size with partial vacuums without having to do a full vacuum, which
+		-- locks everything.
+		pragma auto_vacuum=incremental;
+
+		create table if not exists blob (
+			name text,
+			last_used timestamp default (datetime('now')),
+			data blob,
+			primary key (name)
+		);
+
+		create table if not exists blob_meta (
+			key text primary key,
+			value
+		);
+
+		-- While sqlite *seems* to be faster to get sum(length(data)) instead of
+		-- sum(length(cast(data as blob))), it may still require a large table scan at start-up or with a
+		-- cold-cache. With this we can be assured that it doesn't.
+		insert or ignore into blob_meta values ('size', 0);
+
+		create table if not exists setting (
+			name primary key on conflict replace,
+			value
+		);
+
+		create view if not exists deletable_blob as
+		with recursive excess (
+			usage_with,
 			last_used,
-			rowid,
-			length(cast(data as blob))
-		from blob order by last_used, rowid limit 1
-	)
-	where usage_with > (select value from setting where name='capacity')
-	union all
-	select
-		usage_with-data_length as new_usage_with,
-		blob.last_used,
-		blob.rowid,
-		length(cast(data as blob))
-	from excess join blob
-	on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
-	where new_usage_with > (select value from setting where name='capacity')
-)
-select * from excess;
-
-create trigger if not exists after_insert_blob
-after insert on blob
-begin
-	update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
-	delete from blob where rowid in (select blob_rowid from deletable_blob);
-end;
-
-create trigger if not exists after_update_blob
-after update of data on blob
-begin
-	update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
-	delete from blob where rowid in (select blob_rowid from deletable_blob);
-end;
-
-create trigger if not exists after_delete_blob
-after delete on blob
-begin
-	update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
-end;
-`)
+			blob_rowid,
+			data_length
+		) as (
+			select *
+			from (
+				select
+					(select value from blob_meta where key='size') as usage_with,
+					last_used,
+					rowid,
+					length(cast(data as blob))
+				from blob order by last_used, rowid limit 1
+			)
+			where usage_with > (select value from setting where name='capacity')
+			union all
+			select
+				usage_with-data_length as new_usage_with,
+				blob.last_used,
+				blob.rowid,
+				length(cast(data as blob))
+			from excess join blob
+			on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
+			where new_usage_with > (select value from setting where name='capacity')
+		)
+		select * from excess;
+	`)
+	if err != nil {
+		return err
+	}
+	if triggers {
+		err := sqlitex.ExecScript(conn, `
+			create trigger if not exists after_insert_blob
+			after insert on blob
+			begin
+				update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
+				delete from blob where rowid in (select blob_rowid from deletable_blob);
+			end;
+
+			create trigger if not exists after_update_blob
+			after update of data on blob
+			begin
+				update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
+				delete from blob where rowid in (select blob_rowid from deletable_blob);
+			end;
+
+			create trigger if not exists after_delete_blob
+			after delete on blob
+			begin
+				update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
+			end;
+		`)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+type NewPiecesStorageOpts struct {
+	NewPoolOpts
+	ProvOpts func(*ProviderOpts)
+	storage.ResourcePiecesOpts
 }
 
 // A convenience function that creates a connection pool, resource provider, and a pieces storage
 // ClientImpl and returns them all with a Close attached.
-func NewPiecesStorage(opts NewPoolOpts) (_ storage.ClientImplCloser, err error) {
-	conns, provOpts, err := NewPool(opts)
+func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, err error) {
+	conns, provOpts, err := NewPool(opts.NewPoolOpts)
 	if err != nil {
 		return
 	}
+	if f := opts.ProvOpts; f != nil {
+		f(&provOpts)
+	}
 	prov, err := NewProvider(conns, provOpts)
 	if err != nil {
 		conns.Close()
 		return
 	}
-	store := storage.NewResourcePiecesOpts(prov, storage.ResourcePiecesOpts{
-		LeaveIncompleteChunks: true,
-		AllowSizedPuts:        true,
-	})
+	store := storage.NewResourcePiecesOpts(prov, opts.ResourcePiecesOpts)
 	return struct {
 		storage.ClientImpl
 		io.Closer
@@ -160,6 +183,7 @@ type NewPoolOpts struct {
 	// Forces WAL, disables shared caching.
 	ConcurrentBlobReads bool
 	DontInitSchema      bool
+	PageSize            int
 	// If non-zero, overrides the existing setting.
 	Capacity int64
 }
@@ -216,7 +240,10 @@ func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
 	conn := conns.Get(context.TODO())
 	defer conns.Put(conn)
 	if !opts.DontInitSchema {
-		err = initSchema(conn)
+		if opts.PageSize == 0 {
+			opts.PageSize = 1 << 14
+		}
+		err = initSchema(conn, opts.PageSize, true)
 		if err != nil {
 			return
 		}
@@ -343,7 +370,7 @@ func (p *provider) WriteConsecutiveChunks(prefix string, w io.Writer) (written i
 			prefix,
 		)
 		return err
-	}, false)
+	}, false, 0)
 	return
 }
 
@@ -358,12 +385,20 @@ func (me *provider) Close() error {
 }
 
 type writeRequest struct {
-	query withConn
-	done  chan<- error
+	query  withConn
+	done   chan<- error
+	labels pprof.LabelSet
 }
 
 var expvars = expvar.NewMap("sqliteStorage")
 
+func runQueryWithLabels(query withConn, labels pprof.LabelSet, conn conn) (err error) {
+	pprof.Do(context.Background(), labels, func(context.Context) {
+		err = query(conn)
+	})
+	return
+}
+
 // Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
 // stronger typing on the writes channel.
 func providerWriter(writes <-chan writeRequest, pool ConnPool) {
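Note: batching funnels every write through the single providerWriter goroutine below, so CPU profiles would otherwise attribute all query work to providerWriter rather than to the callers that submitted it. The labels field threaded through writeRequest keeps the originating caller visible. A minimal, self-contained sketch of the standard-library mechanism this relies on (the label key and value here are illustrative, not taken from this patch):

    package main

    import (
        "context"
        "runtime/pprof"
    )

    func main() {
        // Samples taken while the func runs carry the label f=MarkComplete,
        // regardless of which goroutine ends up executing the work.
        pprof.Do(context.Background(), pprof.Labels("f", "MarkComplete"), func(context.Context) {
            // Do the work that should be attributed to the submitting caller.
        })
    }
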
@@ -381,13 +416,13 @@
 		var cantFail error
 		func() {
 			defer sqlitex.Save(conn)(&cantFail)
-			firstErr := first.query(conn)
+			firstErr := runQueryWithLabels(first.query, first.labels, conn)
 			buf = append(buf, func() { first.done <- firstErr })
 			for {
 				select {
 				case wr, ok := <-writes:
 					if ok {
-						err := wr.query(conn)
+						err := runQueryWithLabels(wr.query, wr.labels, conn)
 						buf = append(buf, func() { wr.done <- err })
 						continue
 					}
@@ -419,12 +454,26 @@ type instance struct {
 	p *provider
 }
 
-func (p *provider) withConn(with withConn, write bool) error {
+func getLabels(skip int) pprof.LabelSet {
+	return pprof.Labels("f", func() string {
+		var pcs [8]uintptr
+		runtime.Callers(skip+3, pcs[:])
+		fs := runtime.CallersFrames(pcs[:])
+		f, _ := fs.Next()
+		funcName := f.Func.Name()
+		funcName = funcName[strings.LastIndexByte(funcName, '.')+1:]
+		//log.Printf("func name: %q", funcName)
+		return funcName
+	}())
+}
+
+func (p *provider) withConn(with withConn, write bool, skip int) error {
 	if write && p.opts.BatchWrites {
 		done := make(chan error)
 		p.writes <- writeRequest{
-			query: with,
-			done:  done,
+			query:  with,
+			done:   done,
+			labels: getLabels(skip + 1),
 		}
 		return <-done
 	} else {
@@ -433,14 +482,14 @@
 			return errors.New("couldn't get pool conn")
 		}
 		defer p.pool.Put(conn)
-		return with(conn)
+		return runQueryWithLabels(with, getLabels(skip+1), conn)
 	}
 }
 
 type withConn func(conn) error
 
 func (i instance) withConn(with withConn, write bool) error {
-	return i.p.withConn(with, write)
+	return i.p.withConn(with, write, 1)
 }
 
 func (i instance) getConn() *sqlite.Conn {
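Worth flagging for callers: NewPiecesStorage previously hard-coded LeaveIncompleteChunks and AllowSizedPuts, and after this change it passes opts.ResourcePiecesOpts through verbatim, so existing callers must set those fields themselves to keep the old behaviour. A sketch of an equivalent call under the new options shape (the path and capacity values are made up for illustration):

    package main

    import (
        "github.com/anacrolix/torrent/storage"
        sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
    )

    func main() {
        ci, err := sqliteStorage.NewPiecesStorage(sqliteStorage.NewPiecesStorageOpts{
            NewPoolOpts: sqliteStorage.NewPoolOpts{
                Path:     "torrents.db", // illustrative
                Capacity: 1 << 30,       // illustrative: 1 GiB cache cap
            },
            // These were hard-coded inside NewPiecesStorage before this patch.
            ResourcePiecesOpts: storage.ResourcePiecesOpts{
                LeaveIncompleteChunks: true,
                AllowSizedPuts:        true,
            },
        })
        if err != nil {
            panic(err)
        }
        defer ci.Close()
    }
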
diff --git a/storage/sqlite/sqlite-storage_test.go b/storage/sqlite/sqlite-storage_test.go
index 2427cf3a..f732b51a 100644
--- a/storage/sqlite/sqlite-storage_test.go
+++ b/storage/sqlite/sqlite-storage_test.go
@@ -2,17 +2,15 @@ package sqliteStorage
 
 import (
 	"bytes"
+	"fmt"
 	"io"
 	"io/ioutil"
-	"math/rand"
 	"path/filepath"
 	"sync"
 	"testing"
 
 	_ "github.com/anacrolix/envpprof"
-	"github.com/anacrolix/missinggo/iter"
-	"github.com/anacrolix/torrent/metainfo"
-	"github.com/anacrolix/torrent/storage"
+	test_storage "github.com/anacrolix/torrent/storage/test"
 	qt "github.com/frankban/quicktest"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -70,33 +68,28 @@ func TestSimultaneousIncrementalBlob(t *testing.T) {
 }
 
 func BenchmarkMarkComplete(b *testing.B) {
-	const pieceSize = 8 << 20
+	const pieceSize = 2 << 20
+	const capacity = 0
 	c := qt.New(b)
-	data := make([]byte, pieceSize)
-	rand.Read(data)
-	dbPath := filepath.Join(b.TempDir(), "storage.db")
-	b.Logf("storage db path: %q", dbPath)
-	ci, err := NewPiecesStorage(NewPoolOpts{
-		Path:                dbPath,
-		Capacity:            pieceSize,
-		ConcurrentBlobReads: true,
-	})
-	c.Assert(err, qt.IsNil)
-	defer ci.Close()
-	ti, err := ci.OpenTorrent(nil, metainfo.Hash{})
-	c.Assert(err, qt.IsNil)
-	defer ti.Close()
-	pi := ti.Piece(metainfo.Piece{
-		Info: &metainfo.Info{
-			Pieces:      make([]byte, metainfo.HashSize),
-			PieceLength: pieceSize,
-			Length:      pieceSize,
-		},
-	})
-	// Do it untimed the first time to prime the cache.
-	storage.BenchmarkPieceMarkComplete(b, pi, data)
-	b.ResetTimer()
-	for range iter.N(b.N) {
-		storage.BenchmarkPieceMarkComplete(b, pi, data)
+	for _, memory := range []bool{false, true} {
+		b.Run(fmt.Sprintf("Memory=%v", memory), func(b *testing.B) {
+			dbPath := filepath.Join(b.TempDir(), "storage.db")
+			//b.Logf("storage db path: %q", dbPath)
+			ci, err := NewPiecesStorage(NewPiecesStorageOpts{
+				NewPoolOpts: NewPoolOpts{
+					Path: dbPath,
+					//Capacity: 4*pieceSize - 1,
+					ConcurrentBlobReads: false,
+					PageSize:            1 << 14,
+					Memory:              memory,
+				},
+				ProvOpts: func(opts *ProviderOpts) {
+					opts.BatchWrites = true
+				},
+			})
+			c.Assert(err, qt.IsNil)
+			defer ci.Close()
+			test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, 16, capacity)
+		})
 	}
 }
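The rewritten benchmark now covers on-disk and in-memory databases as sub-benchmarks; something like go test -run '^$' -bench BenchmarkMarkComplete ./storage/sqlite runs both, and adding -cpuprofile should make the "f" labels introduced above visible in the profile (for example via pprof's tags command or -tagfocus). The exact label values are the trimmed caller names, so they depend on which functions reach withConn.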
diff --git a/storage/test/bench-resource-pieces.go b/storage/test/bench-resource-pieces.go
new file mode 100644
index 00000000..84d92218
--- /dev/null
+++ b/storage/test/bench-resource-pieces.go
@@ -0,0 +1,73 @@
+package test_storage
+
+import (
+	"bytes"
+	"io"
+	"io/ioutil"
+	"math/rand"
+	"sync"
+	"testing"
+
+	"github.com/anacrolix/torrent/metainfo"
+	"github.com/anacrolix/torrent/storage"
+	"github.com/bradfitz/iter"
+	qt "github.com/frankban/quicktest"
+)
+
+const chunkSize = 1 << 14
+
+func BenchmarkPieceMarkComplete(b *testing.B, ci storage.ClientImpl, pieceSize int64, numPieces int, capacity int64) {
+	c := qt.New(b)
+	ti, err := ci.OpenTorrent(nil, metainfo.Hash{})
+	c.Assert(err, qt.IsNil)
+	defer ti.Close()
+	info := &metainfo.Info{
+		Pieces:      make([]byte, numPieces*metainfo.HashSize),
+		PieceLength: pieceSize,
+		Length:      pieceSize * int64(numPieces),
+	}
+	rand.Read(info.Pieces)
+	data := make([]byte, pieceSize)
+	oneIter := func() {
+		for pieceIndex := range iter.N(numPieces) {
+			pi := ti.Piece(info.Piece(pieceIndex))
+			rand.Read(data)
+			var wg sync.WaitGroup
+			for off := int64(0); off < int64(len(data)); off += chunkSize {
+				wg.Add(1)
+				go func(off int64) {
+					defer wg.Done()
+					n, err := pi.WriteAt(data[off:off+chunkSize], off)
+					if err != nil {
+						panic(err)
+					}
+					if n != chunkSize {
+						panic(n)
+					}
+				}(off)
+			}
+			wg.Wait()
+			if capacity == 0 {
+				pi.MarkNotComplete()
+			}
+			// This might not apply if users of this benchmark don't cache with the expected capacity.
+			c.Assert(pi.Completion(), qt.Equals, storage.Completion{Complete: false, Ok: true})
+			c.Assert(pi.MarkComplete(), qt.IsNil)
+			c.Assert(pi.Completion(), qt.Equals, storage.Completion{true, true})
+			readData, err := ioutil.ReadAll(io.NewSectionReader(pi, 0, int64(len(data))))
+			c.Assert(err, qt.IsNil)
+			c.Assert(len(readData), qt.Equals, len(data))
+			c.Assert(bytes.Equal(readData, data), qt.IsTrue)
+		}
+	}
+	// Fill the cache
+	if capacity > 0 {
+		for range iter.N(int((capacity + info.TotalLength() - 1) / info.TotalLength())) {
+			oneIter()
+		}
+	}
+	b.ResetTimer()
+	for range iter.N(b.N) {
+		oneIter()
+	}
+}
diff --git a/test/transfer_test.go b/test/transfer_test.go
index 60678425..1ae49e15 100644
--- a/test/transfer_test.go
+++ b/test/transfer_test.go
@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
-	"log"
 	"os"
 	"path/filepath"
 	"sync"
@@ -282,8 +281,8 @@ func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
 func sqliteClientStorageFactory(connOptsMaker func(dataDir string) sqliteStorage.NewPoolOpts) storageFactory {
 	return func(dataDir string) storage.ClientImplCloser {
 		connOpts := connOptsMaker(dataDir)
-		log.Printf("opening sqlite db: %#v", connOpts)
-		ret, err := sqliteStorage.NewPiecesStorage(connOpts)
+		//log.Printf("opening sqlite db: %#v", connOpts)
+		ret, err := sqliteStorage.NewPiecesStorage(sqliteStorage.NewPiecesStorageOpts{NewPoolOpts: connOpts})
 		if err != nil {
 			panic(err)
 		}
-- 
2.48.1
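One payoff of hoisting the benchmark into storage/test is that any storage.ClientImpl can now reuse it. A sketch of what that could look like for the file-backed storage (storage.NewFile is an assumption here, it is not touched by this patch, and the zero-capacity path's Completion assertions are only expected to hold for implementations with working piece completion):

    package test_storage_test

    import (
        "testing"

        "github.com/anacrolix/torrent/storage"
        test_storage "github.com/anacrolix/torrent/storage/test"
    )

    func BenchmarkFilePieceMarkComplete(b *testing.B) {
        // Assumed constructor: file-backed storage rooted at a temp dir.
        ci := storage.NewFile(b.TempDir())
        defer ci.Close()
        // Same dials as the sqlite benchmark: 2 MiB pieces, 16 pieces, no capacity cap.
        test_storage.BenchmarkPieceMarkComplete(b, ci, 2<<20, 16, 0)
    }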