10 "crawshaw.io/sqlite/sqlitex"
11 "github.com/anacrolix/torrent/metainfo"
12 "github.com/anacrolix/torrent/storage"
// NewDirectStorageOpts is the options value accepted by NewDirectStorage.
15 type NewDirectStorageOpts struct {
// BlobFlushInterval, when non-zero, makes NewDirectStorage arm a timer with this duration that
// invokes the client's blob flusher. Zero disables the timer.
21 BlobFlushInterval time.Duration
24 // NewDirectStorage opens a connection with newConn, initializes the connection and its database,
25 // and returns a storage.ClientImplCloser built on that connection.
26 func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
27 conn, err := newConn(opts.NewConnOpts)
31 err = initConn(conn, opts.InitConnOpts)
36 err = initDatabase(conn, opts.InitDbOpts)
// Starts empty; getBlob adds entries keyed by piece name.
42 blobs: make(map[string]*sqlite.Blob),
// A zero interval means no flush timer is created at all.
45 if opts.BlobFlushInterval != 0 {
// time.AfterFunc fires once; blobFlusherFunc re-arms the timer itself via Reset.
46 cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
// Blob handles keyed by name (piece methods index their blob map by piece name).
54 blobs map[string]*sqlite.Blob
// Set by NewDirectStorage only when BlobFlushInterval is non-zero; nil otherwise.
55 blobFlusher *time.Timer
// Options value of the type passed to NewDirectStorage; BlobFlushInterval is read from here.
56 opts NewDirectStorageOpts
// blobFlusherFunc is the callback NewDirectStorage hands to time.AfterFunc for the flush timer.
60 func (c *client) blobFlusherFunc() {
// Re-arm the one-shot timer so this callback runs again after another BlobFlushInterval.
65 c.blobFlusher.Reset(c.opts.BlobFlushInterval)
// flushBlobs walks every entry (name and blob handle) currently held in c.blobs.
69 func (c *client) flushBlobs() {
70 for key, b := range c.blobs {
71 // Need the lock to prevent racing with the GC finalizers.
// OpenTorrent returns a torrent value wrapping this client as a storage.TorrentImpl. It does not
// consult info or infoHash and always returns a nil error.
77 func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
78 return torrent{c}, nil
// Close shuts down the client and reports any error encountered.
81 func (c *client) Close() error {
// c.blobFlusher is only set by NewDirectStorage when the interval is non-zero, so the timer is
// only dealt with in that case.
86 if c.opts.BlobFlushInterval != 0 {
// rowidForBlob returns the rowid of the row in the blob table whose name column equals name.
// The function also contains an insert of a new row holding a zero-filled blob of the given
// length, after which the rowid comes from the connection's last insert rowid — presumably the
// path taken when the select finds no row (TODO confirm against the full body).
96 func rowidForBlob(c conn, name string, length int64) (rowid int64, err error) {
97 err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
// Column 0 is the selected rowid.
98 rowid = stmt.ColumnInt64(0)
// zeroblob(?) makes sqlite allocate a blob of `length` zero bytes for the data column.
107 err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
111 rowid = c.LastInsertRowID()
// Piece returns the storage.PieceImpl for piece p of this torrent.
115 func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
// The name is the piece hash rendered as a hex string.
118 name := p.Hash().HexString()
// Close is the Close method of the torrent value that OpenTorrent returns.
126 func (t torrent) Close() error {
// doAtIoWithBlob runs an offset-based I/O operation against this piece's blob. atIo maps a blob
// handle to the function that performs the operation; ReadAt and WriteAt both go through here.
// It returns the byte count and error of the operation. If the first attempt fails with one of
// the sqlite errors singled out below, the operation is attempted once more on a blob obtained
// from getBlob.
136 func (p2 piece) doAtIoWithBlob(
137 atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
140 ) (n int, err error) {
// With blob caching off, drop this piece's blob handle when the call returns.
143 if !p2.opts.CacheBlobs {
144 defer p2.forgetBlob()
146 n, err = atIo(p2.getBlob())(p, off)
// Only errors that unwrap to a sqlite error (se) are inspected further.
151 if !errors.As(err, &se) {
// The condition is true for every code except SQLITE_ABORT and, when GcBlobs is enabled,
// SQLITE_ERROR with the message "invalid blob"; presumably those two are the cases that reach the
// retry below (TODO confirm against the full body).
154 if se.Code != sqlite.SQLITE_ABORT && !(p2.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
// Second attempt, asking getBlob for the handle again.
158 return atIo(p2.getBlob())(p, off)
// ReadAt delegates to doAtIoWithBlob, supplying the blob-level operation to run with p and off.
161 func (p2 piece) ReadAt(p []byte, off int64) (n int, err error) {
162 return p2.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
// WriteAt delegates to doAtIoWithBlob, supplying the blob-level operation to run with p and off.
167 func (p2 piece) WriteAt(p []byte, off int64) (n int, err error) {
168 return p2.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
// MarkComplete sets verified=true on the blob table row whose name matches this piece.
173 func (p2 piece) MarkComplete() error {
176 err := sqlitex.Exec(p2.conn, "update blob set verified=true where name=?", nil, p2.name)
// Row-change count reported by the connection after the update above.
180 changes := p2.conn.Changes()
// forgetBlob looks up this piece's blob handle in the blob map and removes the map entry.
187 func (p2 piece) forgetBlob() {
188 blob, ok := p2.blobs[p2.name]
193 delete(p2.blobs, p2.name)
// MarkNotComplete sets verified=false on the blob table row whose name matches this piece and
// returns the error, if any, from executing that update.
196 func (p2 piece) MarkNotComplete() error {
197 return sqlitex.Exec(p2.conn, "update blob set verified=false where name=?", nil, p2.name)
// Completion reads the verified column of the blob table row named after this piece and reports
// it in the returned storage.Completion.
200 func (p2 piece) Completion() (ret storage.Completion) {
203 err := sqlitex.Exec(p2.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
// Any non-zero verified value counts as complete.
204 ret.Complete = stmt.ColumnInt(0) != 0
// getBlob returns the blob handle for this piece. It consults the blob map first; the code that
// follows resolves the row with rowidForBlob, opens the blob, and stores the handle in the map
// under the piece name — presumably only when the lookup misses (TODO confirm).
214 func (p2 piece) getBlob() *sqlite.Blob {
215 blob, ok := p2.blobs[p2.name]
217 rowid, err := rowidForBlob(p2.conn, p2.name, p2.length)
// Opens the "data" column of table "blob" in database "main" at rowid; the final argument is
// OpenBlob's write flag.
221 blob, err = p2.conn.OpenBlob("main", "blob", "data", rowid, true)
// Registers a finalizer on herp; its callback runs when herp is garbage collected.
227 runtime.SetFinalizer(herp, func(*byte) {
233 p2.blobs[p2.name] = blob