11 "crawshaw.io/sqlite/sqlitex"
12 "github.com/anacrolix/torrent/metainfo"
13 "github.com/anacrolix/torrent/storage"
16 type NewDirectStorageOpts struct {
22 BlobFlushInterval time.Duration
25 // A convenience function that creates a connection pool, resource provider, and a pieces storage
26 // ClientImpl and returns them all with a Close attached.
27 func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
28 conn, err := newConn(opts.NewConnOpts)
32 err = initConn(conn, opts.InitConnOpts)
37 err = initDatabase(conn, opts.InitDbOpts)
43 blobs: make(map[string]*sqlite.Blob),
46 if opts.BlobFlushInterval != 0 {
47 cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
55 blobs map[string]*sqlite.Blob
56 blobFlusher *time.Timer
57 opts NewDirectStorageOpts
61 func (c *client) blobFlusherFunc() {
66 c.blobFlusher.Reset(c.opts.BlobFlushInterval)
70 func (c *client) flushBlobs() {
71 for key, b := range c.blobs {
72 // Need the lock to prevent racing with the GC finalizers.
78 func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
79 return torrent{c}, nil
82 func (c *client) Close() error {
87 if c.opts.BlobFlushInterval != 0 {
97 func rowidForBlob(c conn, name string, length int64, create bool) (rowid int64, err error) {
99 err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
101 panic("expected at most one row")
103 // TODO: How do we know if we got this wrong?
104 rowid = stmt.ColumnInt64(0)
115 err = errors.New("no existing row")
118 err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
122 rowid = c.LastInsertRowID()
126 func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
129 name := p.Hash().HexString()
137 func (t torrent) Close() error {
147 func (p piece) doAtIoWithBlob(
148 atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
152 ) (n int, err error) {
155 if !p.opts.CacheBlobs {
158 blob, err := p.getBlob(create)
160 err = fmt.Errorf("getting blob: %w", err)
163 n, err = atIo(blob)(b, off)
168 if !errors.As(err, &se) {
171 // "ABORT" occurs if the row the blob is on is modified elsewhere. "ERROR: invalid blob" occurs
172 // if the blob has been closed. We don't forget blobs that are closed by our GC finalizers,
173 // because they may be attached to names that have since moved on to another blob.
174 if se.Code != sqlite.SQLITE_ABORT && !(p.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
178 // Try again, this time we're guaranteed to get a fresh blob, and so errors are no excuse. It
179 // might be possible to skip to this version if we don't cache blobs.
180 blob, err = p.getBlob(create)
182 err = fmt.Errorf("getting blob: %w", err)
185 return atIo(blob)(b, off)
188 func (p piece) ReadAt(b []byte, off int64) (n int, err error) {
189 return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
194 func (p piece) WriteAt(b []byte, off int64) (n int, err error) {
195 return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
200 func (p piece) MarkComplete() error {
203 err := sqlitex.Exec(p.conn, "update blob set verified=true where name=?", nil, p.name)
207 changes := p.conn.Changes()
214 func (p piece) forgetBlob() {
215 blob, ok := p.blobs[p.name]
220 delete(p.blobs, p.name)
223 func (p piece) MarkNotComplete() error {
224 return sqlitex.Exec(p.conn, "update blob set verified=false where name=?", nil, p.name)
227 func (p piece) Completion() (ret storage.Completion) {
230 err := sqlitex.Exec(p.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
231 ret.Complete = stmt.ColumnInt(0) != 0
241 func (p piece) getBlob(create bool) (*sqlite.Blob, error) {
242 blob, ok := p.blobs[p.name]
244 rowid, err := rowidForBlob(p.conn, p.name, p.length, create)
246 return nil, fmt.Errorf("getting rowid for blob: %w", err)
248 blob, err = p.conn.OpenBlob("main", "blob", "data", rowid, true)
254 runtime.SetFinalizer(herp, func(*byte) {
257 // Note there's no guarantee that the finalizer fired while this blob is the same
258 // one in the blob cache. It might be possible to rework this so that we check, or
259 // strip finalizers as appropriate.
263 p.blobs[p.name] = blob