var _ storage.ConsecutiveChunkReader = (*provider)(nil)
func (p *provider) ReadConsecutiveChunks(prefix string) (io.ReadCloser, error) {
+ p.closeMu.RLock()
runner, err := p.getReadWithConnRunner()
if err != nil {
+ p.closeMu.RUnlock()
return nil, err
}
r, w := io.Pipe()
go func() {
+ defer p.closeMu.RUnlock()
err = runner(func(_ context.Context, conn conn) error {
var written int64
err = sqlitex.Exec(conn, `
func (p *provider) withConn(with withConn, write bool, skip int) error {
p.closeMu.RLock()
+ // I think we need to check this here because it may not be valid to send to the writes channel
+ // if we're already closed. So don't try to move this check into getReadWithConnRunner.
if p.closed {
p.closeMu.RUnlock()
return errors.New("closed")
}
// Obtains a DB conn and returns a withConn for executing with it. If no error is returned from this
-// function, the runner *must* be used or the conn is leaked.
+// function, the runner *must* be used or the conn is leaked. You should check the provider isn't
+// closed before using this.
func (p *provider) getReadWithConnRunner() (with func(withConn) error, err error) {
conn := p.pool.Get(context.TODO())
if conn == nil {