]> Sergey Matveev's repositories - btrtrc.git/commitdiff
sqlite storage: Add error return from withConn
authorMatt Joiner <anacrolix@gmail.com>
Sat, 14 Nov 2020 11:56:27 +0000 (22:56 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 16 Nov 2020 05:37:11 +0000 (16:37 +1100)
storage/sqlite/sqlite-storage.go

index 9dded6b567a3d15c1ee3be8e7c10db6025d8a6c6..6fc5bc42c34f161203aa22cacd34fbd256f897e8 100644 (file)
@@ -308,7 +308,7 @@ type provider struct {
 var _ storage.ConsecutiveChunkWriter = (*provider)(nil)
 
 func (p *provider) WriteConsecutiveChunks(prefix string, w io.Writer) (written int64, err error) {
-       p.withConn(func(conn conn) {
+       err = p.withConn(func(conn conn) error {
                err = io.EOF
                err = sqlitex.Exec(conn, `
                                select
@@ -328,6 +328,7 @@ func (p *provider) WriteConsecutiveChunks(prefix string, w io.Writer) (written i
                        len(prefix),
                        prefix,
                )
+               return err
        }, false)
        return
 }
@@ -337,8 +338,8 @@ func (me *provider) Close() error {
 }
 
 type writeRequest struct {
-       query func(*sqlite.Conn)
-       done  chan<- struct{}
+       query withConn
+       done  chan<- error
 }
 
 // Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
@@ -349,19 +350,23 @@ func providerWriter(writes <-chan writeRequest, pool ConnPool) {
                if !ok {
                        return
                }
-               buf := []chan<- struct{}{first.done}
+               var buf []func()
                var cantFail error
                func() {
                        conn := pool.Get(context.TODO())
+                       if conn == nil {
+                               return
+                       }
                        defer pool.Put(conn)
                        defer sqlitex.Save(conn)(&cantFail)
-                       first.query(conn)
+                       firstErr := first.query(conn)
+                       buf = append(buf, func() { first.done <- firstErr })
                        for {
                                select {
                                case wr, ok := <-writes:
                                        if ok {
-                                               buf = append(buf, wr.done)
-                                               wr.query(conn)
+                                               err := wr.query(conn)
+                                               buf = append(buf, func() { wr.done <- err })
                                                continue
                                        }
                                default:
@@ -375,7 +380,7 @@ func providerWriter(writes <-chan writeRequest, pool ConnPool) {
                }
                // Signal done after we know the transaction succeeded.
                for _, done := range buf {
-                       close(done)
+                       done()
                }
                //log.Printf("batched %v write queries", len(buf))
        }
@@ -390,23 +395,28 @@ type instance struct {
        p        *provider
 }
 
-func (p *provider) withConn(with func(conn conn), write bool) {
+func (p *provider) withConn(with withConn, write bool) error {
        if write && p.opts.BatchWrites {
-               done := make(chan struct{})
+               done := make(chan error)
                p.writes <- writeRequest{
                        query: with,
                        done:  done,
                }
-               <-done
+               return <-done
        } else {
                conn := p.pool.Get(context.TODO())
+               if conn == nil {
+                       return errors.New("couldn't get pool conn")
+               }
                defer p.pool.Put(conn)
-               with(conn)
+               return with(conn)
        }
 }
 
-func (i instance) withConn(with func(conn conn), write bool) {
-       i.p.withConn(with, write)
+type withConn func(conn) error
+
+func (i instance) withConn(with withConn, write bool) error {
+       return i.p.withConn(with, write)
 }
 
 func (i instance) getConn() *sqlite.Conn {
@@ -419,8 +429,8 @@ func (i instance) putConn(conn *sqlite.Conn) {
 
 func (i instance) Readdirnames() (names []string, err error) {
        prefix := i.location + "/"
-       i.withConn(func(conn conn) {
-               err = sqlitex.Exec(conn, "select name from blob where name like ?", func(stmt *sqlite.Stmt) error {
+       err = i.withConn(func(conn conn) error {
+               return sqlitex.Exec(conn, "select name from blob where name like ?", func(stmt *sqlite.Stmt) error {
                        names = append(names, stmt.ColumnText(0)[len(prefix):])
                        return nil
                }, prefix+"%")
@@ -499,7 +509,7 @@ func (i instance) Put(reader io.Reader) (err error) {
        if err != nil {
                return err
        }
-       i.withConn(func(conn conn) {
+       err = i.withConn(func(conn conn) error {
                for range iter.N(10) {
                        err = sqlitex.Exec(conn,
                                "insert or replace into blob(name, data) values(?, cast(? as blob))",
@@ -512,6 +522,7 @@ func (i instance) Put(reader io.Reader) (err error) {
                        }
                        break
                }
+               return err
        }, true)
        return
 }
@@ -545,30 +556,31 @@ func (f fileInfo) Sys() interface{} {
 }
 
 func (i instance) Stat() (ret os.FileInfo, err error) {
-       i.withConn(func(conn conn) {
+       err = i.withConn(func(conn conn) error {
                var blob *sqlite.Blob
                blob, err = i.openBlob(conn, false, false)
                if err != nil {
-                       return
+                       return err
                }
                defer blob.Close()
                ret = fileInfo{blob.Size()}
+               return nil
        }, false)
        return
 }
 
 func (i instance) ReadAt(p []byte, off int64) (n int, err error) {
-       i.withConn(func(conn conn) {
+       err = i.withConn(func(conn conn) error {
                if false {
                        var blob *sqlite.Blob
                        blob, err = i.openBlob(conn, false, true)
                        if err != nil {
-                               return
+                               return err
                        }
                        defer blob.Close()
                        if off >= blob.Size() {
                                err = io.EOF
-                               return
+                               return err
                        }
                        if off+int64(len(p)) > blob.Size() {
                                p = p[:blob.Size()-off]
@@ -591,16 +603,17 @@ func (i instance) ReadAt(p []byte, off int64) (n int, err error) {
                                off+1, len(p), i.location,
                        )
                        if err != nil {
-                               return
+                               return err
                        }
                        if !gotRow {
                                err = errors.New("blob not found")
-                               return
+                               return err
                        }
                        if n < len(p) {
                                err = io.EOF
                        }
                }
+               return nil
        }, false)
        return
 }
@@ -609,9 +622,8 @@ func (i instance) WriteAt(bytes []byte, i2 int64) (int, error) {
        panic("implement me")
 }
 
-func (i instance) Delete() (err error) {
-       i.withConn(func(conn conn) {
-               err = sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)
+func (i instance) Delete() error {
+       return i.withConn(func(conn conn) error {
+               return sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)
        }, true)
-       return
 }