14 "crawshaw.io/sqlite/sqlitex"
16 "github.com/anacrolix/torrent/metainfo"
17 "github.com/anacrolix/torrent/storage"
20 type NewDirectStorageOpts struct {
26 BlobFlushInterval time.Duration
29 func NewSquirrelCache(opts NewDirectStorageOpts) (_ *SquirrelCache, err error) {
30 conn, err := newConn(opts.NewConnOpts)
34 if opts.PageSize == 0 {
35 // 1<<16 is the largest page size sqlite supports. I think we want this to be the smallest
36 // SquirrelBlob size we can expect, which is probably 1<<17.
37 opts.PageSize = 1 << 16
39 err = initDatabase(conn, opts.InitDbOpts)
44 err = initConn(conn, opts.InitConnOpts)
49 if opts.BlobFlushInterval == 0 && !opts.GcBlobs {
50 // This is influenced by typical busy timeouts, of 5-10s. We want to give other connections
51 // a few chances at getting a transaction through.
52 opts.BlobFlushInterval = time.Second
56 blobs: make(map[string]*sqlite.Blob),
59 // Avoid race with cl.blobFlusherFunc
62 if opts.BlobFlushInterval != 0 {
63 cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
65 cl.capacity = cl.getCapacity
69 // NewDirectStorage is a convenience function that creates a SquirrelCache from opts and returns
70 // it wrapped in a client as a storage.ClientImplCloser.
71 func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
72 cache, err := NewSquirrelCache(opts)
76 return &client{cache}, nil
79 func (cl *SquirrelCache) getCapacity() (ret *int64) {
82 err := sqlitex.Exec(cl.conn, "select value from setting where name='capacity'", func(stmt *sqlite.Stmt) error {
84 *ret = stmt.ColumnInt64(0)
93 type SquirrelCache struct {
96 blobs map[string]*sqlite.Blob
97 blobFlusher *time.Timer
98 opts NewDirectStorageOpts
100 capacity func() *int64
107 func (c *SquirrelCache) blobFlusherFunc() {
112 c.blobFlusher.Reset(c.opts.BlobFlushInterval)
116 func (c *SquirrelCache) flushBlobs() {
117 for key, b := range c.blobs {
118 // Need the lock to prevent racing with the GC finalizers.
124 func (c *SquirrelCache) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
126 return storage.TorrentImpl{Piece: t.Piece, Close: t.Close, Capacity: &c.capacity}, nil
129 func (c *SquirrelCache) Close() (err error) {
133 if c.opts.BlobFlushInterval != 0 {
144 type torrent struct {
148 func rowidForBlob(c conn, name string, length int64, create bool) (rowid int64, err error) {
150 err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
152 panic("expected at most one row")
154 // TODO: How do we know if we got this wrong?
155 rowid = stmt.ColumnInt64(0)
166 err = errors.New("no existing row")
169 err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
173 rowid = c.LastInsertRowID()
177 func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
178 name := p.Hash().HexString()
179 return piece{SquirrelBlob{
186 func (t torrent) Close() error {
190 type SquirrelBlob struct {
200 func (p SquirrelBlob) doAtIoWithBlob(
201 atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
205 ) (n int, err error) {
208 if p.opts.NoCacheBlobs {
211 blob, err := p.getBlob(create)
213 err = fmt.Errorf("getting blob: %w", err)
216 n, err = atIo(blob)(b, off)
221 if !errors.As(err, &se) {
224 // "ABORT" occurs if the row the blob is on is modified elsewhere. "ERROR: invalid blob" occurs
225 // if the blob has been closed. We don't forget blobs that are closed by our GC finalizers,
226 // because they may be attached to names that have since moved on to another blob.
227 if se.Code != sqlite.SQLITE_ABORT && !(p.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
231 // Try again, this time we're guaranteed to get a fresh blob, and so errors are no excuse. It
232 // might be possible to skip to this version if we don't cache blobs.
233 blob, err = p.getBlob(create)
235 err = fmt.Errorf("getting blob: %w", err)
238 return atIo(blob)(b, off)
241 func (p SquirrelBlob) ReadAt(b []byte, off int64) (n int, err error) {
242 return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
247 func (p SquirrelBlob) WriteAt(b []byte, off int64) (n int, err error) {
248 return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
253 func (p piece) MarkComplete() error {
256 err := sqlitex.Exec(p.conn, "update blob set verified=true where name=?", nil, p.name)
260 changes := p.conn.Changes()
267 func (p SquirrelBlob) forgetBlob() {
268 blob, ok := p.blobs[p.name]
273 delete(p.blobs, p.name)
276 func (p piece) MarkNotComplete() error {
279 return sqlitex.Exec(p.conn, "update blob set verified=false where name=?", nil, p.name)
282 func (p piece) Completion() (ret storage.Completion) {
285 err := sqlitex.Exec(p.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
286 ret.Complete = stmt.ColumnInt(0) != 0
296 func (p SquirrelBlob) getBlob(create bool) (*sqlite.Blob, error) {
297 blob, ok := p.blobs[p.name]
299 rowid, err := rowidForBlob(p.conn, p.name, p.length, create)
301 return nil, fmt.Errorf("getting rowid for blob: %w", err)
303 blob, err = p.conn.OpenBlob("main", "blob", "data", rowid, true)
309 runtime.SetFinalizer(herp, func(*byte) {
312 // Note there's no guarantee that the finalizer fired while this blob is the same
313 // one in the blob cache. It might be possible to rework this so that we check, or
314 // strip finalizers as appropriate.
318 p.blobs[p.name] = blob