From cb5f80ec117400f444b8ced0e74a721e29c16096 Mon Sep 17 00:00:00 2001
From: Matt Joiner <anacrolix@gmail.com>
Date: Mon, 18 Jan 2021 19:29:53 +1100
Subject: [PATCH] Improve configurability and add PutSized to sqlite storage

---
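Note: the sketch below shows how a caller might opt into the new ResourcePiecesOpts. It
mirrors the fileCachePieceResourceStorage change in client_test.go further down; the
helper name newCacheStorage is hypothetical and the imports are assumed to match that
test file.

	func newCacheStorage(fc *filecache.Cache) storage.ClientImpl {
		return storage.NewResourcePiecesOpts(
			fc.AsResourceProvider(),
			storage.ResourcePiecesOpts{
				// Leave incomplete chunk resources in place after MarkComplete and let
				// the provider reclaim them (e.g. by cache eviction) if it wants to.
				LeaveIncompleteChunks: true,
			},
		)
	}
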
 client_test.go                        |   7 +-
 storage/bench-resource-pieces.go      |   1 +
 storage/piece_resource.go             |  40 ++++++++--
 storage/sqlite/sqlite-storage.go      | 107 ++++++++++++++++++--------
 storage/sqlite/sqlite-storage_test.go |  10 ++-
 5 files changed, 121 insertions(+), 44 deletions(-)
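
Note: NewPiecesStorage now sets AllowSizedPuts itself, so MarkComplete can stream a
completed piece through the new PutSized path with a known length. A caller-side sketch
with an illustrative path and capacity, assuming the package is imported as
sqliteStorage:

	ci, err := sqliteStorage.NewPiecesStorage(sqliteStorage.NewPoolOpts{
		Path:                "/tmp/torrent-storage.db", // illustrative
		Capacity:            1 << 30,                   // illustrative, in bytes
		ConcurrentBlobReads: true,
	})
	if err != nil {
		// handle the error
	}
	defer ci.Close()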

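Note: with ProviderOpts.BatchWrites enabled, NewProvider now fails fast unless NumConns
is at least 2, because the background providerWriter holds one conn for its entire
lifetime. A hypothetical in-package sketch (newBatchingProvider is not part of this
patch):

	func newBatchingProvider(pool ConnPool) (*provider, error) {
		return NewProvider(pool, ProviderOpts{
			BatchWrites: true,
			NumConns:    2, // batch writes require at least 2 conns
		})
	}
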
diff --git a/client_test.go b/client_test.go
index 6be06cbe..24820ac4 100644
--- a/client_test.go
+++ b/client_test.go
@@ -152,7 +152,12 @@ func TestAddDropManyTorrents(t *testing.T) {
 }
 
 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
-	return storage.NewResourcePieces(fc.AsResourceProvider())
+	return storage.NewResourcePiecesOpts(
+		fc.AsResourceProvider(),
+		storage.ResourcePiecesOpts{
+			LeaveIncompleteChunks: true,
+		},
+	)
 }
 
 func TestMergingTrackersByAddingSpecs(t *testing.T) {
diff --git a/storage/bench-resource-pieces.go b/storage/bench-resource-pieces.go
index b72f25c0..753b914a 100644
--- a/storage/bench-resource-pieces.go
+++ b/storage/bench-resource-pieces.go
@@ -27,6 +27,7 @@ func BenchmarkPieceMarkComplete(tb testing.TB, pi PieceImpl, data []byte) {
 		}(off)
 	}
 	wg.Wait()
+	//pi.MarkNotComplete()
 	// This might not apply if users of this benchmark don't cache with the expected capacity.
 	c.Assert(pi.Completion(), qt.Equals, Completion{Complete: false, Ok: true})
 	c.Assert(pi.MarkComplete(), qt.IsNil)
diff --git a/storage/piece_resource.go b/storage/piece_resource.go
index 04566515..11d80ed4 100644
--- a/storage/piece_resource.go
+++ b/storage/piece_resource.go
@@ -14,12 +14,23 @@ import (
 )
 
 type piecePerResource struct {
-	p PieceProvider
+	rp   PieceProvider
+	opts ResourcePiecesOpts
+}
+
+type ResourcePiecesOpts struct {
+	LeaveIncompleteChunks bool
+	AllowSizedPuts        bool
 }
 
 func NewResourcePieces(p PieceProvider) ClientImpl {
+	return NewResourcePiecesOpts(p, ResourcePiecesOpts{})
+}
+
+func NewResourcePiecesOpts(p PieceProvider, opts ResourcePiecesOpts) ClientImpl {
 	return &piecePerResource{
-		p: p,
+		rp:   p,
+		opts: opts,
 	}
 }
 
@@ -37,8 +47,8 @@ func (s piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Has
 
 func (s piecePerResource) Piece(p metainfo.Piece) PieceImpl {
 	return piecePerResourcePiece{
-		mp: p,
-		rp: s.p,
+		mp:               p,
+		piecePerResource: s,
 	}
 }
 
@@ -52,7 +62,7 @@ type ConsecutiveChunkWriter interface {
 
 type piecePerResourcePiece struct {
 	mp metainfo.Piece
-	rp resource.Provider
+	piecePerResource
 }
 
 var _ io.WriterTo = piecePerResourcePiece{}
@@ -90,6 +100,10 @@ func (s piecePerResourcePiece) Completion() Completion {
 	}
 }
 
+type SizedPutter interface {
+	PutSized(io.Reader, int64) error
+}
+
 func (s piecePerResourcePiece) MarkComplete() error {
 	incompleteChunks := s.getChunks()
 	r, w := io.Pipe()
@@ -102,8 +116,19 @@ func (s piecePerResourcePiece) MarkComplete() error {
 		}
 		w.CloseWithError(err)
 	}()
-	err := s.completed().Put(r)
-	if err == nil {
+	completedInstance := s.completed()
+	err := func() error {
+		if sp, ok := completedInstance.(SizedPutter); ok && s.opts.AllowSizedPuts {
+			return sp.PutSized(r, s.mp.Length())
+		} else {
+			return completedInstance.Put(r)
+		}
+	}()
+	if err == nil && !s.opts.LeaveIncompleteChunks {
+		// The incomplete chunks are deleted synchronously so that callers can't act on the
+		// completed piece while we're still removing chunks: a caller might start downloading
+		// chunks again and wouldn't expect us to delete them out from under it. Where the
+		// resource provider can clean up incomplete chunks itself, letting it do so appears to be much faster.
 		var wg sync.WaitGroup
 		for _, c := range incompleteChunks {
 			wg.Add(1)
diff --git a/storage/sqlite/sqlite-storage.go b/storage/sqlite/sqlite-storage.go
index b046d0d3..0f8dc136 100644
--- a/storage/sqlite/sqlite-storage.go
+++ b/storage/sqlite/sqlite-storage.go
@@ -52,6 +52,7 @@ func initSchema(conn conn) error {
 -- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
 -- can trim the database file size with partial vacuums without having to do a full vacuum, which 
 -- locks everything.
+pragma page_size=16384;
 pragma auto_vacuum=incremental;
 
 create table if not exists blob (
@@ -139,7 +140,10 @@ func NewPiecesStorage(opts NewPoolOpts) (_ storage.ClientImplCloser, err error)
 		conns.Close()
 		return
 	}
-	store := storage.NewResourcePieces(prov)
+	store := storage.NewResourcePiecesOpts(prov, storage.ResourcePiecesOpts{
+		LeaveIncompleteChunks: true,
+		AllowSizedPuts:        true,
+	})
 	return struct {
 		storage.ClientImpl
 		io.Closer
@@ -259,15 +263,20 @@ func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
 	if err != nil {
 		return
 	}
-	writes := make(chan writeRequest, 1<<(20-14))
-	prov := &provider{pool: pool, writes: writes, opts: opts}
-	runtime.SetFinalizer(prov, func(p *provider) {
-		// This is done in a finalizer, as it's easier than trying to synchronize on whether the
-		// channel has been closed. It also means that the provider writer can pass back errors from
-		// a closed ConnPool.
-		close(p.writes)
-	})
-	go providerWriter(writes, prov.pool)
+	prov := &provider{pool: pool, opts: opts}
+	if opts.BatchWrites {
+		if opts.NumConns < 2 {
+			err = errors.New("batch writes requires more than 1 conn")
+			return
+		}
+		writes := make(chan writeRequest)
+		prov.writes = writes
+		// This is retained for backwards compatibility. It may not be necessary.
+		runtime.SetFinalizer(prov, func(p *provider) {
+			p.Close()
+		})
+		go providerWriter(writes, prov.pool)
+	}
 	return prov, nil
 }
 
@@ -301,9 +310,11 @@ type ConnPool interface {
 }
 
 type provider struct {
-	pool   ConnPool
-	writes chan<- writeRequest
-	opts   ProviderOpts
+	pool     ConnPool
+	writes   chan<- writeRequest
+	opts     ProviderOpts
+	closed   sync.Once
+	closeErr error
 }
 
 var _ storage.ConsecutiveChunkWriter = (*provider)(nil)
@@ -337,7 +348,13 @@ func (p *provider) WriteConsecutiveChunks(prefix string, w io.Writer) (written i
 }
 
 func (me *provider) Close() error {
-	return me.pool.Close()
+	me.closed.Do(func() {
+		if me.writes != nil {
+			close(me.writes)
+		}
+		me.closeErr = me.pool.Close()
+	})
+	return me.closeErr
 }
 
 type writeRequest struct {
@@ -350,6 +367,11 @@ var expvars = expvar.NewMap("sqliteStorage")
 // Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
 // stronger typing on the writes channel.
 func providerWriter(writes <-chan writeRequest, pool ConnPool) {
+	conn := pool.Get(context.TODO())
+	if conn == nil {
+		return
+	}
+	defer pool.Put(conn)
 	for {
 		first, ok := <-writes
 		if !ok {
@@ -358,11 +380,6 @@ func providerWriter(writes <-chan writeRequest, pool ConnPool) {
 		var buf []func()
 		var cantFail error
 		func() {
-			conn := pool.Get(context.TODO())
-			if conn == nil {
-				return
-			}
-			defer pool.Put(conn)
 			defer sqlitex.Save(conn)(&cantFail)
 			firstErr := first.query(conn)
 			buf = append(buf, func() { first.done <- firstErr })
@@ -513,28 +530,50 @@ func (i instance) openBlob(conn conn, write, updateAccess bool) (*sqlite.Blob, e
 	return conn.OpenBlob("main", "blob", "data", rowid, write)
 }
 
+func (i instance) PutSized(reader io.Reader, size int64) (err error) {
+	err = i.withConn(func(conn conn) error {
+		err := sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, zeroblob(?))",
+			nil,
+			i.location, size)
+		if err != nil {
+			return err
+		}
+		blob, err := i.openBlob(conn, true, false)
+		if err != nil {
+			return err
+		}
+		defer blob.Close()
+		_, err = io.Copy(blob, reader)
+		return err
+	}, true)
+	return
+}
+
 func (i instance) Put(reader io.Reader) (err error) {
 	var buf bytes.Buffer
 	_, err = io.Copy(&buf, reader)
 	if err != nil {
 		return err
 	}
-	err = i.withConn(func(conn conn) error {
-		for range iter.N(10) {
-			err = sqlitex.Exec(conn,
-				"insert or replace into blob(name, data) values(?, cast(? as blob))",
-				nil,
-				i.location, buf.Bytes())
-			if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
-				log.Print("sqlite busy")
-				time.Sleep(time.Second)
-				continue
+	if false {
+		return i.PutSized(&buf, int64(buf.Len()))
+	} else {
+		return i.withConn(func(conn conn) error {
+			for range iter.N(10) {
+				err = sqlitex.Exec(conn,
+					"insert or replace into blob(name, data) values(?, cast(? as blob))",
+					nil,
+					i.location, buf.Bytes())
+				if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
+					log.Print("sqlite busy")
+					time.Sleep(time.Second)
+					continue
+				}
+				break
 			}
-			break
-		}
-		return err
-	}, true)
-	return
+			return err
+		}, true)
+	}
 }
 
 type fileInfo struct {
diff --git a/storage/sqlite/sqlite-storage_test.go b/storage/sqlite/sqlite-storage_test.go
index 836cc385..2427cf3a 100644
--- a/storage/sqlite/sqlite-storage_test.go
+++ b/storage/sqlite/sqlite-storage_test.go
@@ -42,7 +42,7 @@ func TestTextBlobSize(t *testing.T) {
 
 func TestSimultaneousIncrementalBlob(t *testing.T) {
 	_, p := newConnsAndProv(t, NewPoolOpts{
-		NumConns:            2,
+		NumConns:            3,
 		ConcurrentBlobReads: true,
 	})
 	a, err := p.NewInstance("a")
@@ -76,7 +76,11 @@ func BenchmarkMarkComplete(b *testing.B) {
 	rand.Read(data)
 	dbPath := filepath.Join(b.TempDir(), "storage.db")
 	b.Logf("storage db path: %q", dbPath)
-	ci, err := NewPiecesStorage(NewPoolOpts{Path: dbPath, Capacity: pieceSize})
+	ci, err := NewPiecesStorage(NewPoolOpts{
+		Path:                dbPath,
+		Capacity:            pieceSize,
+		ConcurrentBlobReads: true,
+	})
 	c.Assert(err, qt.IsNil)
 	defer ci.Close()
 	ti, err := ci.OpenTorrent(nil, metainfo.Hash{})
@@ -89,6 +93,8 @@ func BenchmarkMarkComplete(b *testing.B) {
 			Length:      pieceSize,
 		},
 	})
+	// Do it untimed the first time to prime the cache.
+	storage.BenchmarkPieceMarkComplete(b, pi, data)
 	b.ResetTimer()
 	for range iter.N(b.N) {
 		storage.BenchmarkPieceMarkComplete(b, pi, data)
-- 
2.51.0