]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Don't create blobs when reading
authorMatt Joiner <anacrolix@gmail.com>
Tue, 11 May 2021 02:45:51 +0000 (12:45 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Fri, 14 May 2021 05:41:23 +0000 (15:41 +1000)
storage/sqlite/direct.go

index dd01816a0323773ccf2424fbd3a90f1393641539..dba48259dd74cc7e2d7e88501b84c459a4ebdd12 100644 (file)
@@ -2,6 +2,7 @@ package sqliteStorage
 
 import (
        "errors"
+       "fmt"
        "runtime"
        "sync"
        "time"
@@ -93,15 +94,25 @@ type torrent struct {
        c *client
 }
 
-func rowidForBlob(c conn, name string, length int64) (rowid int64, err error) {
+func rowidForBlob(c conn, name string, length int64, create bool) (rowid int64, err error) {
+       rowidOk := false
        err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
+               if rowidOk {
+                       panic("expected at most one row")
+               }
+               // TODO: How do we know if we got this wrong?
                rowid = stmt.ColumnInt64(0)
+               rowidOk = true
                return nil
        }, name)
        if err != nil {
                return
        }
-       if rowid != 0 {
+       if rowidOk {
+               return
+       }
+       if !create {
+               err = errors.New("no existing row")
                return
        }
        err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
@@ -137,13 +148,19 @@ func (p piece) doAtIoWithBlob(
        atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
        b []byte,
        off int64,
+       create bool,
 ) (n int, err error) {
        p.l.Lock()
        defer p.l.Unlock()
        if !p.opts.CacheBlobs {
                defer p.forgetBlob()
        }
-       n, err = atIo(p.getBlob())(b, off)
+       blob, err := p.getBlob(create)
+       if err != nil {
+               err = fmt.Errorf("getting blob: %w", err)
+               return
+       }
+       n, err = atIo(blob)(b, off)
        if err == nil {
                return
        }
@@ -151,23 +168,33 @@ func (p piece) doAtIoWithBlob(
        if !errors.As(err, &se) {
                return
        }
+       // "ABORT" occurs if the row the blob is on is modified elsewhere. "ERROR: invalid blob" occurs
+       // if the blob has been closed. We don't forget blobs that are closed by our GC finalizers,
+       // because they may be attached to names that have since moved on to another blob.
        if se.Code != sqlite.SQLITE_ABORT && !(p.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
                return
        }
        p.forgetBlob()
-       return atIo(p.getBlob())(b, off)
+       // Try again, this time we're guaranteed to get a fresh blob, and so errors are no excuse. It
+       // might be possible to skip to this version if we don't cache blobs.
+       blob, err = p.getBlob(create)
+       if err != nil {
+               err = fmt.Errorf("getting blob: %w", err)
+               return
+       }
+       return atIo(blob)(b, off)
 }
 
 func (p piece) ReadAt(b []byte, off int64) (n int, err error) {
        return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
                return blob.ReadAt
-       }, b, off)
+       }, b, off, false)
 }
 
 func (p piece) WriteAt(b []byte, off int64) (n int, err error) {
        return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
                return blob.WriteAt
-       }, b, off)
+       }, b, off, true)
 }
 
 func (p piece) MarkComplete() error {
@@ -211,12 +238,12 @@ func (p piece) Completion() (ret storage.Completion) {
        return
 }
 
-func (p piece) getBlob() *sqlite.Blob {
+func (p piece) getBlob(create bool) (*sqlite.Blob, error) {
        blob, ok := p.blobs[p.name]
        if !ok {
-               rowid, err := rowidForBlob(p.conn, p.name, p.length)
+               rowid, err := rowidForBlob(p.conn, p.name, p.length, create)
                if err != nil {
-                       panic(err)
+                       return nil, fmt.Errorf("getting rowid for blob: %w", err)
                }
                blob, err = p.conn.OpenBlob("main", "blob", "data", rowid, true)
                if err != nil {
@@ -227,10 +254,13 @@ func (p piece) getBlob() *sqlite.Blob {
                        runtime.SetFinalizer(herp, func(*byte) {
                                p.l.Lock()
                                defer p.l.Unlock()
+                               // Note there's no guarantee that the finalizer fired while this blob is the same
+                               // one in the blob cache. It might be possible to rework this so that we check, or
+                               // strip finalizers as appropriate.
                                blob.Close()
                        })
                }
                p.blobs[p.name] = blob
        }
-       return blob
+       return blob, nil
 }