import (
"errors"
+ "fmt"
"runtime"
"sync"
"time"
c *client
}
-func rowidForBlob(c conn, name string, length int64) (rowid int64, err error) {
+func rowidForBlob(c conn, name string, length int64, create bool) (rowid int64, err error) {
+ rowidOk := false
err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
+ if rowidOk {
+ panic("expected at most one row")
+ }
+ // TODO: How do we know if we got this wrong?
rowid = stmt.ColumnInt64(0)
+ rowidOk = true
return nil
}, name)
if err != nil {
return
}
- if rowid != 0 {
+ if rowidOk {
+ return
+ }
+ if !create {
+ err = errors.New("no existing row")
return
}
err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
b []byte,
off int64,
+ create bool,
) (n int, err error) {
p.l.Lock()
defer p.l.Unlock()
if !p.opts.CacheBlobs {
defer p.forgetBlob()
}
- n, err = atIo(p.getBlob())(b, off)
+ blob, err := p.getBlob(create)
+ if err != nil {
+ err = fmt.Errorf("getting blob: %w", err)
+ return
+ }
+ n, err = atIo(blob)(b, off)
if err == nil {
return
}
if !errors.As(err, &se) {
return
}
+ // "ABORT" occurs if the row the blob is on is modified elsewhere. "ERROR: invalid blob" occurs
+ // if the blob has been closed. We don't forget blobs that are closed by our GC finalizers,
+ // because they may be attached to names that have since moved on to another blob.
if se.Code != sqlite.SQLITE_ABORT && !(p.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
return
}
p.forgetBlob()
- return atIo(p.getBlob())(b, off)
+ // Try again, this time we're guaranteed to get a fresh blob, and so errors are no excuse. It
+ // might be possible to skip to this version if we don't cache blobs.
+ blob, err = p.getBlob(create)
+ if err != nil {
+ err = fmt.Errorf("getting blob: %w", err)
+ return
+ }
+ return atIo(blob)(b, off)
}
func (p piece) ReadAt(b []byte, off int64) (n int, err error) {
return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
return blob.ReadAt
- }, b, off)
+ }, b, off, false)
}
func (p piece) WriteAt(b []byte, off int64) (n int, err error) {
return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
return blob.WriteAt
- }, b, off)
+ }, b, off, true)
}
func (p piece) MarkComplete() error {
return
}
-func (p piece) getBlob() *sqlite.Blob {
+func (p piece) getBlob(create bool) (*sqlite.Blob, error) {
blob, ok := p.blobs[p.name]
if !ok {
- rowid, err := rowidForBlob(p.conn, p.name, p.length)
+ rowid, err := rowidForBlob(p.conn, p.name, p.length, create)
if err != nil {
- panic(err)
+ return nil, fmt.Errorf("getting rowid for blob: %w", err)
}
blob, err = p.conn.OpenBlob("main", "blob", "data", rowid, true)
if err != nil {
runtime.SetFinalizer(herp, func(*byte) {
p.l.Lock()
defer p.l.Unlock()
+ // Note there's no guarantee that the finalizer fired while this blob is the same
+ // one in the blob cache. It might be possible to rework this so that we check, or
+ // strip finalizers as appropriate.
blob.Close()
})
}
p.blobs[p.name] = blob
}
- return blob
+ return blob, nil
}