From 96574468c54acdc07dd53f9fe860944727dc80b1 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 6 May 2021 15:17:31 +1000 Subject: [PATCH] Expose a variety of blob cleanup styles --- storage/sqlite/direct.go | 82 ++++++++++++++++++++++----- storage/sqlite/sqlite-storage.go | 5 +- storage/sqlite/sqlite-storage_test.go | 16 +++++- 3 files changed, 83 insertions(+), 20 deletions(-) diff --git a/storage/sqlite/direct.go b/storage/sqlite/direct.go index 5f89ec98..e9782296 100644 --- a/storage/sqlite/direct.go +++ b/storage/sqlite/direct.go @@ -2,7 +2,9 @@ package sqliteStorage import ( "errors" + "runtime" "sync" + "time" "crawshaw.io/sqlite" "crawshaw.io/sqlite/sqlitex" @@ -14,6 +16,9 @@ type NewDirectStorageOpts struct { NewConnOpts InitDbOpts InitConnOpts + GcBlobs bool + CacheBlobs bool + BlobFlushInterval time.Duration } // A convenience function that creates a connection pool, resource provider, and a pieces storage @@ -32,16 +37,41 @@ func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, er if err != nil { return } - return &client{ + cl := &client{ conn: conn, blobs: make(map[string]*sqlite.Blob), - }, nil + opts: opts, + } + if opts.BlobFlushInterval != 0 { + cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc) + } + return cl, nil } type client struct { - l sync.Mutex - conn conn - blobs map[string]*sqlite.Blob + l sync.Mutex + conn conn + blobs map[string]*sqlite.Blob + blobFlusher *time.Timer + opts NewDirectStorageOpts + closed bool +} + +func (c *client) blobFlusherFunc() { + c.l.Lock() + defer c.l.Unlock() + c.flushBlobs() + if !c.closed { + c.blobFlusher.Reset(c.opts.BlobFlushInterval) + } +} + +func (c *client) flushBlobs() { + for key, b := range c.blobs { + // Need the lock to prevent racing with the GC finalizers. + b.Close() + delete(c.blobs, key) + } } func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) { @@ -49,8 +79,12 @@ func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (stora } func (c *client) Close() error { - for _, b := range c.blobs { - b.Close() + c.l.Lock() + defer c.l.Unlock() + c.flushBlobs() + c.closed = true + if c.opts.BlobFlushInterval != 0 { + c.blobFlusher.Stop() } return c.conn.Close() } @@ -82,7 +116,11 @@ func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl { t.c.l.Lock() defer t.c.l.Unlock() name := p.Hash().HexString() - return piece{t.c.conn, &t.c.l, name, t.c.blobs, p.Length()} + return piece{ + name, + p.Length(), + t.c, + } } func (t torrent) Close() error { @@ -90,11 +128,9 @@ func (t torrent) Close() error { } type piece struct { - conn conn - l *sync.Mutex name string - blobs map[string]*sqlite.Blob length int64 + *client } func (p2 piece) doAtIoWithBlob( @@ -104,13 +140,21 @@ func (p2 piece) doAtIoWithBlob( ) (n int, err error) { p2.l.Lock() defer p2.l.Unlock() - //defer p2.blobWouldExpire() + if !p2.opts.CacheBlobs { + defer p2.forgetBlob() + } n, err = atIo(p2.getBlob())(p, off) + if err == nil { + return + } var se sqlite.Error - if !errors.As(err, &se) || se.Code != sqlite.SQLITE_ABORT { + if !errors.As(err, &se) { return } - p2.blobWouldExpire() + if se.Code != sqlite.SQLITE_ABORT && !(p2.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") { + return + } + p2.forgetBlob() return atIo(p2.getBlob())(p, off) } @@ -140,7 +184,7 @@ func (p2 piece) MarkComplete() error { return nil } -func (p2 piece) blobWouldExpire() { +func (p2 piece) forgetBlob() { blob, ok := p2.blobs[p2.name] if !ok { return @@ -185,6 +229,14 @@ func (p2 piece) getBlob() *sqlite.Blob { if err != nil { panic(err) } + if p2.opts.GcBlobs { + herp := new(byte) + runtime.SetFinalizer(herp, func(*byte) { + p2.l.Lock() + defer p2.l.Unlock() + blob.Close() + }) + } p2.blobs[p2.name] = blob } return blob diff --git a/storage/sqlite/sqlite-storage.go b/storage/sqlite/sqlite-storage.go index 4192a58d..f6854791 100644 --- a/storage/sqlite/sqlite-storage.go +++ b/storage/sqlite/sqlite-storage.go @@ -251,7 +251,8 @@ type InitDbOpts struct { DontInitSchema bool PageSize int // If non-zero, overrides the existing setting. - Capacity int64 + Capacity int64 + NoTriggers bool } // There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now, @@ -300,7 +301,7 @@ func initDatabase(conn conn, opts InitDbOpts) (err error) { if opts.PageSize == 0 { opts.PageSize = 1 << 14 } - err = InitSchema(conn, opts.PageSize, true) + err = InitSchema(conn, opts.PageSize, !opts.NoTriggers) if err != nil { return } diff --git a/storage/sqlite/sqlite-storage_test.go b/storage/sqlite/sqlite-storage_test.go index 633d4358..9e5d8275 100644 --- a/storage/sqlite/sqlite-storage_test.go +++ b/storage/sqlite/sqlite-storage_test.go @@ -9,6 +9,7 @@ import ( "path/filepath" "sync" "testing" + "time" _ "github.com/anacrolix/envpprof" "github.com/anacrolix/torrent/storage" @@ -72,7 +73,12 @@ func TestSimultaneousIncrementalBlob(t *testing.T) { func BenchmarkMarkComplete(b *testing.B) { const pieceSize = test_storage.DefaultPieceSize - const capacity = test_storage.DefaultNumPieces * pieceSize / 2 + const noTriggers = false + var capacity int64 = test_storage.DefaultNumPieces * pieceSize / 2 + if noTriggers { + // Since we won't push out old pieces, we have to mark them incomplete manually. + capacity = 0 + } runBench := func(b *testing.B, ci storage.ClientImpl) { test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity) } @@ -84,6 +90,10 @@ func BenchmarkMarkComplete(b *testing.B) { opts.Memory = memory opts.Path = filepath.Join(b.TempDir(), "storage.db") opts.Capacity = capacity + opts.CacheBlobs = true + //opts.GcBlobs = true + opts.BlobFlushInterval = time.Second + opts.NoTriggers = noTriggers directBench := func(b *testing.B) { ci, err := NewDirectStorage(opts) if errors.Is(err, UnexpectedJournalMode) { @@ -93,10 +103,10 @@ func BenchmarkMarkComplete(b *testing.B) { defer ci.Close() runBench(b, ci) } - for _, journalMode := range []string{"", "wal", "off", "delete", "memory"} { + for _, journalMode := range []string{"", "wal", "off", "truncate", "delete", "persist", "memory"} { opts.SetJournalMode = journalMode b.Run("JournalMode="+journalMode, func(b *testing.B) { - for _, mmapSize := range []int64{-1, 0, 1 << 24, 1 << 25, 1 << 26} { + for _, mmapSize := range []int64{-1, 0, 1 << 23, 1 << 24, 1 << 25} { if memory && mmapSize >= 0 { continue } -- 2.44.0