From e65fac26ac1c05c4e45760b460da8324ee8f5d5d Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sat, 14 Nov 2020 22:56:27 +1100 Subject: [PATCH] sqlite storage: Add error return from withConn --- storage/sqlite/sqlite-storage.go | 68 +++++++++++++++++++------------- 1 file changed, 40 insertions(+), 28 deletions(-) diff --git a/storage/sqlite/sqlite-storage.go b/storage/sqlite/sqlite-storage.go index 9dded6b5..6fc5bc42 100644 --- a/storage/sqlite/sqlite-storage.go +++ b/storage/sqlite/sqlite-storage.go @@ -308,7 +308,7 @@ type provider struct { var _ storage.ConsecutiveChunkWriter = (*provider)(nil) func (p *provider) WriteConsecutiveChunks(prefix string, w io.Writer) (written int64, err error) { - p.withConn(func(conn conn) { + err = p.withConn(func(conn conn) error { err = io.EOF err = sqlitex.Exec(conn, ` select @@ -328,6 +328,7 @@ func (p *provider) WriteConsecutiveChunks(prefix string, w io.Writer) (written i len(prefix), prefix, ) + return err }, false) return } @@ -337,8 +338,8 @@ func (me *provider) Close() error { } type writeRequest struct { - query func(*sqlite.Conn) - done chan<- struct{} + query withConn + done chan<- error } // Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have @@ -349,19 +350,23 @@ func providerWriter(writes <-chan writeRequest, pool ConnPool) { if !ok { return } - buf := []chan<- struct{}{first.done} + var buf []func() var cantFail error func() { conn := pool.Get(context.TODO()) + if conn == nil { + return + } defer pool.Put(conn) defer sqlitex.Save(conn)(&cantFail) - first.query(conn) + firstErr := first.query(conn) + buf = append(buf, func() { first.done <- firstErr }) for { select { case wr, ok := <-writes: if ok { - buf = append(buf, wr.done) - wr.query(conn) + err := wr.query(conn) + buf = append(buf, func() { wr.done <- err }) continue } default: @@ -375,7 +380,7 @@ func providerWriter(writes <-chan writeRequest, pool ConnPool) { } // Signal done after we know the transaction succeeded. for _, done := range buf { - close(done) + done() } //log.Printf("batched %v write queries", len(buf)) } @@ -390,23 +395,28 @@ type instance struct { p *provider } -func (p *provider) withConn(with func(conn conn), write bool) { +func (p *provider) withConn(with withConn, write bool) error { if write && p.opts.BatchWrites { - done := make(chan struct{}) + done := make(chan error) p.writes <- writeRequest{ query: with, done: done, } - <-done + return <-done } else { conn := p.pool.Get(context.TODO()) + if conn == nil { + return errors.New("couldn't get pool conn") + } defer p.pool.Put(conn) - with(conn) + return with(conn) } } -func (i instance) withConn(with func(conn conn), write bool) { - i.p.withConn(with, write) +type withConn func(conn) error + +func (i instance) withConn(with withConn, write bool) error { + return i.p.withConn(with, write) } func (i instance) getConn() *sqlite.Conn { @@ -419,8 +429,8 @@ func (i instance) putConn(conn *sqlite.Conn) { func (i instance) Readdirnames() (names []string, err error) { prefix := i.location + "/" - i.withConn(func(conn conn) { - err = sqlitex.Exec(conn, "select name from blob where name like ?", func(stmt *sqlite.Stmt) error { + err = i.withConn(func(conn conn) error { + return sqlitex.Exec(conn, "select name from blob where name like ?", func(stmt *sqlite.Stmt) error { names = append(names, stmt.ColumnText(0)[len(prefix):]) return nil }, prefix+"%") @@ -499,7 +509,7 @@ func (i instance) Put(reader io.Reader) (err error) { if err != nil { return err } - i.withConn(func(conn conn) { + err = i.withConn(func(conn conn) error { for range iter.N(10) { err = sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, cast(? as blob))", @@ -512,6 +522,7 @@ func (i instance) Put(reader io.Reader) (err error) { } break } + return err }, true) return } @@ -545,30 +556,31 @@ func (f fileInfo) Sys() interface{} { } func (i instance) Stat() (ret os.FileInfo, err error) { - i.withConn(func(conn conn) { + err = i.withConn(func(conn conn) error { var blob *sqlite.Blob blob, err = i.openBlob(conn, false, false) if err != nil { - return + return err } defer blob.Close() ret = fileInfo{blob.Size()} + return nil }, false) return } func (i instance) ReadAt(p []byte, off int64) (n int, err error) { - i.withConn(func(conn conn) { + err = i.withConn(func(conn conn) error { if false { var blob *sqlite.Blob blob, err = i.openBlob(conn, false, true) if err != nil { - return + return err } defer blob.Close() if off >= blob.Size() { err = io.EOF - return + return err } if off+int64(len(p)) > blob.Size() { p = p[:blob.Size()-off] @@ -591,16 +603,17 @@ func (i instance) ReadAt(p []byte, off int64) (n int, err error) { off+1, len(p), i.location, ) if err != nil { - return + return err } if !gotRow { err = errors.New("blob not found") - return + return err } if n < len(p) { err = io.EOF } } + return nil }, false) return } @@ -609,9 +622,8 @@ func (i instance) WriteAt(bytes []byte, i2 int64) (int, error) { panic("implement me") } -func (i instance) Delete() (err error) { - i.withConn(func(conn conn) { - err = sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location) +func (i instance) Delete() error { + return i.withConn(func(conn conn) error { + return sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location) }, true) - return } -- 2.48.1