"net/url"
"os"
"runtime"
+ "runtime/pprof"
+ "strings"
"sync"
"time"
return nil
}
-func initSchema(conn conn) error {
-	return sqlitex.ExecScript(conn, `
--- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
--- can trim the database file size with partial vacuums without having to do a full vacuum, which
--- locks everything.
-pragma page_size=16384;
-pragma auto_vacuum=incremental;
-
-create table if not exists blob (
-	name text,
-	last_used timestamp default (datetime('now')),
-	data blob,
-	primary key (name)
-);
-
-create table if not exists blob_meta (
-	key text primary key,
-	value
-);
-
--- While sqlite *seems* to be faster to get sum(length(data)) instead of
--- sum(length(cast(data as blob))), it may still require a large table scan at start-up or with a
--- cold-cache. With this we can be assured that it doesn't.
-insert or ignore into blob_meta values ('size', 0);
-
-create table if not exists setting (
-	name primary key on conflict replace,
-	value
-);
-
-create view if not exists deletable_blob as
-with recursive excess (
-	usage_with,
-	last_used,
-	blob_rowid,
-	data_length
-) as (
-	select *
-	from (
-		select
-			(select value from blob_meta where key='size') as usage_with,
+// initSchema idempotently creates the blob-cache schema on conn.
+//
+// pageSize, when non-zero, is applied with "pragma page_size" before any
+// tables are created — SQLite only honours a page-size change on a fresh (or
+// fully vacuumed) database, so it must come first. triggers controls whether
+// the size-accounting/eviction triggers are installed.
+func initSchema(conn conn, pageSize int, triggers bool) error {
+	if pageSize != 0 {
+		err := sqlitex.ExecScript(conn, fmt.Sprintf("pragma page_size=%d", pageSize))
+		if err != nil {
+			return err
+		}
+	}
+	err := sqlitex.ExecScript(conn, `
+		-- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
+		-- can trim the database file size with partial vacuums without having to do a full vacuum, which
+		-- locks everything.
+		pragma auto_vacuum=incremental;
+
+		create table if not exists blob (
+			name text,
+			last_used timestamp default (datetime('now')),
+			data blob,
+			primary key (name)
+		);
+
+		create table if not exists blob_meta (
+			key text primary key,
+			value
+		);
+
+		-- While sqlite *seems* to be faster to get sum(length(data)) instead of
+		-- sum(length(cast(data as blob))), it may still require a large table scan at start-up or with a
+		-- cold-cache. With this we can be assured that it doesn't.
+		insert or ignore into blob_meta values ('size', 0);
+
+		create table if not exists setting (
+			name primary key on conflict replace,
+			value
+		);
+
+		create view if not exists deletable_blob as
+		with recursive excess (
+			usage_with,
			last_used,
-		rowid,
-		length(cast(data as blob))
-	from blob order by last_used, rowid limit 1
-	)
-	where usage_with > (select value from setting where name='capacity')
-	union all
-	select
-		usage_with-data_length as new_usage_with,
-		blob.last_used,
-		blob.rowid,
-		length(cast(data as blob))
-	from excess join blob
-	on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
-	where new_usage_with > (select value from setting where name='capacity')
-)
-select * from excess;
-
-create trigger if not exists after_insert_blob
-after insert on blob
-begin
-	update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
-	delete from blob where rowid in (select blob_rowid from deletable_blob);
-end;
-
-create trigger if not exists after_update_blob
-after update of data on blob
-begin
-	update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
-	delete from blob where rowid in (select blob_rowid from deletable_blob);
-end;
-
-create trigger if not exists after_delete_blob
-after delete on blob
-begin
-	update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
-end;
-`)
+			blob_rowid,
+			data_length
+		) as (
+			select *
+			from (
+				select
+					(select value from blob_meta where key='size') as usage_with,
+					last_used,
+					rowid,
+					length(cast(data as blob))
+				from blob order by last_used, rowid limit 1
+			)
+			where usage_with > (select value from setting where name='capacity')
+			union all
+			select
+				usage_with-data_length as new_usage_with,
+				blob.last_used,
+				blob.rowid,
+				length(cast(data as blob))
+			from excess join blob
+			on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
+			where new_usage_with > (select value from setting where name='capacity')
+		)
+		select * from excess;
+	`)
+	if err != nil {
+		return err
+	}
+	// The triggers keep blob_meta's 'size' row in sync with the blob table
+	// and evict least-recently-used blobs over capacity. They're optional so
+	// callers can manage accounting/eviction themselves.
+	if triggers {
+		err := sqlitex.ExecScript(conn, `
+			create trigger if not exists after_insert_blob
+			after insert on blob
+			begin
+				update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
+				delete from blob where rowid in (select blob_rowid from deletable_blob);
+			end;
+
+			create trigger if not exists after_update_blob
+			after update of data on blob
+			begin
+				update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
+				delete from blob where rowid in (select blob_rowid from deletable_blob);
+			end;
+
+			create trigger if not exists after_delete_blob
+			after delete on blob
+			begin
+				update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
+			end;
+		`)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// NewPiecesStorageOpts bundles the options for NewPiecesStorage: the
+// connection-pool options, an optional hook to adjust the derived provider
+// options, and the resource-pieces options passed through to the storage
+// implementation.
+type NewPiecesStorageOpts struct {
+	NewPoolOpts
+	// ProvOpts, if non-nil, is called with the provider options derived from
+	// NewPool so the caller can mutate them before the provider is created.
+	ProvOpts func(*ProviderOpts)
+	storage.ResourcePiecesOpts
}
// A convenience function that creates a connection pool, resource provider, and a pieces storage
// ClientImpl and returns them all with a Close attached.
-func NewPiecesStorage(opts NewPoolOpts) (_ storage.ClientImplCloser, err error) {
- conns, provOpts, err := NewPool(opts)
+func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, err error) {
+ conns, provOpts, err := NewPool(opts.NewPoolOpts)
if err != nil {
return
}
+ if f := opts.ProvOpts; f != nil {
+ f(&provOpts)
+ }
prov, err := NewProvider(conns, provOpts)
if err != nil {
conns.Close()
return
}
- store := storage.NewResourcePiecesOpts(prov, storage.ResourcePiecesOpts{
- LeaveIncompleteChunks: true,
- AllowSizedPuts: true,
- })
+ store := storage.NewResourcePiecesOpts(prov, opts.ResourcePiecesOpts)
return struct {
storage.ClientImpl
io.Closer
// Forces WAL, disables shared caching.
ConcurrentBlobReads bool
DontInitSchema bool
+ PageSize int
// If non-zero, overrides the existing setting.
Capacity int64
}
conn := conns.Get(context.TODO())
defer conns.Put(conn)
if !opts.DontInitSchema {
- err = initSchema(conn)
+ if opts.PageSize == 0 {
+ opts.PageSize = 1 << 14
+ }
+ err = initSchema(conn, opts.PageSize, true)
if err != nil {
return
}
prefix,
)
return err
- }, false)
+ }, false, 0)
return
}
}
type writeRequest struct {
-	query withConn
-	done  chan<- error
+	query withConn
+	done  chan<- error
+	// labels are pprof labels captured at the call site, so that a batched
+	// write executed later on the writer goroutine is still attributed to
+	// its originating function in profiles.
+	labels pprof.LabelSet
}
var expvars = expvar.NewMap("sqliteStorage")
+// runQueryWithLabels runs query against conn with the given pprof labels
+// applied for the duration of the call, so profile samples taken while the
+// query executes are attributed to the original caller rather than to the
+// storage plumbing.
+func runQueryWithLabels(query withConn, labels pprof.LabelSet, conn conn) (err error) {
+	pprof.Do(context.Background(), labels, func(context.Context) {
+		err = query(conn)
+	})
+	return
+}
+
// Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
// stronger typing on the writes channel.
func providerWriter(writes <-chan writeRequest, pool ConnPool) {
var cantFail error
func() {
defer sqlitex.Save(conn)(&cantFail)
- firstErr := first.query(conn)
+ firstErr := runQueryWithLabels(first.query, first.labels, conn)
buf = append(buf, func() { first.done <- firstErr })
for {
select {
case wr, ok := <-writes:
if ok {
- err := wr.query(conn)
+ err := runQueryWithLabels(wr.query, wr.labels, conn)
buf = append(buf, func() { wr.done <- err })
continue
}
p *provider
}
-func (p *provider) withConn(with withConn, write bool) error {
+// getLabels builds a pprof label set with key "f" naming the function that
+// initiated the query, skip frames above this call. skip+3 accounts for
+// runtime.Callers, the closure below, and getLabels itself.
+func getLabels(skip int) pprof.LabelSet {
+	return pprof.Labels("f", func() string {
+		var pcs [8]uintptr
+		// Only pass the frames Callers actually filled to CallersFrames.
+		n := runtime.Callers(skip+3, pcs[:])
+		fs := runtime.CallersFrames(pcs[:n])
+		f, _ := fs.Next()
+		// Use Frame.Function rather than Frame.Func.Name(): Frame.Func is
+		// nil for fully inlined frames, which would panic here, while
+		// Function is always populated for Go frames.
+		funcName := f.Function
+		// Trim the package path/receiver, keeping just the bare func name.
+		funcName = funcName[strings.LastIndexByte(funcName, '.')+1:]
+		//log.Printf("func name: %q", funcName)
+		return funcName
+	}())
+}
+
+func (p *provider) withConn(with withConn, write bool, skip int) error {
if write && p.opts.BatchWrites {
done := make(chan error)
p.writes <- writeRequest{
- query: with,
- done: done,
+ query: with,
+ done: done,
+ labels: getLabels(skip + 1),
}
return <-done
} else {
return errors.New("couldn't get pool conn")
}
defer p.pool.Put(conn)
- return with(conn)
+ return runQueryWithLabels(with, getLabels(skip+1), conn)
}
}
type withConn func(conn) error
func (i instance) withConn(with withConn, write bool) error {
-	return i.p.withConn(with, write)
+	// skip=1 so the pprof label names our caller, not this wrapper.
+	return i.p.withConn(with, write, 1)
}
func (i instance) getConn() *sqlite.Conn {
import (
"bytes"
+ "fmt"
"io"
"io/ioutil"
- "math/rand"
"path/filepath"
"sync"
"testing"
_ "github.com/anacrolix/envpprof"
- "github.com/anacrolix/missinggo/iter"
- "github.com/anacrolix/torrent/metainfo"
- "github.com/anacrolix/torrent/storage"
+ test_storage "github.com/anacrolix/torrent/storage/test"
qt "github.com/frankban/quicktest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
}
func BenchmarkMarkComplete(b *testing.B) {
-	const pieceSize = 8 << 20
+	const pieceSize = 2 << 20
+	// With zero capacity, the shared benchmark explicitly marks each piece
+	// not-complete per iteration rather than relying on cache eviction.
+	const capacity = 0
	c := qt.New(b)
-	data := make([]byte, pieceSize)
-	rand.Read(data)
-	dbPath := filepath.Join(b.TempDir(), "storage.db")
-	b.Logf("storage db path: %q", dbPath)
-	ci, err := NewPiecesStorage(NewPoolOpts{
-		Path:                dbPath,
-		Capacity:            pieceSize,
-		ConcurrentBlobReads: true,
-	})
-	c.Assert(err, qt.IsNil)
-	defer ci.Close()
-	ti, err := ci.OpenTorrent(nil, metainfo.Hash{})
-	c.Assert(err, qt.IsNil)
-	defer ti.Close()
-	pi := ti.Piece(metainfo.Piece{
-		Info: &metainfo.Info{
-			Pieces:      make([]byte, metainfo.HashSize),
-			PieceLength: pieceSize,
-			Length:      pieceSize,
-		},
-	})
-	// Do it untimed the first time to prime the cache.
-	storage.BenchmarkPieceMarkComplete(b, pi, data)
-	b.ResetTimer()
-	for range iter.N(b.N) {
-		storage.BenchmarkPieceMarkComplete(b, pi, data)
+	// Benchmark both the on-disk and in-memory database variants.
+	for _, memory := range []bool{false, true} {
+		b.Run(fmt.Sprintf("Memory=%v", memory), func(b *testing.B) {
+			dbPath := filepath.Join(b.TempDir(), "storage.db")
+			//b.Logf("storage db path: %q", dbPath)
+			ci, err := NewPiecesStorage(NewPiecesStorageOpts{
+				NewPoolOpts: NewPoolOpts{
+					Path: dbPath,
+					//Capacity: 4*pieceSize - 1,
+					ConcurrentBlobReads: false,
+					PageSize:            1 << 14,
+					Memory:              memory,
+				},
+				// Batch writes so the provider's writer goroutine coalesces
+				// them into shared transactions.
+				ProvOpts: func(opts *ProviderOpts) {
+					opts.BatchWrites = true
+				},
+			})
+			c.Assert(err, qt.IsNil)
+			defer ci.Close()
+			test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, 16, capacity)
+		})
	}
}
--- /dev/null
+package test_storage
+
+import (
+ "bytes"
+ "io"
+ "io/ioutil"
+ "math/rand"
+ "sync"
+ "testing"
+
+ "github.com/anacrolix/torrent/metainfo"
+ "github.com/anacrolix/torrent/storage"
+ "github.com/bradfitz/iter"
+ qt "github.com/frankban/quicktest"
+)
+
+const chunkSize = 1 << 14
+
+// BenchmarkPieceMarkComplete exercises a storage.ClientImpl by writing
+// numPieces pieces of pieceSize bytes as concurrent chunk-sized writes,
+// marking each complete, and reading the data back to verify it
+// round-trips. capacity, if non-zero, should match the cache capacity the
+// client was configured with; the cache is pre-filled untimed so the timed
+// loop measures steady-state behaviour. With capacity zero, each piece is
+// explicitly marked not-complete before being rewritten.
+func BenchmarkPieceMarkComplete(b *testing.B, ci storage.ClientImpl, pieceSize int64, numPieces int, capacity int64) {
+	c := qt.New(b)
+	ti, err := ci.OpenTorrent(nil, metainfo.Hash{})
+	c.Assert(err, qt.IsNil)
+	defer ti.Close()
+	info := &metainfo.Info{
+		Pieces:      make([]byte, numPieces*metainfo.HashSize),
+		PieceLength: pieceSize,
+		Length:      pieceSize * int64(numPieces),
+	}
+	rand.Read(info.Pieces)
+	data := make([]byte, pieceSize)
+	oneIter := func() {
+		for pieceIndex := range iter.N(numPieces) {
+			pi := ti.Piece(info.Piece(pieceIndex))
+			rand.Read(data)
+			var wg sync.WaitGroup
+			// Write the piece as concurrent chunk-sized writes, mimicking
+			// the download path.
+			for off := int64(0); off < int64(len(data)); off += chunkSize {
+				wg.Add(1)
+				go func(off int64) {
+					defer wg.Done()
+					// Clamp the final chunk so pieceSize need not be an
+					// exact multiple of chunkSize.
+					end := off + chunkSize
+					if end > int64(len(data)) {
+						end = int64(len(data))
+					}
+					chunk := data[off:end]
+					n, err := pi.WriteAt(chunk, off)
+					if err != nil {
+						panic(err)
+					}
+					if n != len(chunk) {
+						panic(n)
+					}
+				}(off)
+			}
+			wg.Wait()
+			if capacity == 0 {
+				pi.MarkNotComplete()
+			}
+			// This might not apply if users of this benchmark don't cache with the expected capacity.
+			c.Assert(pi.Completion(), qt.Equals, storage.Completion{Complete: false, Ok: true})
+			c.Assert(pi.MarkComplete(), qt.IsNil)
+			c.Assert(pi.Completion(), qt.Equals, storage.Completion{true, true})
+			readData, err := ioutil.ReadAll(io.NewSectionReader(pi, 0, int64(len(data))))
+			c.Assert(err, qt.IsNil)
+			c.Assert(len(readData), qt.Equals, len(data))
+			c.Assert(bytes.Equal(readData, data), qt.IsTrue)
+		}
+	}
+	// Fill the cache untimed so the timed loop runs against a full cache.
+	if capacity > 0 {
+		for range iter.N(int((capacity + info.TotalLength() - 1) / info.TotalLength())) {
+			oneIter()
+		}
+	}
+	b.ResetTimer()
+	for range iter.N(b.N) {
+		oneIter()
+	}
+}