From: Matt Joiner
Date: Mon, 18 Jan 2021 08:29:53 +0000 (+1100)
Subject: Improve configurability and add PutSized to sqlite storage
X-Git-Tag: v1.22.0~23
X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=cb5f80ec117400f444b8ced0e74a721e29c16096;p=btrtrc.git

Improve configurability and add PutSized to sqlite storage
---
diff --git a/client_test.go b/client_test.go
index 6be06cbe..24820ac4 100644
--- a/client_test.go
+++ b/client_test.go
@@ -152,7 +152,12 @@ func TestAddDropManyTorrents(t *testing.T) {
 }
 
 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
-    return storage.NewResourcePieces(fc.AsResourceProvider())
+    return storage.NewResourcePiecesOpts(
+        fc.AsResourceProvider(),
+        storage.ResourcePiecesOpts{
+            LeaveIncompleteChunks: true,
+        },
+    )
 }
 
 func TestMergingTrackersByAddingSpecs(t *testing.T) {
diff --git a/storage/bench-resource-pieces.go b/storage/bench-resource-pieces.go
index b72f25c0..753b914a 100644
--- a/storage/bench-resource-pieces.go
+++ b/storage/bench-resource-pieces.go
@@ -27,6 +27,7 @@ func BenchmarkPieceMarkComplete(tb testing.TB, pi PieceImpl, data []byte) {
         }(off)
     }
     wg.Wait()
+    //pi.MarkNotComplete()
     // This might not apply if users of this benchmark don't cache with the expected capacity.
     c.Assert(pi.Completion(), qt.Equals, Completion{Complete: false, Ok: true})
     c.Assert(pi.MarkComplete(), qt.IsNil)
diff --git a/storage/piece_resource.go b/storage/piece_resource.go
index 04566515..11d80ed4 100644
--- a/storage/piece_resource.go
+++ b/storage/piece_resource.go
@@ -14,12 +14,22 @@ import (
 )
 
 type piecePerResource struct {
-    p PieceProvider
+    rp   PieceProvider
+    opts ResourcePiecesOpts
+}
+
+type ResourcePiecesOpts struct {
+    LeaveIncompleteChunks bool
+    AllowSizedPuts        bool
 }
 
 func NewResourcePieces(p PieceProvider) ClientImpl {
+    return NewResourcePiecesOpts(p, ResourcePiecesOpts{})
+}
+
+func NewResourcePiecesOpts(p PieceProvider, opts ResourcePiecesOpts) ClientImpl {
     return &piecePerResource{
-        p: p,
+        rp: p,
     }
 }
 
@@ -37,8 +47,8 @@ func (s piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Has
 
 func (s piecePerResource) Piece(p metainfo.Piece) PieceImpl {
     return piecePerResourcePiece{
-        mp: p,
-        rp: s.p,
+        mp:               p,
+        piecePerResource: s,
     }
 }
 
@@ -52,7 +62,7 @@ type ConsecutiveChunkWriter interface {
 
 type piecePerResourcePiece struct {
     mp metainfo.Piece
-    rp resource.Provider
+    piecePerResource
 }
 
 var _ io.WriterTo = piecePerResourcePiece{}
@@ -90,6 +100,10 @@ func (s piecePerResourcePiece) Completion() Completion {
     }
 }
 
+type SizedPutter interface {
+    PutSized(io.Reader, int64) error
+}
+
 func (s piecePerResourcePiece) MarkComplete() error {
     incompleteChunks := s.getChunks()
     r, w := io.Pipe()
@@ -102,8 +116,19 @@ func (s piecePerResourcePiece) MarkComplete() error {
         }
         w.CloseWithError(err)
     }()
-    err := s.completed().Put(r)
-    if err == nil {
+    completedInstance := s.completed()
+    err := func() error {
+        if sp, ok := completedInstance.(SizedPutter); ok && s.opts.AllowSizedPuts {
+            return sp.PutSized(r, s.mp.Length())
+        } else {
+            return completedInstance.Put(r)
+        }
+    }()
+    if err == nil && !s.opts.LeaveIncompleteChunks {
+        // I think we do this synchronously here since we don't want callers to act on the completed
+        // piece if we're concurrently still deleting chunks. The caller may decide to start
+        // downloading chunks again and won't expect us to delete them. It seems to be much faster
+        // to let the resource provider do this if possible.
         var wg sync.WaitGroup
         for _, c := range incompleteChunks {
             wg.Add(1)
diff --git a/storage/sqlite/sqlite-storage.go b/storage/sqlite/sqlite-storage.go
index b046d0d3..0f8dc136 100644
--- a/storage/sqlite/sqlite-storage.go
+++ b/storage/sqlite/sqlite-storage.go
@@ -52,6 +52,7 @@ func initSchema(conn conn) error {
 -- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
 -- can trim the database file size with partial vacuums without having to do a full vacuum, which
 -- locks everything.
+pragma page_size=16384;
 pragma auto_vacuum=incremental;
 
 create table if not exists blob (
@@ -139,7 +140,10 @@ func NewPiecesStorage(opts NewPoolOpts) (_ storage.ClientImplCloser, err error)
         conns.Close()
         return
     }
-    store := storage.NewResourcePieces(prov)
+    store := storage.NewResourcePiecesOpts(prov, storage.ResourcePiecesOpts{
+        LeaveIncompleteChunks: true,
+        AllowSizedPuts:        true,
+    })
     return struct {
         storage.ClientImpl
         io.Closer
@@ -259,15 +263,20 @@ func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
     if err != nil {
         return
     }
-    writes := make(chan writeRequest, 1<<(20-14))
-    prov := &provider{pool: pool, writes: writes, opts: opts}
-    runtime.SetFinalizer(prov, func(p *provider) {
-        // This is done in a finalizer, as it's easier than trying to synchronize on whether the
-        // channel has been closed. It also means that the provider writer can pass back errors from
-        // a closed ConnPool.
-        close(p.writes)
-    })
-    go providerWriter(writes, prov.pool)
+    prov := &provider{pool: pool, opts: opts}
+    if opts.BatchWrites {
+        if opts.NumConns < 2 {
+            err = errors.New("batch writes requires more than 1 conn")
+            return
+        }
+        writes := make(chan writeRequest)
+        prov.writes = writes
+        // This is retained for backwards compatibility. It may not be necessary.
+        runtime.SetFinalizer(prov, func(p *provider) {
+            p.Close()
+        })
+        go providerWriter(writes, prov.pool)
+    }
     return prov, nil
 }
 
@@ -301,9 +310,11 @@ type ConnPool interface {
 }
 
 type provider struct {
-    pool   ConnPool
-    writes chan<- writeRequest
-    opts   ProviderOpts
+    pool     ConnPool
+    writes   chan<- writeRequest
+    opts     ProviderOpts
+    closed   sync.Once
+    closeErr error
 }
 
 var _ storage.ConsecutiveChunkWriter = (*provider)(nil)
@@ -337,7 +348,13 @@ func (p *provider) WriteConsecutiveChunks(prefix string, w io.Writer) (written i
 }
 
 func (me *provider) Close() error {
-    return me.pool.Close()
+    me.closed.Do(func() {
+        if me.writes != nil {
+            close(me.writes)
+        }
+        me.closeErr = me.pool.Close()
+    })
+    return me.closeErr
 }
 
 type writeRequest struct {
@@ -350,6 +367,11 @@ var expvars = expvar.NewMap("sqliteStorage")
 // Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
 // stronger typing on the writes channel.
 func providerWriter(writes <-chan writeRequest, pool ConnPool) {
+    conn := pool.Get(context.TODO())
+    if conn == nil {
+        return
+    }
+    defer pool.Put(conn)
     for {
         first, ok := <-writes
         if !ok {
@@ -358,11 +380,6 @@ func providerWriter(writes <-chan writeRequest, pool ConnPool) {
         var buf []func()
         var cantFail error
         func() {
-            conn := pool.Get(context.TODO())
-            if conn == nil {
-                return
-            }
-            defer pool.Put(conn)
             defer sqlitex.Save(conn)(&cantFail)
             firstErr := first.query(conn)
             buf = append(buf, func() { first.done <- firstErr })
@@ -513,28 +530,50 @@ func (i instance) openBlob(conn conn, write, updateAccess bool) (*sqlite.Blob, e
     return conn.OpenBlob("main", "blob", "data", rowid, write)
 }
 
+func (i instance) PutSized(reader io.Reader, size int64) (err error) {
+    err = i.withConn(func(conn conn) error {
+        err := sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, zeroblob(?))",
+            nil,
+            i.location, size)
+        if err != nil {
+            return err
+        }
+        blob, err := i.openBlob(conn, true, false)
+        if err != nil {
+            return err
+        }
+        defer blob.Close()
+        _, err = io.Copy(blob, reader)
+        return err
+    }, true)
+    return
+}
+
 func (i instance) Put(reader io.Reader) (err error) {
     var buf bytes.Buffer
     _, err = io.Copy(&buf, reader)
     if err != nil {
         return err
     }
-    err = i.withConn(func(conn conn) error {
-        for range iter.N(10) {
-            err = sqlitex.Exec(conn,
-                "insert or replace into blob(name, data) values(?, cast(? as blob))",
-                nil,
-                i.location, buf.Bytes())
-            if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
-                log.Print("sqlite busy")
-                time.Sleep(time.Second)
-                continue
+    if false {
+        return i.PutSized(&buf, int64(buf.Len()))
+    } else {
+        return i.withConn(func(conn conn) error {
+            for range iter.N(10) {
+                err = sqlitex.Exec(conn,
+                    "insert or replace into blob(name, data) values(?, cast(? as blob))",
+                    nil,
+                    i.location, buf.Bytes())
+                if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
+                    log.Print("sqlite busy")
+                    time.Sleep(time.Second)
+                    continue
+                }
+                break
             }
-            break
-        }
-        return err
-    }, true)
-    return
+            return err
+        }, true)
+    }
 }
 
 type fileInfo struct {
diff --git a/storage/sqlite/sqlite-storage_test.go b/storage/sqlite/sqlite-storage_test.go
index 836cc385..2427cf3a 100644
--- a/storage/sqlite/sqlite-storage_test.go
+++ b/storage/sqlite/sqlite-storage_test.go
@@ -42,7 +42,7 @@ func TestTextBlobSize(t *testing.T) {
 
 func TestSimultaneousIncrementalBlob(t *testing.T) {
     _, p := newConnsAndProv(t, NewPoolOpts{
-        NumConns:            2,
+        NumConns:            3,
         ConcurrentBlobReads: true,
     })
     a, err := p.NewInstance("a")
@@ -76,7 +76,11 @@ func BenchmarkMarkComplete(b *testing.B) {
     rand.Read(data)
     dbPath := filepath.Join(b.TempDir(), "storage.db")
     b.Logf("storage db path: %q", dbPath)
-    ci, err := NewPiecesStorage(NewPoolOpts{Path: dbPath, Capacity: pieceSize})
+    ci, err := NewPiecesStorage(NewPoolOpts{
+        Path:                dbPath,
+        Capacity:            pieceSize,
+        ConcurrentBlobReads: true,
+    })
     c.Assert(err, qt.IsNil)
     defer ci.Close()
     ti, err := ci.OpenTorrent(nil, metainfo.Hash{})
@@ -89,6 +93,8 @@ func BenchmarkMarkComplete(b *testing.B) {
             Length: pieceSize,
         },
     })
+    // Do it untimed the first time to prime the cache.
+    storage.BenchmarkPieceMarkComplete(b, pi, data)
     b.ResetTimer()
     for range iter.N(b.N) {
         storage.BenchmarkPieceMarkComplete(b, pi, data)
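
The options added above can be exercised roughly as follows. This is a sketch, not part of the patch: it assumes the github.com/anacrolix/torrent/storage import path, and the helper name newStorage is made up. (As committed, NewResourcePiecesOpts does not appear to copy opts into the piecePerResource it returns, so these options may only take effect once that is wired up.)

package example

import (
	"github.com/anacrolix/torrent/storage"
)

// newStorage is a hypothetical helper showing the options introduced by this commit.
func newStorage(prov storage.PieceProvider) storage.ClientImpl {
	return storage.NewResourcePiecesOpts(prov, storage.ResourcePiecesOpts{
		// Skip deleting per-chunk blobs after MarkComplete; the provider is
		// expected to reclaim the space itself, which the MarkComplete comment
		// above suggests is much faster when the provider supports it.
		LeaveIncompleteChunks: true,
		// If the completed-piece instance implements SizedPutter, MarkComplete
		// calls PutSized with the piece length so the provider can preallocate.
		AllowSizedPuts: true,
	})
}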
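
PutSized exists so the sqlite provider can reserve the blob at its final size with zeroblob() and then stream into it with incremental blob I/O, rather than buffering a whole piece and binding it as a parameter the way Put does. Below is a standalone sketch of that pattern with the crawshaw.io/sqlite driver this package is built on; the schema and names are invented for the example.

package main

import (
	"io"
	"log"
	"strings"

	"crawshaw.io/sqlite"
	"crawshaw.io/sqlite/sqlitex"
)

func main() {
	conn, err := sqlite.OpenConn(":memory:", 0)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	if err := sqlitex.Exec(conn, "create table blob(name text primary key, data blob)", nil); err != nil {
		log.Fatal(err)
	}
	payload := strings.NewReader("piece data would be streamed here")
	size := int64(payload.Len())
	// Reserve the final size up front so SQLite allocates the blob once.
	if err := sqlitex.Exec(conn,
		"insert or replace into blob(name, data) values(?, zeroblob(?))",
		nil, "piece-0", size); err != nil {
		log.Fatal(err)
	}
	// Look up the rowid, then stream into the incremental blob handle.
	var rowid int64
	if err := sqlitex.Exec(conn, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
		rowid = stmt.ColumnInt64(0)
		return nil
	}, "piece-0"); err != nil {
		log.Fatal(err)
	}
	blob, err := conn.OpenBlob("main", "blob", "data", rowid, true)
	if err != nil {
		log.Fatal(err)
	}
	defer blob.Close()
	if _, err := io.Copy(blob, payload); err != nil {
		log.Fatal(err)
	}
	log.Printf("wrote %d bytes via incremental blob I/O", size)
}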