Sergey Matveev's repositories - btrtrc.git/commitdiff
Move storage piece benchmarks to storage/test and add a lot more dials
author Matt Joiner <anacrolix@gmail.com>
Mon, 18 Jan 2021 22:28:09 +0000 (09:28 +1100)
committer Matt Joiner <anacrolix@gmail.com>
Mon, 25 Jan 2021 04:54:37 +0000 (15:54 +1100)
storage/bench-resource-pieces.go [deleted file]
storage/sqlite/sqlite-storage.go
storage/sqlite/sqlite-storage_test.go
storage/test/bench-resource-pieces.go [new file with mode: 0644]
test/transfer_test.go

diff --git a/storage/bench-resource-pieces.go b/storage/bench-resource-pieces.go
deleted file mode 100644 (file)
index 753b914..0000000
--- a/storage/bench-resource-pieces.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package storage
-
-import (
-       "bytes"
-       "io"
-       "io/ioutil"
-       "sync"
-       "testing"
-
-       qt "github.com/frankban/quicktest"
-)
-
-func BenchmarkPieceMarkComplete(tb testing.TB, pi PieceImpl, data []byte) {
-       c := qt.New(tb)
-       var wg sync.WaitGroup
-       for off := int64(0); off < int64(len(data)); off += chunkSize {
-               wg.Add(1)
-               go func(off int64) {
-                       defer wg.Done()
-                       n, err := pi.WriteAt(data[off:off+chunkSize], off)
-                       if err != nil {
-                               panic(err)
-                       }
-                       if n != chunkSize {
-                               panic(n)
-                       }
-               }(off)
-       }
-       wg.Wait()
-       //pi.MarkNotComplete()
-       // This might not apply if users of this benchmark don't cache with the expected capacity.
-       c.Assert(pi.Completion(), qt.Equals, Completion{Complete: false, Ok: true})
-       c.Assert(pi.MarkComplete(), qt.IsNil)
-       c.Assert(pi.Completion(), qt.Equals, Completion{true, true})
-       readData, err := ioutil.ReadAll(io.NewSectionReader(pi, 0, int64(len(data))))
-       c.Assert(err, qt.IsNil)
-       c.Assert(len(readData), qt.Equals, len(data))
-       c.Assert(bytes.Equal(readData, data), qt.IsTrue)
-}
diff --git a/storage/sqlite/sqlite-storage.go b/storage/sqlite/sqlite-storage.go
index 0f8dc13670b8c720ee581d387e98ac3bcf4c71a5..2700204a6266dd3557888fe2da503898ca3451ae 100644 (file)
--- a/storage/sqlite/sqlite-storage.go
+++ b/storage/sqlite/sqlite-storage.go
@@ -11,6 +11,8 @@ import (
        "net/url"
        "os"
        "runtime"
+       "runtime/pprof"
+       "strings"
        "sync"
        "time"
 
@@ -47,103 +49,124 @@ func initConn(conn conn, wal bool) error {
        return nil
 }
 
-func initSchema(conn conn) error {
-       return sqlitex.ExecScript(conn, `
--- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
--- can trim the database file size with partial vacuums without having to do a full vacuum, which 
--- locks everything.
-pragma page_size=16384;
-pragma auto_vacuum=incremental;
-
-create table if not exists blob (
-       name text,
-       last_used timestamp default (datetime('now')),
-       data blob,
-       primary key (name)
-);
-
-create table if not exists blob_meta (
-       key text primary key,
-       value
-);
-
--- While sqlite *seems* to be faster to get sum(length(data)) instead of 
--- sum(length(cast(data as blob))), it may still require a large table scan at start-up or with a 
--- cold-cache. With this we can be assured that it doesn't.
-insert or ignore into blob_meta values ('size', 0);
-
-create table if not exists setting (
-       name primary key on conflict replace,
-       value
-);
-
-create view if not exists deletable_blob as
-with recursive excess (
-       usage_with,
-       last_used,
-       blob_rowid,
-       data_length
-) as (
-       select * 
-       from (
-               select 
-                       (select value from blob_meta where key='size') as usage_with,
+func initSchema(conn conn, pageSize int, triggers bool) error {
+       if pageSize != 0 {
+               err := sqlitex.ExecScript(conn, fmt.Sprintf("pragma page_size=%d", pageSize))
+               if err != nil {
+                       return err
+               }
+       }
+       err := sqlitex.ExecScript(conn, `
+               -- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
+               -- can trim the database file size with partial vacuums without having to do a full vacuum, which 
+               -- locks everything.
+               pragma auto_vacuum=incremental;
+               
+               create table if not exists blob (
+                       name text,
+                       last_used timestamp default (datetime('now')),
+                       data blob,
+                       primary key (name)
+               );
+               
+               create table if not exists blob_meta (
+                       key text primary key,
+                       value
+               );
+               
+               -- While sqlite *seems* to be faster to get sum(length(data)) instead of 
+               -- sum(length(cast(data as blob))), it may still require a large table scan at start-up or with a 
+               -- cold-cache. With this we can be assured that it doesn't.
+               insert or ignore into blob_meta values ('size', 0);
+               
+               create table if not exists setting (
+                       name primary key on conflict replace,
+                       value
+               );
+       
+               create view if not exists deletable_blob as
+               with recursive excess (
+                       usage_with,
                        last_used,
-                       rowid,
-                       length(cast(data as blob))
-               from blob order by last_used, rowid limit 1
-       )
-       where usage_with > (select value from setting where name='capacity')
-       union all
-       select 
-               usage_with-data_length as new_usage_with,
-               blob.last_used,
-               blob.rowid,
-               length(cast(data as blob))
-       from excess join blob
-       on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
-       where new_usage_with > (select value from setting where name='capacity')
-)
-select * from excess;
-
-create trigger if not exists after_insert_blob
-after insert on blob
-begin
-       update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
-       delete from blob where rowid in (select blob_rowid from deletable_blob);
-end;
-
-create trigger if not exists after_update_blob
-after update of data on blob
-begin
-       update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
-       delete from blob where rowid in (select blob_rowid from deletable_blob);
-end;
-
-create trigger if not exists after_delete_blob
-after delete on blob
-begin
-       update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
-end;
-`)
+                       blob_rowid,
+                       data_length
+               ) as (
+                       select * 
+                       from (
+                               select 
+                                       (select value from blob_meta where key='size') as usage_with,
+                                       last_used,
+                                       rowid,
+                                       length(cast(data as blob))
+                               from blob order by last_used, rowid limit 1
+                       )
+                       where usage_with > (select value from setting where name='capacity')
+                       union all
+                       select 
+                               usage_with-data_length as new_usage_with,
+                               blob.last_used,
+                               blob.rowid,
+                               length(cast(data as blob))
+                       from excess join blob
+                       on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
+                       where new_usage_with > (select value from setting where name='capacity')
+               )
+               select * from excess;
+       `)
+       if err != nil {
+               return err
+       }
+       if triggers {
+               err := sqlitex.ExecScript(conn, `
+                       create trigger if not exists after_insert_blob
+                       after insert on blob
+                       begin
+                               update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
+                               delete from blob where rowid in (select blob_rowid from deletable_blob);
+                       end;
+                       
+                       create trigger if not exists after_update_blob
+                       after update of data on blob
+                       begin
+                               update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
+                               delete from blob where rowid in (select blob_rowid from deletable_blob);
+                       end;
+                       
+                       create trigger if not exists after_delete_blob
+                       after delete on blob
+                       begin
+                               update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
+                       end;
+               `)
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+type NewPiecesStorageOpts struct {
+       NewPoolOpts
+       ProvOpts func(*ProviderOpts)
+       storage.ResourcePiecesOpts
 }
 
 // A convenience function that creates a connection pool, resource provider, and a pieces storage
 // ClientImpl and returns them all with a Close attached.
-func NewPiecesStorage(opts NewPoolOpts) (_ storage.ClientImplCloser, err error) {
-       conns, provOpts, err := NewPool(opts)
+func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, err error) {
+       conns, provOpts, err := NewPool(opts.NewPoolOpts)
        if err != nil {
                return
        }
+       if f := opts.ProvOpts; f != nil {
+               f(&provOpts)
+       }
        prov, err := NewProvider(conns, provOpts)
        if err != nil {
                conns.Close()
                return
        }
-       store := storage.NewResourcePiecesOpts(prov, storage.ResourcePiecesOpts{
-               LeaveIncompleteChunks: true,
-               AllowSizedPuts:        true,
-       })
+       store := storage.NewResourcePiecesOpts(prov, opts.ResourcePiecesOpts)
        return struct {
                storage.ClientImpl
                io.Closer
@@ -160,6 +183,7 @@ type NewPoolOpts struct {
        // Forces WAL, disables shared caching.
        ConcurrentBlobReads bool
        DontInitSchema      bool
+       PageSize            int
        // If non-zero, overrides the existing setting.
        Capacity int64
 }
@@ -216,7 +240,10 @@ func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
        conn := conns.Get(context.TODO())
        defer conns.Put(conn)
        if !opts.DontInitSchema {
-               err = initSchema(conn)
+               if opts.PageSize == 0 {
+                       opts.PageSize = 1 << 14
+               }
+               err = initSchema(conn, opts.PageSize, true)
                if err != nil {
                        return
                }
@@ -343,7 +370,7 @@ func (p *provider) WriteConsecutiveChunks(prefix string, w io.Writer) (written i
                        prefix,
                )
                return err
-       }, false)
+       }, false, 0)
        return
 }
 
@@ -358,12 +385,20 @@ func (me *provider) Close() error {
 }
 
 type writeRequest struct {
-       query withConn
-       done  chan<- error
+       query  withConn
+       done   chan<- error
+       labels pprof.LabelSet
 }
 
 var expvars = expvar.NewMap("sqliteStorage")
 
+func runQueryWithLabels(query withConn, labels pprof.LabelSet, conn conn) (err error) {
+       pprof.Do(context.Background(), labels, func(context.Context) {
+               err = query(conn)
+       })
+       return
+}
+
 // Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
 // stronger typing on the writes channel.
 func providerWriter(writes <-chan writeRequest, pool ConnPool) {
@@ -381,13 +416,13 @@ func providerWriter(writes <-chan writeRequest, pool ConnPool) {
                var cantFail error
                func() {
                        defer sqlitex.Save(conn)(&cantFail)
-                       firstErr := first.query(conn)
+                       firstErr := runQueryWithLabels(first.query, first.labels, conn)
                        buf = append(buf, func() { first.done <- firstErr })
                        for {
                                select {
                                case wr, ok := <-writes:
                                        if ok {
-                                               err := wr.query(conn)
+                                               err := runQueryWithLabels(wr.query, wr.labels, conn)
                                                buf = append(buf, func() { wr.done <- err })
                                                continue
                                        }
@@ -419,12 +454,26 @@ type instance struct {
        p        *provider
 }
 
-func (p *provider) withConn(with withConn, write bool) error {
+func getLabels(skip int) pprof.LabelSet {
+       return pprof.Labels("f", func() string {
+               var pcs [8]uintptr
+               runtime.Callers(skip+3, pcs[:])
+               fs := runtime.CallersFrames(pcs[:])
+               f, _ := fs.Next()
+               funcName := f.Func.Name()
+               funcName = funcName[strings.LastIndexByte(funcName, '.')+1:]
+               //log.Printf("func name: %q", funcName)
+               return funcName
+       }())
+}
+
+func (p *provider) withConn(with withConn, write bool, skip int) error {
        if write && p.opts.BatchWrites {
                done := make(chan error)
                p.writes <- writeRequest{
-                       query: with,
-                       done:  done,
+                       query:  with,
+                       done:   done,
+                       labels: getLabels(skip + 1),
                }
                return <-done
        } else {
@@ -433,14 +482,14 @@ func (p *provider) withConn(with withConn, write bool) error {
                        return errors.New("couldn't get pool conn")
                }
                defer p.pool.Put(conn)
-               return with(conn)
+               return runQueryWithLabels(with, getLabels(skip+1), conn)
        }
 }
 
 type withConn func(conn) error
 
 func (i instance) withConn(with withConn, write bool) error {
-       return i.p.withConn(with, write)
+       return i.p.withConn(with, write, 1)
 }
 
 func (i instance) getConn() *sqlite.Conn {
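
For context on the pprof changes above: batched writes and direct queries are now run inside pprof.Do, so CPU profile samples taken while a query executes carry an "f" label naming the calling function (derived via runtime.Callers in getLabels). Below is a minimal, standalone sketch of that runtime/pprof labelling pattern; the names runLabelled and the "WriteAt" label value are illustrative only, not part of this package.

package main

import (
	"context"
	"runtime/pprof"
)

// runLabelled executes work with the given labels attached, so profile samples
// collected while work runs are grouped under those labels.
func runLabelled(labels pprof.LabelSet, work func()) {
	pprof.Do(context.Background(), labels, func(context.Context) {
		work()
	})
}

func main() {
	runLabelled(pprof.Labels("f", "WriteAt"), func() {
		// ... the storage query being profiled ...
	})
}
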
diff --git a/storage/sqlite/sqlite-storage_test.go b/storage/sqlite/sqlite-storage_test.go
index 2427cf3a3f6e9afa583dc76f448eba9a231e5cb8..f732b51aea62e46feb936249cd1153b9731d7699 100644 (file)
--- a/storage/sqlite/sqlite-storage_test.go
+++ b/storage/sqlite/sqlite-storage_test.go
@@ -2,17 +2,15 @@ package sqliteStorage
 
 import (
        "bytes"
+       "fmt"
        "io"
        "io/ioutil"
-       "math/rand"
        "path/filepath"
        "sync"
        "testing"
 
        _ "github.com/anacrolix/envpprof"
-       "github.com/anacrolix/missinggo/iter"
-       "github.com/anacrolix/torrent/metainfo"
-       "github.com/anacrolix/torrent/storage"
+       test_storage "github.com/anacrolix/torrent/storage/test"
        qt "github.com/frankban/quicktest"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
@@ -70,33 +68,28 @@ func TestSimultaneousIncrementalBlob(t *testing.T) {
 }
 
 func BenchmarkMarkComplete(b *testing.B) {
-       const pieceSize = 8 << 20
+       const pieceSize = 2 << 20
+       const capacity = 0
        c := qt.New(b)
-       data := make([]byte, pieceSize)
-       rand.Read(data)
-       dbPath := filepath.Join(b.TempDir(), "storage.db")
-       b.Logf("storage db path: %q", dbPath)
-       ci, err := NewPiecesStorage(NewPoolOpts{
-               Path:                dbPath,
-               Capacity:            pieceSize,
-               ConcurrentBlobReads: true,
-       })
-       c.Assert(err, qt.IsNil)
-       defer ci.Close()
-       ti, err := ci.OpenTorrent(nil, metainfo.Hash{})
-       c.Assert(err, qt.IsNil)
-       defer ti.Close()
-       pi := ti.Piece(metainfo.Piece{
-               Info: &metainfo.Info{
-                       Pieces:      make([]byte, metainfo.HashSize),
-                       PieceLength: pieceSize,
-                       Length:      pieceSize,
-               },
-       })
-       // Do it untimed the first time to prime the cache.
-       storage.BenchmarkPieceMarkComplete(b, pi, data)
-       b.ResetTimer()
-       for range iter.N(b.N) {
-               storage.BenchmarkPieceMarkComplete(b, pi, data)
+       for _, memory := range []bool{false, true} {
+               b.Run(fmt.Sprintf("Memory=%v", memory), func(b *testing.B) {
+                       dbPath := filepath.Join(b.TempDir(), "storage.db")
+                       //b.Logf("storage db path: %q", dbPath)
+                       ci, err := NewPiecesStorage(NewPiecesStorageOpts{
+                               NewPoolOpts: NewPoolOpts{
+                                       Path: dbPath,
+                                       //Capacity:            4*pieceSize - 1,
+                                       ConcurrentBlobReads: false,
+                                       PageSize:            1 << 14,
+                                       Memory:              memory,
+                               },
+                               ProvOpts: func(opts *ProviderOpts) {
+                                       opts.BatchWrites = true
+                               },
+                       })
+                       c.Assert(err, qt.IsNil)
+                       defer ci.Close()
+                       test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, 16, capacity)
+               })
        }
 }
diff --git a/storage/test/bench-resource-pieces.go b/storage/test/bench-resource-pieces.go
new file mode 100644 (file)
index 0000000..84d9221
--- /dev/null
+++ b/storage/test/bench-resource-pieces.go
@@ -0,0 +1,73 @@
+package test_storage
+
+import (
+       "bytes"
+       "io"
+       "io/ioutil"
+       "math/rand"
+       "sync"
+       "testing"
+
+       "github.com/anacrolix/torrent/metainfo"
+       "github.com/anacrolix/torrent/storage"
+       "github.com/bradfitz/iter"
+       qt "github.com/frankban/quicktest"
+)
+
+const chunkSize = 1 << 14
+
+func BenchmarkPieceMarkComplete(b *testing.B, ci storage.ClientImpl, pieceSize int64, numPieces int, capacity int64) {
+       c := qt.New(b)
+       ti, err := ci.OpenTorrent(nil, metainfo.Hash{})
+       c.Assert(err, qt.IsNil)
+       defer ti.Close()
+       info := &metainfo.Info{
+               Pieces:      make([]byte, numPieces*metainfo.HashSize),
+               PieceLength: pieceSize,
+               Length:      pieceSize * int64(numPieces),
+       }
+       rand.Read(info.Pieces)
+       data := make([]byte, pieceSize)
+       oneIter := func() {
+               for pieceIndex := range iter.N(numPieces) {
+                       pi := ti.Piece(info.Piece(pieceIndex))
+                       rand.Read(data)
+                       var wg sync.WaitGroup
+                       for off := int64(0); off < int64(len(data)); off += chunkSize {
+                               wg.Add(1)
+                               go func(off int64) {
+                                       defer wg.Done()
+                                       n, err := pi.WriteAt(data[off:off+chunkSize], off)
+                                       if err != nil {
+                                               panic(err)
+                                       }
+                                       if n != chunkSize {
+                                               panic(n)
+                                       }
+                               }(off)
+                       }
+                       wg.Wait()
+                       if capacity == 0 {
+                               pi.MarkNotComplete()
+                       }
+                       // This might not apply if users of this benchmark don't cache with the expected capacity.
+                       c.Assert(pi.Completion(), qt.Equals, storage.Completion{Complete: false, Ok: true})
+                       c.Assert(pi.MarkComplete(), qt.IsNil)
+                       c.Assert(pi.Completion(), qt.Equals, storage.Completion{true, true})
+                       readData, err := ioutil.ReadAll(io.NewSectionReader(pi, 0, int64(len(data))))
+                       c.Assert(err, qt.IsNil)
+                       c.Assert(len(readData), qt.Equals, len(data))
+                       c.Assert(bytes.Equal(readData, data), qt.IsTrue)
+               }
+       }
+       // Fill the cache
+       if capacity > 0 {
+               for range iter.N(int((capacity + info.TotalLength() - 1) / info.TotalLength())) {
+                       oneIter()
+               }
+       }
+       b.ResetTimer()
+       for range iter.N(b.N) {
+               oneIter()
+       }
+}
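
The helper above is intended to be shared by any storage backend. A hedged sketch of how another implementation's test package might drive it; newTestClientImpl is a hypothetical constructor standing in for whatever builds that backend's storage.ClientImplCloser, and the dial values mirror the sqlite benchmark earlier in this commit.

package mystorage_test

import (
	"testing"

	test_storage "github.com/anacrolix/torrent/storage/test"
)

func BenchmarkMarkComplete(b *testing.B) {
	// Hypothetical constructor for the backend under test.
	ci := newTestClientImpl(b)
	defer ci.Close()
	// pieceSize=2 MiB, numPieces=16; capacity=0 makes the benchmark call
	// MarkNotComplete itself rather than rely on cache eviction.
	test_storage.BenchmarkPieceMarkComplete(b, ci, 2<<20, 16, 0)
}
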
diff --git a/test/transfer_test.go b/test/transfer_test.go
index 606784255fbabdcf423f02207966482c34974571..1ae49e15c63be8e0e2bfb8b520567725e7b3ec12 100644 (file)
--- a/test/transfer_test.go
+++ b/test/transfer_test.go
@@ -4,7 +4,6 @@ import (
        "fmt"
        "io"
        "io/ioutil"
-       "log"
        "os"
        "path/filepath"
        "sync"
@@ -282,8 +281,8 @@ func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
 func sqliteClientStorageFactory(connOptsMaker func(dataDir string) sqliteStorage.NewPoolOpts) storageFactory {
        return func(dataDir string) storage.ClientImplCloser {
                connOpts := connOptsMaker(dataDir)
-               log.Printf("opening sqlite db: %#v", connOpts)
-               ret, err := sqliteStorage.NewPiecesStorage(connOpts)
+               //log.Printf("opening sqlite db: %#v", connOpts)
+               ret, err := sqliteStorage.NewPiecesStorage(sqliteStorage.NewPiecesStorageOpts{NewPoolOpts: connOpts})
                if err != nil {
                        panic(err)
                }
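
For reference, the option surface added by this commit, sketched as a single constructor call. The field names come from the diff above; the concrete values (path, capacity) are illustrative, and ResourcePiecesOpts now carries the LeaveIncompleteChunks/AllowSizedPuts flags that NewPiecesStorage previously hard-coded.

package example

import (
	"path/filepath"

	"github.com/anacrolix/torrent/storage"
	sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
)

func newSqliteClientStorage(dataDir string) (storage.ClientImplCloser, error) {
	return sqliteStorage.NewPiecesStorage(sqliteStorage.NewPiecesStorageOpts{
		NewPoolOpts: sqliteStorage.NewPoolOpts{
			Path:     filepath.Join(dataDir, "storage.db"),
			PageSize: 1 << 14, // matches the new default applied when left at 0
			Capacity: 4 << 30, // illustrative; non-zero overrides any existing setting
		},
		ProvOpts: func(opts *sqliteStorage.ProviderOpts) {
			opts.BatchWrites = true
		},
		ResourcePiecesOpts: storage.ResourcePiecesOpts{
			LeaveIncompleteChunks: true,
			AllowSizedPuts:        true,
		},
	})
}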