var _ storage.ConsecutiveChunkWriter = (*provider)(nil)
func (p *provider) WriteConsecutiveChunks(prefix string, w io.Writer) (written int64, err error) {
- p.withConn(func(conn conn) {
+ err = p.withConn(func(conn conn) error {
err = io.EOF
err = sqlitex.Exec(conn, `
select
len(prefix),
prefix,
)
+ return err
}, false)
return
}
}
type writeRequest struct {
- query func(*sqlite.Conn)
- done chan<- struct{}
+ query withConn
+ done chan<- error
}
// Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
if !ok {
return
}
- buf := []chan<- struct{}{first.done}
+ var buf []func()
var cantFail error
func() {
conn := pool.Get(context.TODO())
+ if conn == nil {
+ return
+ }
defer pool.Put(conn)
defer sqlitex.Save(conn)(&cantFail)
- first.query(conn)
+ firstErr := first.query(conn)
+ buf = append(buf, func() { first.done <- firstErr })
for {
select {
case wr, ok := <-writes:
if ok {
- buf = append(buf, wr.done)
- wr.query(conn)
+ err := wr.query(conn)
+ buf = append(buf, func() { wr.done <- err })
continue
}
default:
}
// Signal done after we know the transaction succeeded.
for _, done := range buf {
- close(done)
+ done()
}
//log.Printf("batched %v write queries", len(buf))
}
p *provider
}
-func (p *provider) withConn(with func(conn conn), write bool) {
+func (p *provider) withConn(with withConn, write bool) error {
if write && p.opts.BatchWrites {
- done := make(chan struct{})
+ done := make(chan error)
p.writes <- writeRequest{
query: with,
done: done,
}
- <-done
+ return <-done
} else {
conn := p.pool.Get(context.TODO())
+ if conn == nil {
+ return errors.New("couldn't get pool conn")
+ }
defer p.pool.Put(conn)
- with(conn)
+ return with(conn)
}
}
-func (i instance) withConn(with func(conn conn), write bool) {
- i.p.withConn(with, write)
+type withConn func(conn) error
+
+func (i instance) withConn(with withConn, write bool) error {
+ return i.p.withConn(with, write)
}
func (i instance) getConn() *sqlite.Conn {
func (i instance) Readdirnames() (names []string, err error) {
prefix := i.location + "/"
- i.withConn(func(conn conn) {
- err = sqlitex.Exec(conn, "select name from blob where name like ?", func(stmt *sqlite.Stmt) error {
+ err = i.withConn(func(conn conn) error {
+ return sqlitex.Exec(conn, "select name from blob where name like ?", func(stmt *sqlite.Stmt) error {
names = append(names, stmt.ColumnText(0)[len(prefix):])
return nil
}, prefix+"%")
if err != nil {
return err
}
- i.withConn(func(conn conn) {
+ err = i.withConn(func(conn conn) error {
for range iter.N(10) {
err = sqlitex.Exec(conn,
"insert or replace into blob(name, data) values(?, cast(? as blob))",
}
break
}
+ return err
}, true)
return
}
}
func (i instance) Stat() (ret os.FileInfo, err error) {
- i.withConn(func(conn conn) {
+ err = i.withConn(func(conn conn) error {
var blob *sqlite.Blob
blob, err = i.openBlob(conn, false, false)
if err != nil {
- return
+ return err
}
defer blob.Close()
ret = fileInfo{blob.Size()}
+ return nil
}, false)
return
}
func (i instance) ReadAt(p []byte, off int64) (n int, err error) {
- i.withConn(func(conn conn) {
+ err = i.withConn(func(conn conn) error {
if false {
var blob *sqlite.Blob
blob, err = i.openBlob(conn, false, true)
if err != nil {
- return
+ return err
}
defer blob.Close()
if off >= blob.Size() {
err = io.EOF
- return
+ return err
}
if off+int64(len(p)) > blob.Size() {
p = p[:blob.Size()-off]
off+1, len(p), i.location,
)
if err != nil {
- return
+ return err
}
if !gotRow {
err = errors.New("blob not found")
- return
+ return err
}
if n < len(p) {
err = io.EOF
}
}
+ return nil
}, false)
return
}
panic("implement me")
}
-func (i instance) Delete() (err error) {
- i.withConn(func(conn conn) {
- err = sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)
+func (i instance) Delete() error {
+ return i.withConn(func(conn conn) error {
+ return sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)
}, true)
- return
}