8 "crawshaw.io/sqlite/sqlitex"
9 "github.com/anacrolix/torrent/metainfo"
10 "github.com/anacrolix/torrent/storage"
13 type NewDirectStorageOpts struct {
15 ProvOpts func(*ProviderOpts)
18 // A convenience function that creates a connection pool, resource provider, and a pieces storage
19 // ClientImpl and returns them all with a Close attached.
20 func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
21 conns, provOpts, err := NewPool(opts.NewPoolOpts)
25 if f := opts.ProvOpts; f != nil {
28 provOpts.BatchWrites = false
29 prov, err := NewProvider(conns, provOpts)
36 conn: prov.pool.Get(nil),
37 blobs: make(map[string]*sqlite.Blob),
45 blobs map[string]*sqlite.Blob
48 func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
49 return torrent{c}, nil
52 func (c *client) Close() error {
53 for _, b := range c.blobs {
56 c.prov.pool.Put(c.conn)
64 func rowidForBlob(c conn, name string, length int64) (rowid int64, err error) {
65 err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
66 rowid = stmt.ColumnInt64(0)
75 err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
79 rowid = c.LastInsertRowID()
83 func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
86 name := p.Hash().HexString()
87 return piece{t.c.conn, &t.c.l, name, t.c.blobs, p.Length()}
90 func (t torrent) Close() error {
98 blobs map[string]*sqlite.Blob
102 func (p2 piece) doAtIoWithBlob(
103 atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
106 ) (n int, err error) {
109 n, err = atIo(p2.getBlob())(p, off)
111 if !errors.As(err, &se) || se.Code != sqlite.SQLITE_ABORT {
115 return atIo(p2.getBlob())(p, off)
118 func (p2 piece) ReadAt(p []byte, off int64) (n int, err error) {
119 return p2.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
124 func (p2 piece) WriteAt(p []byte, off int64) (n int, err error) {
125 return p2.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
130 func (p2 piece) MarkComplete() error {
133 err := sqlitex.Exec(p2.conn, "update blob set verified=true where name=?", nil, p2.name)
137 changes := p2.conn.Changes()
144 func (p2 piece) blobWouldExpire() {
145 blob, ok := p2.blobs[p2.name]
150 delete(p2.blobs, p2.name)
153 func (p2 piece) MarkNotComplete() error {
154 return sqlitex.Exec(p2.conn, "update blob set verified=false where name=?", nil, p2.name)
157 func (p2 piece) Completion() (ret storage.Completion) {
160 err := sqlitex.Exec(p2.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
161 ret.Complete = stmt.ColumnInt(0) != 0
171 func (p2 piece) closeBlobIfExists() {
172 if b, ok := p2.blobs[p2.name]; ok {
174 delete(p2.blobs, p2.name)
178 func (p2 piece) getBlob() *sqlite.Blob {
179 blob, ok := p2.blobs[p2.name]
181 rowid, err := rowidForBlob(p2.conn, p2.name, p2.length)
185 blob, err = p2.conn.OpenBlob("main", "blob", "data", rowid, true)
189 p2.blobs[p2.name] = blob