]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Rework to use a pool of blobs
authorMatt Joiner <anacrolix@gmail.com>
Wed, 5 May 2021 00:02:15 +0000 (10:02 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 5 May 2021 00:02:15 +0000 (10:02 +1000)
storage/sqlite/new.go
storage/sqlite/sqlite-storage_test.go
storage/test/bench-piece-mark-complete.go [moved from storage/test/bench-resource-pieces.go with 90% similarity]

index 2ce6ac445960f27d88269f08564db8bf13e0eeff..c33c3d3b31aa01f8da373d004bdae114cb5e0f2a 100644 (file)
@@ -31,16 +31,17 @@ func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, er
                return
        }
        return &client{
-               prov: prov,
-               conn: prov.pool.Get(nil),
+               prov:  prov,
+               conn:  prov.pool.Get(nil),
+               blobs: make(map[string]*sqlite.Blob),
        }, nil
 }
 
 type client struct {
-       l    sync.Mutex
-       prov *provider
-       conn conn
-       blob *sqlite.Blob
+       l     sync.Mutex
+       prov  *provider
+       conn  conn
+       blobs map[string]*sqlite.Blob
 }
 
 func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
@@ -48,8 +49,8 @@ func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (stora
 }
 
 func (c *client) Close() error {
-       if c.blob != nil {
-               c.blob.Close()
+       for _, b := range c.blobs {
+               b.Close()
        }
        c.prov.pool.Put(c.conn)
        return c.prov.Close()
@@ -82,7 +83,7 @@ func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
        t.c.l.Lock()
        defer t.c.l.Unlock()
        name := p.Hash().HexString()
-       return piece{t.c.conn, name, &t.c.l, p.Length(), &t.c.blob}
+       return piece{t.c.conn, &t.c.l, name, t.c.blobs, p.Length()}
 }
 
 func (t torrent) Close() error {
@@ -91,29 +92,10 @@ func (t torrent) Close() error {
 
 type piece struct {
        conn   conn
-       name   string
        l      *sync.Mutex
+       name   string
+       blobs  map[string]*sqlite.Blob
        length int64
-       blob   **sqlite.Blob
-}
-
-func (p2 piece) getBlob() *sqlite.Blob {
-       rowid, err := rowidForBlob(p2.conn, p2.name, p2.length)
-       if err != nil {
-               panic(err)
-       }
-       if *p2.blob != nil {
-               err := (*p2.blob).Close()
-               if err != nil {
-                       panic(err)
-               }
-               *p2.blob = nil
-       }
-       *p2.blob, err = p2.conn.OpenBlob("main", "blob", "data", rowid, true)
-       if err != nil {
-               panic(err)
-       }
-       return *p2.blob
 }
 
 func (p2 piece) ReadAt(p []byte, off int64) (n int, err error) {
@@ -140,10 +122,21 @@ func (p2 piece) MarkComplete() error {
        if changes != 1 {
                panic(changes)
        }
+       p2.blobWouldExpire()
        return nil
 }
 
+func (p2 piece) blobWouldExpire() {
+       blob, ok := p2.blobs[p2.name]
+       if !ok {
+               return
+       }
+       blob.Close()
+       delete(p2.blobs, p2.name)
+}
+
 func (p2 piece) MarkNotComplete() error {
+       p2.blobWouldExpire()
        return sqlitex.Exec(p2.conn, "update blob set verified=false where name=?", nil, p2.name)
 }
 
@@ -160,3 +153,26 @@ func (p2 piece) Completion() (ret storage.Completion) {
        }
        return
 }
+
+func (p2 piece) closeBlobIfExists() {
+       if b, ok := p2.blobs[p2.name]; ok {
+               b.Close()
+               delete(p2.blobs, p2.name)
+       }
+}
+
+func (p2 piece) getBlob() *sqlite.Blob {
+       blob, ok := p2.blobs[p2.name]
+       if !ok {
+               rowid, err := rowidForBlob(p2.conn, p2.name, p2.length)
+               if err != nil {
+                       panic(err)
+               }
+               blob, err = p2.conn.OpenBlob("main", "blob", "data", rowid, true)
+               if err != nil {
+                       panic(err)
+               }
+               p2.blobs[p2.name] = blob
+       }
+       return blob
+}
index ff4511af9be41e6d1a2788652a8ef4b14ec880f5..d18369ed6316cdd6e564cfff511029f2c6f41045 100644 (file)
@@ -10,6 +10,7 @@ import (
        "testing"
 
        _ "github.com/anacrolix/envpprof"
+       "github.com/anacrolix/torrent/storage"
        test_storage "github.com/anacrolix/torrent/storage/test"
        qt "github.com/frankban/quicktest"
        "github.com/stretchr/testify/assert"
@@ -68,42 +69,51 @@ func TestSimultaneousIncrementalBlob(t *testing.T) {
 
 func BenchmarkMarkComplete(b *testing.B) {
        const pieceSize = test_storage.DefaultPieceSize
-       const capacity = test_storage.DefaultCapacity
+       const capacity = test_storage.DefaultNumPieces * pieceSize / 2
        c := qt.New(b)
-       for _, memory := range []bool{false, true} {
-               b.Run(fmt.Sprintf("Memory=%v", memory), func(b *testing.B) {
-                       for _, batchWrites := range []bool{false, true} {
-                               b.Run(fmt.Sprintf("BatchWrites=%v", batchWrites), func(b *testing.B) {
-                                       dbPath := filepath.Join(b.TempDir(), "storage.db")
-                                       //b.Logf("storage db path: %q", dbPath)
-                                       newPoolOpts := NewPoolOpts{
-                                               Path:                  dbPath,
-                                               Capacity:              4*pieceSize - 1,
-                                               NoConcurrentBlobReads: false,
-                                               PageSize:              1 << 14,
-                                               Memory:                memory,
-                                       }
-                                       provOpts := func(opts *ProviderOpts) {
-                                               opts.BatchWrites = batchWrites
-                                       }
-                                       b.Run("SqlitePieceStorage", func(b *testing.B) {
-                                               ci, err := NewPiecesStorage(NewPiecesStorageOpts{
-                                                       NewPoolOpts: newPoolOpts,
-                                                       ProvOpts:    provOpts,
+       for _, storage := range []struct {
+               name  string
+               maker func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser
+       }{
+               {"SqliteDirect", func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser {
+                       ci, err := NewDirectStorage(NewDirectStorageOpts{
+                               NewPoolOpts: newPoolOpts,
+                               ProvOpts:    provOpts,
+                       })
+                       c.Assert(err, qt.IsNil)
+                       return ci
+               }},
+               {"SqlitePieceStorage", func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser {
+                       ci, err := NewPiecesStorage(NewPiecesStorageOpts{
+                               NewPoolOpts: newPoolOpts,
+                               ProvOpts:    provOpts,
+                       })
+                       c.Assert(err, qt.IsNil)
+                       return ci
+               }},
+       } {
+               b.Run(storage.name, func(b *testing.B) {
+                       for _, memory := range []bool{false, true} {
+                               b.Run(fmt.Sprintf("Memory=%v", memory), func(b *testing.B) {
+                                       for _, batchWrites := range []bool{false, true} {
+                                               b.Run(fmt.Sprintf("BatchWrites=%v", batchWrites), func(b *testing.B) {
+                                                       dbPath := filepath.Join(b.TempDir(), "storage.db")
+                                                       //b.Logf("storage db path: %q", dbPath)
+                                                       newPoolOpts := NewPoolOpts{
+                                                               Path:                  dbPath,
+                                                               Capacity:              capacity,
+                                                               NoConcurrentBlobReads: false,
+                                                               PageSize:              1 << 14,
+                                                               Memory:                memory,
+                                                       }
+                                                       provOpts := func(opts *ProviderOpts) {
+                                                               opts.BatchWrites = batchWrites
+                                                       }
+                                                       ci := storage.maker(newPoolOpts, provOpts)
+                                                       defer ci.Close()
+                                                       test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity)
                                                })
-                                               c.Assert(err, qt.IsNil)
-                                               defer ci.Close()
-                                               test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity)
-                                       })
-                                       b.Run("SqliteDirect", func(b *testing.B) {
-                                               ci, err := NewDirectStorage(NewDirectStorageOpts{
-                                                       NewPoolOpts: newPoolOpts,
-                                                       ProvOpts:    provOpts,
-                                               })
-                                               c.Assert(err, qt.IsNil)
-                                               defer ci.Close()
-                                               test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity)
-                                       })
+                                       }
                                })
                        }
                })
similarity index 90%
rename from storage/test/bench-resource-pieces.go
rename to storage/test/bench-piece-mark-complete.go
index 4a79c313c2acd45674aa5890efa31b437248ba0f..1656b8acbdc22d941c335355aa8416704f62f75d 100644 (file)
@@ -23,7 +23,10 @@ const (
 
 func BenchmarkPieceMarkComplete(
        b *testing.B, ci storage.ClientImpl,
-       pieceSize int64, numPieces int, capacity int64,
+       pieceSize int64, numPieces int,
+       // This drives any special handling around capacity that may be configured into the storage
+       // implementation.
+       capacity int64,
 ) {
        const check = true
        c := qt.New(b)
@@ -60,16 +63,18 @@ func BenchmarkPieceMarkComplete(
                                }(off)
                        }
                        wg.Wait()
+                       b.StopTimer()
                        if capacity == 0 {
                                pi.MarkNotComplete()
                        }
+                       b.StartTimer()
                        // This might not apply if users of this benchmark don't cache with the expected capacity.
                        c.Assert(pi.Completion(), qt.Equals, storage.Completion{Complete: false, Ok: true})
                        c.Assert(pi.MarkComplete(), qt.IsNil)
                        c.Assert(pi.Completion(), qt.Equals, storage.Completion{true, true})
                        if check {
                                readData, err := ioutil.ReadAll(io.NewSectionReader(pi, 0, int64(len(data))))
-                               c.Assert(err, qt.IsNil)
+                               c.Check(err, qt.IsNil)
                                c.Assert(len(readData), qt.Equals, len(data))
                                c.Assert(bytes.Equal(readData, data), qt.IsTrue)
                        }