From: Matt Joiner Date: Wed, 5 May 2021 00:02:15 +0000 (+1000) Subject: Rework to use a pool of blobs X-Git-Tag: v1.28.0~18^2~11 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=675a0ab0dc612c41b72b872881040c83216ad559;p=btrtrc.git Rework to use a pool of blobs --- diff --git a/storage/sqlite/new.go b/storage/sqlite/new.go index 2ce6ac44..c33c3d3b 100644 --- a/storage/sqlite/new.go +++ b/storage/sqlite/new.go @@ -31,16 +31,17 @@ func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, er return } return &client{ - prov: prov, - conn: prov.pool.Get(nil), + prov: prov, + conn: prov.pool.Get(nil), + blobs: make(map[string]*sqlite.Blob), }, nil } type client struct { - l sync.Mutex - prov *provider - conn conn - blob *sqlite.Blob + l sync.Mutex + prov *provider + conn conn + blobs map[string]*sqlite.Blob } func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) { @@ -48,8 +49,8 @@ func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (stora } func (c *client) Close() error { - if c.blob != nil { - c.blob.Close() + for _, b := range c.blobs { + b.Close() } c.prov.pool.Put(c.conn) return c.prov.Close() @@ -82,7 +83,7 @@ func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl { t.c.l.Lock() defer t.c.l.Unlock() name := p.Hash().HexString() - return piece{t.c.conn, name, &t.c.l, p.Length(), &t.c.blob} + return piece{t.c.conn, &t.c.l, name, t.c.blobs, p.Length()} } func (t torrent) Close() error { @@ -91,29 +92,10 @@ func (t torrent) Close() error { type piece struct { conn conn - name string l *sync.Mutex + name string + blobs map[string]*sqlite.Blob length int64 - blob **sqlite.Blob -} - -func (p2 piece) getBlob() *sqlite.Blob { - rowid, err := rowidForBlob(p2.conn, p2.name, p2.length) - if err != nil { - panic(err) - } - if *p2.blob != nil { - err := (*p2.blob).Close() - if err != nil { - panic(err) - } - *p2.blob = nil - } - *p2.blob, err = p2.conn.OpenBlob("main", "blob", "data", rowid, true) - if err != nil { - panic(err) - } - return *p2.blob } func (p2 piece) ReadAt(p []byte, off int64) (n int, err error) { @@ -140,10 +122,21 @@ func (p2 piece) MarkComplete() error { if changes != 1 { panic(changes) } + p2.blobWouldExpire() return nil } +func (p2 piece) blobWouldExpire() { + blob, ok := p2.blobs[p2.name] + if !ok { + return + } + blob.Close() + delete(p2.blobs, p2.name) +} + func (p2 piece) MarkNotComplete() error { + p2.blobWouldExpire() return sqlitex.Exec(p2.conn, "update blob set verified=false where name=?", nil, p2.name) } @@ -160,3 +153,26 @@ func (p2 piece) Completion() (ret storage.Completion) { } return } + +func (p2 piece) closeBlobIfExists() { + if b, ok := p2.blobs[p2.name]; ok { + b.Close() + delete(p2.blobs, p2.name) + } +} + +func (p2 piece) getBlob() *sqlite.Blob { + blob, ok := p2.blobs[p2.name] + if !ok { + rowid, err := rowidForBlob(p2.conn, p2.name, p2.length) + if err != nil { + panic(err) + } + blob, err = p2.conn.OpenBlob("main", "blob", "data", rowid, true) + if err != nil { + panic(err) + } + p2.blobs[p2.name] = blob + } + return blob +} diff --git a/storage/sqlite/sqlite-storage_test.go b/storage/sqlite/sqlite-storage_test.go index ff4511af..d18369ed 100644 --- a/storage/sqlite/sqlite-storage_test.go +++ b/storage/sqlite/sqlite-storage_test.go @@ -10,6 +10,7 @@ import ( "testing" _ "github.com/anacrolix/envpprof" + "github.com/anacrolix/torrent/storage" test_storage "github.com/anacrolix/torrent/storage/test" qt "github.com/frankban/quicktest" "github.com/stretchr/testify/assert" @@ -68,42 +69,51 @@ func TestSimultaneousIncrementalBlob(t *testing.T) { func BenchmarkMarkComplete(b *testing.B) { const pieceSize = test_storage.DefaultPieceSize - const capacity = test_storage.DefaultCapacity + const capacity = test_storage.DefaultNumPieces * pieceSize / 2 c := qt.New(b) - for _, memory := range []bool{false, true} { - b.Run(fmt.Sprintf("Memory=%v", memory), func(b *testing.B) { - for _, batchWrites := range []bool{false, true} { - b.Run(fmt.Sprintf("BatchWrites=%v", batchWrites), func(b *testing.B) { - dbPath := filepath.Join(b.TempDir(), "storage.db") - //b.Logf("storage db path: %q", dbPath) - newPoolOpts := NewPoolOpts{ - Path: dbPath, - Capacity: 4*pieceSize - 1, - NoConcurrentBlobReads: false, - PageSize: 1 << 14, - Memory: memory, - } - provOpts := func(opts *ProviderOpts) { - opts.BatchWrites = batchWrites - } - b.Run("SqlitePieceStorage", func(b *testing.B) { - ci, err := NewPiecesStorage(NewPiecesStorageOpts{ - NewPoolOpts: newPoolOpts, - ProvOpts: provOpts, + for _, storage := range []struct { + name string + maker func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser + }{ + {"SqliteDirect", func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser { + ci, err := NewDirectStorage(NewDirectStorageOpts{ + NewPoolOpts: newPoolOpts, + ProvOpts: provOpts, + }) + c.Assert(err, qt.IsNil) + return ci + }}, + {"SqlitePieceStorage", func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser { + ci, err := NewPiecesStorage(NewPiecesStorageOpts{ + NewPoolOpts: newPoolOpts, + ProvOpts: provOpts, + }) + c.Assert(err, qt.IsNil) + return ci + }}, + } { + b.Run(storage.name, func(b *testing.B) { + for _, memory := range []bool{false, true} { + b.Run(fmt.Sprintf("Memory=%v", memory), func(b *testing.B) { + for _, batchWrites := range []bool{false, true} { + b.Run(fmt.Sprintf("BatchWrites=%v", batchWrites), func(b *testing.B) { + dbPath := filepath.Join(b.TempDir(), "storage.db") + //b.Logf("storage db path: %q", dbPath) + newPoolOpts := NewPoolOpts{ + Path: dbPath, + Capacity: capacity, + NoConcurrentBlobReads: false, + PageSize: 1 << 14, + Memory: memory, + } + provOpts := func(opts *ProviderOpts) { + opts.BatchWrites = batchWrites + } + ci := storage.maker(newPoolOpts, provOpts) + defer ci.Close() + test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity) }) - c.Assert(err, qt.IsNil) - defer ci.Close() - test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity) - }) - b.Run("SqliteDirect", func(b *testing.B) { - ci, err := NewDirectStorage(NewDirectStorageOpts{ - NewPoolOpts: newPoolOpts, - ProvOpts: provOpts, - }) - c.Assert(err, qt.IsNil) - defer ci.Close() - test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity) - }) + } }) } }) diff --git a/storage/test/bench-resource-pieces.go b/storage/test/bench-piece-mark-complete.go similarity index 90% rename from storage/test/bench-resource-pieces.go rename to storage/test/bench-piece-mark-complete.go index 4a79c313..1656b8ac 100644 --- a/storage/test/bench-resource-pieces.go +++ b/storage/test/bench-piece-mark-complete.go @@ -23,7 +23,10 @@ const ( func BenchmarkPieceMarkComplete( b *testing.B, ci storage.ClientImpl, - pieceSize int64, numPieces int, capacity int64, + pieceSize int64, numPieces int, + // This drives any special handling around capacity that may be configured into the storage + // implementation. + capacity int64, ) { const check = true c := qt.New(b) @@ -60,16 +63,18 @@ func BenchmarkPieceMarkComplete( }(off) } wg.Wait() + b.StopTimer() if capacity == 0 { pi.MarkNotComplete() } + b.StartTimer() // This might not apply if users of this benchmark don't cache with the expected capacity. c.Assert(pi.Completion(), qt.Equals, storage.Completion{Complete: false, Ok: true}) c.Assert(pi.MarkComplete(), qt.IsNil) c.Assert(pi.Completion(), qt.Equals, storage.Completion{true, true}) if check { readData, err := ioutil.ReadAll(io.NewSectionReader(pi, 0, int64(len(data)))) - c.Assert(err, qt.IsNil) + c.Check(err, qt.IsNil) c.Assert(len(readData), qt.Equals, len(data)) c.Assert(bytes.Equal(readData, data), qt.IsTrue) }