]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Improve configurability and add PutSized to sqlite storage
authorMatt Joiner <anacrolix@gmail.com>
Mon, 18 Jan 2021 08:29:53 +0000 (19:29 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 25 Jan 2021 04:54:37 +0000 (15:54 +1100)
client_test.go
storage/bench-resource-pieces.go
storage/piece_resource.go
storage/sqlite/sqlite-storage.go
storage/sqlite/sqlite-storage_test.go

index 6be06cbef5a5e73f124e01bc48814c329a7613dc..24820ac49a07f41f520d507709332e33de577dac 100644 (file)
@@ -152,7 +152,12 @@ func TestAddDropManyTorrents(t *testing.T) {
 }
 
 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
-       return storage.NewResourcePieces(fc.AsResourceProvider())
+       return storage.NewResourcePiecesOpts(
+               fc.AsResourceProvider(),
+               storage.ResourcePiecesOpts{
+                       LeaveIncompleteChunks: true,
+               },
+       )
 }
 
 func TestMergingTrackersByAddingSpecs(t *testing.T) {
index b72f25c0b2eec0c1bab0720eb6c187f9800e11ec..753b914acbaa0741d0faa563d714f10d9cabd27e 100644 (file)
@@ -27,6 +27,7 @@ func BenchmarkPieceMarkComplete(tb testing.TB, pi PieceImpl, data []byte) {
                }(off)
        }
        wg.Wait()
+       //pi.MarkNotComplete()
        // This might not apply if users of this benchmark don't cache with the expected capacity.
        c.Assert(pi.Completion(), qt.Equals, Completion{Complete: false, Ok: true})
        c.Assert(pi.MarkComplete(), qt.IsNil)
index 04566515fc80b719fb0ee5f8c02ceef21b3f4722..11d80ed4f20cb2ad160cfeef564ec957e9b0890a 100644 (file)
@@ -14,12 +14,22 @@ import (
 )
 
 type piecePerResource struct {
-       p PieceProvider
+       rp   PieceProvider
+       opts ResourcePiecesOpts
+}
+
+type ResourcePiecesOpts struct {
+       LeaveIncompleteChunks bool
+       AllowSizedPuts        bool
 }
 
 func NewResourcePieces(p PieceProvider) ClientImpl {
+       return NewResourcePiecesOpts(p, ResourcePiecesOpts{})
+}
+
+func NewResourcePiecesOpts(p PieceProvider, opts ResourcePiecesOpts) ClientImpl {
        return &piecePerResource{
-               p: p,
+               rp: p,
        }
 }
 
@@ -37,8 +47,8 @@ func (s piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Has
 
 func (s piecePerResource) Piece(p metainfo.Piece) PieceImpl {
        return piecePerResourcePiece{
-               mp: p,
-               rp: s.p,
+               mp:               p,
+               piecePerResource: s,
        }
 }
 
@@ -52,7 +62,7 @@ type ConsecutiveChunkWriter interface {
 
 type piecePerResourcePiece struct {
        mp metainfo.Piece
-       rp resource.Provider
+       piecePerResource
 }
 
 var _ io.WriterTo = piecePerResourcePiece{}
@@ -90,6 +100,10 @@ func (s piecePerResourcePiece) Completion() Completion {
        }
 }
 
+type SizedPutter interface {
+       PutSized(io.Reader, int64) error
+}
+
 func (s piecePerResourcePiece) MarkComplete() error {
        incompleteChunks := s.getChunks()
        r, w := io.Pipe()
@@ -102,8 +116,19 @@ func (s piecePerResourcePiece) MarkComplete() error {
                }
                w.CloseWithError(err)
        }()
-       err := s.completed().Put(r)
-       if err == nil {
+       completedInstance := s.completed()
+       err := func() error {
+               if sp, ok := completedInstance.(SizedPutter); ok && s.opts.AllowSizedPuts {
+                       return sp.PutSized(r, s.mp.Length())
+               } else {
+                       return completedInstance.Put(r)
+               }
+       }()
+       if err == nil && !s.opts.LeaveIncompleteChunks {
+               // I think we do this synchronously here since we don't want callers to act on the completed
+               // piece if we're concurrently still deleting chunks. The caller may decide to start
+               // downloading chunks again and won't expect us to delete them. It seems to be much faster
+               // to let the resource provider do this if possible.
                var wg sync.WaitGroup
                for _, c := range incompleteChunks {
                        wg.Add(1)
index b046d0d3fddd814424a8cb7df82aef8e1b99d3c5..0f8dc13670b8c720ee581d387e98ac3bcf4c71a5 100644 (file)
@@ -52,6 +52,7 @@ func initSchema(conn conn) error {
 -- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
 -- can trim the database file size with partial vacuums without having to do a full vacuum, which 
 -- locks everything.
+pragma page_size=16384;
 pragma auto_vacuum=incremental;
 
 create table if not exists blob (
@@ -139,7 +140,10 @@ func NewPiecesStorage(opts NewPoolOpts) (_ storage.ClientImplCloser, err error)
                conns.Close()
                return
        }
-       store := storage.NewResourcePieces(prov)
+       store := storage.NewResourcePiecesOpts(prov, storage.ResourcePiecesOpts{
+               LeaveIncompleteChunks: true,
+               AllowSizedPuts:        true,
+       })
        return struct {
                storage.ClientImpl
                io.Closer
@@ -259,15 +263,20 @@ func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
        if err != nil {
                return
        }
-       writes := make(chan writeRequest, 1<<(20-14))
-       prov := &provider{pool: pool, writes: writes, opts: opts}
-       runtime.SetFinalizer(prov, func(p *provider) {
-               // This is done in a finalizer, as it's easier than trying to synchronize on whether the
-               // channel has been closed. It also means that the provider writer can pass back errors from
-               // a closed ConnPool.
-               close(p.writes)
-       })
-       go providerWriter(writes, prov.pool)
+       prov := &provider{pool: pool, opts: opts}
+       if opts.BatchWrites {
+               if opts.NumConns < 2 {
+                       err = errors.New("batch writes requires more than 1 conn")
+                       return
+               }
+               writes := make(chan writeRequest)
+               prov.writes = writes
+               // This is retained for backwards compatibility. It may not be necessary.
+               runtime.SetFinalizer(prov, func(p *provider) {
+                       p.Close()
+               })
+               go providerWriter(writes, prov.pool)
+       }
        return prov, nil
 }
 
@@ -301,9 +310,11 @@ type ConnPool interface {
 }
 
 type provider struct {
-       pool   ConnPool
-       writes chan<- writeRequest
-       opts   ProviderOpts
+       pool     ConnPool
+       writes   chan<- writeRequest
+       opts     ProviderOpts
+       closed   sync.Once
+       closeErr error
 }
 
 var _ storage.ConsecutiveChunkWriter = (*provider)(nil)
@@ -337,7 +348,13 @@ func (p *provider) WriteConsecutiveChunks(prefix string, w io.Writer) (written i
 }
 
 func (me *provider) Close() error {
-       return me.pool.Close()
+       me.closed.Do(func() {
+               if me.writes != nil {
+                       close(me.writes)
+               }
+               me.closeErr = me.pool.Close()
+       })
+       return me.closeErr
 }
 
 type writeRequest struct {
@@ -350,6 +367,11 @@ var expvars = expvar.NewMap("sqliteStorage")
 // Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
 // stronger typing on the writes channel.
 func providerWriter(writes <-chan writeRequest, pool ConnPool) {
+       conn := pool.Get(context.TODO())
+       if conn == nil {
+               return
+       }
+       defer pool.Put(conn)
        for {
                first, ok := <-writes
                if !ok {
@@ -358,11 +380,6 @@ func providerWriter(writes <-chan writeRequest, pool ConnPool) {
                var buf []func()
                var cantFail error
                func() {
-                       conn := pool.Get(context.TODO())
-                       if conn == nil {
-                               return
-                       }
-                       defer pool.Put(conn)
                        defer sqlitex.Save(conn)(&cantFail)
                        firstErr := first.query(conn)
                        buf = append(buf, func() { first.done <- firstErr })
@@ -513,28 +530,50 @@ func (i instance) openBlob(conn conn, write, updateAccess bool) (*sqlite.Blob, e
        return conn.OpenBlob("main", "blob", "data", rowid, write)
 }
 
+func (i instance) PutSized(reader io.Reader, size int64) (err error) {
+       err = i.withConn(func(conn conn) error {
+               err := sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, zeroblob(?))",
+                       nil,
+                       i.location, size)
+               if err != nil {
+                       return err
+               }
+               blob, err := i.openBlob(conn, true, false)
+               if err != nil {
+                       return err
+               }
+               defer blob.Close()
+               _, err = io.Copy(blob, reader)
+               return err
+       }, true)
+       return
+}
+
 func (i instance) Put(reader io.Reader) (err error) {
        var buf bytes.Buffer
        _, err = io.Copy(&buf, reader)
        if err != nil {
                return err
        }
-       err = i.withConn(func(conn conn) error {
-               for range iter.N(10) {
-                       err = sqlitex.Exec(conn,
-                               "insert or replace into blob(name, data) values(?, cast(? as blob))",
-                               nil,
-                               i.location, buf.Bytes())
-                       if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
-                               log.Print("sqlite busy")
-                               time.Sleep(time.Second)
-                               continue
+       if false {
+               return i.PutSized(&buf, int64(buf.Len()))
+       } else {
+               return i.withConn(func(conn conn) error {
+                       for range iter.N(10) {
+                               err = sqlitex.Exec(conn,
+                                       "insert or replace into blob(name, data) values(?, cast(? as blob))",
+                                       nil,
+                                       i.location, buf.Bytes())
+                               if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
+                                       log.Print("sqlite busy")
+                                       time.Sleep(time.Second)
+                                       continue
+                               }
+                               break
                        }
-                       break
-               }
-               return err
-       }, true)
-       return
+                       return err
+               }, true)
+       }
 }
 
 type fileInfo struct {
index 836cc385efa4d7b2fa8ecd9c0d11dc6b576ade80..2427cf3a3f6e9afa583dc76f448eba9a231e5cb8 100644 (file)
@@ -42,7 +42,7 @@ func TestTextBlobSize(t *testing.T) {
 
 func TestSimultaneousIncrementalBlob(t *testing.T) {
        _, p := newConnsAndProv(t, NewPoolOpts{
-               NumConns:            2,
+               NumConns:            3,
                ConcurrentBlobReads: true,
        })
        a, err := p.NewInstance("a")
@@ -76,7 +76,11 @@ func BenchmarkMarkComplete(b *testing.B) {
        rand.Read(data)
        dbPath := filepath.Join(b.TempDir(), "storage.db")
        b.Logf("storage db path: %q", dbPath)
-       ci, err := NewPiecesStorage(NewPoolOpts{Path: dbPath, Capacity: pieceSize})
+       ci, err := NewPiecesStorage(NewPoolOpts{
+               Path:                dbPath,
+               Capacity:            pieceSize,
+               ConcurrentBlobReads: true,
+       })
        c.Assert(err, qt.IsNil)
        defer ci.Close()
        ti, err := ci.OpenTorrent(nil, metainfo.Hash{})
@@ -89,6 +93,8 @@ func BenchmarkMarkComplete(b *testing.B) {
                        Length:      pieceSize,
                },
        })
+       // Do it untimed the first time to prime the cache.
+       storage.BenchmarkPieceMarkComplete(b, pi, data)
        b.ResetTimer()
        for range iter.N(b.N) {
                storage.BenchmarkPieceMarkComplete(b, pi, data)