Expose a variety of blob cleanup styles
author    Matt Joiner <anacrolix@gmail.com>
          Thu, 6 May 2021 05:17:31 +0000 (15:17 +1000)
committer Matt Joiner <anacrolix@gmail.com>
          Thu, 6 May 2021 05:17:31 +0000 (15:17 +1000)
storage/sqlite/direct.go
storage/sqlite/sqlite-storage.go
storage/sqlite/sqlite-storage_test.go

diff --git a/storage/sqlite/direct.go b/storage/sqlite/direct.go
index 5f89ec9891535542292933d378866ee5e3f5096d..e97822960bb3249a4a3488da52797a35ab6eb2d8 100644
--- a/storage/sqlite/direct.go
+++ b/storage/sqlite/direct.go
@@ -2,7 +2,9 @@ package sqliteStorage
 
 import (
        "errors"
+       "runtime"
        "sync"
+       "time"
 
        "crawshaw.io/sqlite"
        "crawshaw.io/sqlite/sqlitex"
@@ -14,6 +16,9 @@ type NewDirectStorageOpts struct {
        NewConnOpts
        InitDbOpts
        InitConnOpts
+       GcBlobs           bool
+       CacheBlobs        bool
+       BlobFlushInterval time.Duration
 }
 
 // A convenience function that creates a connection pool, resource provider, and a pieces storage
@@ -32,16 +37,41 @@ func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, er
        if err != nil {
                return
        }
-       return &client{
+       cl := &client{
                conn:  conn,
                blobs: make(map[string]*sqlite.Blob),
-       }, nil
+               opts:  opts,
+       }
+       if opts.BlobFlushInterval != 0 {
+               cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
+       }
+       return cl, nil
 }
 
 type client struct {
-       l     sync.Mutex
-       conn  conn
-       blobs map[string]*sqlite.Blob
+       l           sync.Mutex
+       conn        conn
+       blobs       map[string]*sqlite.Blob
+       blobFlusher *time.Timer
+       opts        NewDirectStorageOpts
+       closed      bool
+}
+
+func (c *client) blobFlusherFunc() {
+       c.l.Lock()
+       defer c.l.Unlock()
+       c.flushBlobs()
+       if !c.closed {
+               c.blobFlusher.Reset(c.opts.BlobFlushInterval)
+       }
+}
+
+func (c *client) flushBlobs() {
+       for key, b := range c.blobs {
+               // Need the lock to prevent racing with the GC finalizers.
+               b.Close()
+               delete(c.blobs, key)
+       }
 }
 
 func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
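The `BlobFlushInterval` option drives a single-shot `time.AfterFunc` timer whose callback flushes the blob cache and then re-arms itself until the client is closed. A minimal standalone sketch of the same idiom (the `cache` and `flushEvery` names are illustrative, not from this commit):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// cache is a hypothetical stand-in for the client type above: mutex-guarded
// state plus a single-shot timer that re-arms itself until closed.
type cache struct {
	mu      sync.Mutex
	items   map[string]string
	flusher *time.Timer
	closed  bool
}

func newCache(flushEvery time.Duration) *cache {
	c := &cache{items: make(map[string]string)}
	// time.AfterFunc fires once; the callback re-arms it with Reset,
	// mirroring blobFlusherFunc above.
	c.flusher = time.AfterFunc(flushEvery, func() { c.flushAndReset(flushEvery) })
	return c
}

func (c *cache) flushAndReset(flushEvery time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for k := range c.items {
		delete(c.items, k) // a real cache would close the handle first
	}
	if !c.closed {
		c.flusher.Reset(flushEvery)
	}
}

func main() {
	c := newCache(50 * time.Millisecond)
	c.mu.Lock()
	c.items["piece"] = "blob"
	c.mu.Unlock()
	time.Sleep(120 * time.Millisecond)
	c.mu.Lock()
	fmt.Println("items after flush:", len(c.items)) // items after flush: 0
	c.mu.Unlock()
}
```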
@@ -49,8 +79,12 @@ func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (stora
 }
 
 func (c *client) Close() error {
-       for _, b := range c.blobs {
-               b.Close()
+       c.l.Lock()
+       defer c.l.Unlock()
+       c.flushBlobs()
+       c.closed = true
+       if c.opts.BlobFlushInterval != 0 {
+               c.blobFlusher.Stop()
        }
        return c.conn.Close()
 }
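Together with `Close` above, the new options combine roughly as follows. A hedged usage sketch: the field names come from this commit, while the module path and the promoted `Memory` field are assumed from the benchmark changes further down:

```go
package main

import (
	"log"
	"time"

	sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
)

func main() {
	var opts sqliteStorage.NewDirectStorageOpts
	opts.Memory = true                   // promoted from the embedded conn opts, as set in the benchmark
	opts.CacheBlobs = true               // keep blob handles open across reads and writes
	opts.BlobFlushInterval = time.Second // periodically close cached handles
	// opts.GcBlobs = true               // alternatively, let GC finalizers close stray blobs

	ci, err := sqliteStorage.NewDirectStorage(opts)
	if err != nil {
		log.Fatal(err)
	}
	// Close flushes cached blobs, marks the client closed so the flusher
	// won't re-arm, stops the timer, then closes the connection.
	defer ci.Close()
}
```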
@@ -82,7 +116,11 @@ func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
        t.c.l.Lock()
        defer t.c.l.Unlock()
        name := p.Hash().HexString()
-       return piece{t.c.conn, &t.c.l, name, t.c.blobs, p.Length()}
+       return piece{
+               name,
+               p.Length(),
+               t.c,
+       }
 }
 
 func (t torrent) Close() error {
@@ -90,11 +128,9 @@ func (t torrent) Close() error {
 }
 
 type piece struct {
-       conn   conn
-       l      *sync.Mutex
        name   string
-       blobs  map[string]*sqlite.Blob
        length int64
+       *client
 }
 
 func (p2 piece) doAtIoWithBlob(
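`piece` now embeds `*client` instead of copying the connection, lock pointer, and blob map, so `p2.l`, `p2.blobs`, and `p2.opts` all resolve through the one shared client. A small sketch of the field-promotion idiom (names are illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

// owner mirrors the client type: it owns the lock and the shared state.
type owner struct {
	l     sync.Mutex
	blobs map[string]int
}

// view mirrors the new piece type: embedding *owner promotes l and blobs,
// so every view shares the owner's state instead of copying it field by field.
type view struct {
	name string
	*owner
}

func main() {
	o := &owner{blobs: map[string]int{}}
	v := view{name: "piece", owner: o}
	v.l.Lock() // resolves to o.l via embedding
	v.blobs[v.name] = 1
	v.l.Unlock()
	fmt.Println(o.blobs) // map[piece:1]
}
```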
@@ -104,13 +140,21 @@ func (p2 piece) doAtIoWithBlob(
 ) (n int, err error) {
        p2.l.Lock()
        defer p2.l.Unlock()
-       //defer p2.blobWouldExpire()
+       if !p2.opts.CacheBlobs {
+               defer p2.forgetBlob()
+       }
        n, err = atIo(p2.getBlob())(p, off)
+       if err == nil {
+               return
+       }
        var se sqlite.Error
-       if !errors.As(err, &se) || se.Code != sqlite.SQLITE_ABORT {
+       if !errors.As(err, &se) {
                return
        }
-       p2.blobWouldExpire()
+       if se.Code != sqlite.SQLITE_ABORT && !(p2.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
+               return
+       }
+       p2.forgetBlob()
        return atIo(p2.getBlob())(p, off)
 }
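The rewritten control flow returns early on success, then retries exactly once, but only for errors meaning the blob handle went stale: `SQLITE_ABORT` (the row was rewritten under the handle), or, with `GcBlobs` on, the "invalid blob" error produced by a handle a finalizer already closed. A standalone sketch of that unwrap-then-retry shape, with a hypothetical error type standing in for `sqlite.Error`:

```go
package main

import (
	"errors"
	"fmt"
)

// opError is a hypothetical stand-in for sqlite.Error: a typed error
// carrying a code that tells us whether the operation is retryable.
type opError struct{ code string }

func (e opError) Error() string { return "op failed: " + e.code }

// doWithRetry mirrors doAtIoWithBlob's shape: try once, inspect the typed
// error, and retry exactly once only for the recoverable codes.
func doWithRetry(attempt func() error) error {
	err := attempt()
	if err == nil {
		return nil
	}
	var oe opError
	if !errors.As(err, &oe) {
		return err // not our error type; don't mask it
	}
	if oe.code != "aborted" && oe.code != "invalid handle" {
		return err // a typed error, but not a recoverable one
	}
	return attempt() // retry once, as the diff re-calls getBlob
}

func main() {
	calls := 0
	err := doWithRetry(func() error {
		calls++
		if calls == 1 {
			return opError{code: "aborted"}
		}
		return nil
	})
	fmt.Println(calls, err) // 2 <nil>
}
```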
 
@@ -140,7 +184,7 @@ func (p2 piece) MarkComplete() error {
        return nil
 }
 
-func (p2 piece) blobWouldExpire() {
+func (p2 piece) forgetBlob() {
        blob, ok := p2.blobs[p2.name]
        if !ok {
                return
@@ -185,6 +229,14 @@ func (p2 piece) getBlob() *sqlite.Blob {
                if err != nil {
                        panic(err)
                }
+               if p2.opts.GcBlobs {
+                       herp := new(byte)
+                       runtime.SetFinalizer(herp, func(*byte) {
+                               p2.l.Lock()
+                               defer p2.l.Unlock()
+                               blob.Close()
+                       })
+               }
                p2.blobs[p2.name] = blob
        }
        return blob
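With `GcBlobs`, the finalizer is attached to a throwaway sentinel allocation rather than the blob itself; once the sentinel becomes unreachable, a later GC cycle closes the blob under the client lock, and the "invalid blob" retry path above reopens it on demand. A minimal demo of the sentinel-finalizer idiom (timings are illustrative; the runtime gives no hard guarantee about when finalizers run):

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex
	closed := false

	// Attach the finalizer to a sentinel, as the diff does with herp. When
	// nothing references the sentinel any more, the finalizer "closes" the
	// resource under the shared lock.
	sentinel := new(byte)
	runtime.SetFinalizer(sentinel, func(*byte) {
		mu.Lock()
		defer mu.Unlock()
		closed = true
	})
	sentinel = nil // drop the only reference

	runtime.GC()                      // queue the finalizer
	time.Sleep(10 * time.Millisecond) // finalizers run on their own goroutine

	mu.Lock()
	fmt.Println("closed:", closed) // usually true, but not guaranteed by the spec
	mu.Unlock()
}
```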
diff --git a/storage/sqlite/sqlite-storage.go b/storage/sqlite/sqlite-storage.go
index 4192a58d1232e8323ed6394fa0aaa4a898f19507..f6854791763f293fe586c8c5cd3f45979bcd3fb5 100644
--- a/storage/sqlite/sqlite-storage.go
+++ b/storage/sqlite/sqlite-storage.go
@@ -251,7 +251,8 @@ type InitDbOpts struct {
        DontInitSchema bool
        PageSize       int
        // If non-zero, overrides the existing setting.
-       Capacity int64
+       Capacity   int64
+       NoTriggers bool
 }
 
 // There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
@@ -300,7 +301,7 @@ func initDatabase(conn conn, opts InitDbOpts) (err error) {
                if opts.PageSize == 0 {
                        opts.PageSize = 1 << 14
                }
-               err = InitSchema(conn, opts.PageSize, true)
+               err = InitSchema(conn, opts.PageSize, !opts.NoTriggers)
                if err != nil {
                        return
                }
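`NoTriggers` is phrased negatively so the zero value keeps the old behavior (triggers installed), matching how a `PageSize` of zero means "use the default". A small sketch of that options-defaulting idiom:

```go
package main

import "fmt"

// initOpts mirrors InitDbOpts' shape: zero values mean "keep the defaults".
type initOpts struct {
	PageSize   int
	NoTriggers bool // negated so the zero value keeps triggers enabled
}

func initDatabase(opts initOpts) string {
	if opts.PageSize == 0 {
		opts.PageSize = 1 << 14 // same default as the diff
	}
	return fmt.Sprintf("page_size=%d triggers=%v", opts.PageSize, !opts.NoTriggers)
}

func main() {
	fmt.Println(initDatabase(initOpts{}))                 // page_size=16384 triggers=true
	fmt.Println(initDatabase(initOpts{NoTriggers: true})) // page_size=16384 triggers=false
}
```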
diff --git a/storage/sqlite/sqlite-storage_test.go b/storage/sqlite/sqlite-storage_test.go
index 633d4358b4ebefe2b1fbc9fe3b2ecb10395e5314..9e5d8275565b58badf78a23857a4305a06d80ff2 100644
--- a/storage/sqlite/sqlite-storage_test.go
+++ b/storage/sqlite/sqlite-storage_test.go
@@ -9,6 +9,7 @@ import (
        "path/filepath"
        "sync"
        "testing"
+       "time"
 
        _ "github.com/anacrolix/envpprof"
        "github.com/anacrolix/torrent/storage"
@@ -72,7 +73,12 @@ func TestSimultaneousIncrementalBlob(t *testing.T) {
 
 func BenchmarkMarkComplete(b *testing.B) {
        const pieceSize = test_storage.DefaultPieceSize
-       const capacity = test_storage.DefaultNumPieces * pieceSize / 2
+       const noTriggers = false
+       var capacity int64 = test_storage.DefaultNumPieces * pieceSize / 2
+       if noTriggers {
+               // Since we won't push out old pieces, we have to mark them incomplete manually.
+               capacity = 0
+       }
        runBench := func(b *testing.B, ci storage.ClientImpl) {
                test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity)
        }
@@ -84,6 +90,10 @@ func BenchmarkMarkComplete(b *testing.B) {
                                opts.Memory = memory
                                opts.Path = filepath.Join(b.TempDir(), "storage.db")
                                opts.Capacity = capacity
+                               opts.CacheBlobs = true
+                               //opts.GcBlobs = true
+                               opts.BlobFlushInterval = time.Second
+                               opts.NoTriggers = noTriggers
                                directBench := func(b *testing.B) {
                                        ci, err := NewDirectStorage(opts)
                                        if errors.Is(err, UnexpectedJournalMode) {
@@ -93,10 +103,10 @@ func BenchmarkMarkComplete(b *testing.B) {
                                        defer ci.Close()
                                        runBench(b, ci)
                                }
-                               for _, journalMode := range []string{"", "wal", "off", "delete", "memory"} {
+                               for _, journalMode := range []string{"", "wal", "off", "truncate", "delete", "persist", "memory"} {
                                        opts.SetJournalMode = journalMode
                                        b.Run("JournalMode="+journalMode, func(b *testing.B) {
-                                               for _, mmapSize := range []int64{-1, 0, 1 << 24, 1 << 25, 1 << 26} {
+                                               for _, mmapSize := range []int64{-1, 0, 1 << 23, 1 << 24, 1 << 25} {
                                                        if memory && mmapSize >= 0 {
                                                                continue
                                                        }
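The benchmark matrix now sweeps every SQLite journal mode and shifts the mmap sizes down one power of two (2^23 through 2^25). The commit doesn't show how `SetJournalMode` and the mmap size are applied; PRAGMA statements like the following are SQLite's standard mechanism, so this sketch is an assumption:

```go
package main

import "fmt"

// pragmas returns the PRAGMA statements a combination from the benchmark
// matrix would plausibly translate to. An empty journal mode means "leave
// the default", and a negative mmap size means "don't set it", mirroring
// the sentinel values the benchmark iterates over.
func pragmas(journalMode string, mmapSize int64) (ps []string) {
	if journalMode != "" {
		ps = append(ps, "pragma journal_mode="+journalMode)
	}
	if mmapSize >= 0 {
		ps = append(ps, fmt.Sprintf("pragma mmap_size=%d", mmapSize))
	}
	return
}

func main() {
	for _, jm := range []string{"", "wal", "off", "truncate", "delete", "persist", "memory"} {
		for _, ms := range []int64{-1, 0, 1 << 23, 1 << 24, 1 << 25} {
			fmt.Printf("journal=%q mmap=%d -> %v\n", jm, ms, pragmas(jm, ms))
		}
	}
}
```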