]> Sergey Matveev's repositories - btrtrc.git/blob - storage/sqlite/direct.go
Set direct sqlite storage conn to nil on close
[btrtrc.git] / storage / sqlite / direct.go
1 //go:build cgo
2 // +build cgo
3
4 package sqliteStorage
5
6 import (
7         "errors"
8         "fmt"
9         "runtime"
10         "sync"
11         "time"
12
13         "crawshaw.io/sqlite"
14         "crawshaw.io/sqlite/sqlitex"
15
16         "github.com/anacrolix/torrent/metainfo"
17         "github.com/anacrolix/torrent/storage"
18 )
19
20 type NewDirectStorageOpts struct {
21         NewConnOpts
22         InitDbOpts
23         InitConnOpts
24         GcBlobs           bool
25         NoCacheBlobs      bool
26         BlobFlushInterval time.Duration
27 }
28
29 // A convenience function that creates a connection pool, resource provider, and a pieces storage
30 // ClientImpl and returns them all with a Close attached.
31 func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
32         conn, err := newConn(opts.NewConnOpts)
33         if err != nil {
34                 return
35         }
36         if opts.PageSize == 0 {
37                 // The largest size sqlite supports. I think we want this to be the smallest piece size we
38                 // can expect, which is probably 1<<17.
39                 opts.PageSize = 1 << 16
40         }
41         err = initDatabase(conn, opts.InitDbOpts)
42         if err != nil {
43                 conn.Close()
44                 return
45         }
46         err = initConn(conn, opts.InitConnOpts)
47         if err != nil {
48                 conn.Close()
49                 return
50         }
51         if opts.BlobFlushInterval == 0 && !opts.GcBlobs {
52                 // This is influenced by typical busy timeouts, of 5-10s. We want to give other connections
53                 // a few chances at getting a transaction through.
54                 opts.BlobFlushInterval = time.Second
55         }
56         cl := &client{
57                 conn:  conn,
58                 blobs: make(map[string]*sqlite.Blob),
59                 opts:  opts,
60         }
61         // Avoid race with cl.blobFlusherFunc
62         cl.l.Lock()
63         defer cl.l.Unlock()
64         if opts.BlobFlushInterval != 0 {
65                 cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
66         }
67         cl.capacity = cl.getCapacity
68         return cl, nil
69 }
70
71 func (cl *client) getCapacity() (ret *int64) {
72         cl.l.Lock()
73         defer cl.l.Unlock()
74         err := sqlitex.Exec(cl.conn, "select value from setting where name='capacity'", func(stmt *sqlite.Stmt) error {
75                 ret = new(int64)
76                 *ret = stmt.ColumnInt64(0)
77                 return nil
78         })
79         if err != nil {
80                 panic(err)
81         }
82         return
83 }
84
85 type client struct {
86         l           sync.Mutex
87         conn        conn
88         blobs       map[string]*sqlite.Blob
89         blobFlusher *time.Timer
90         opts        NewDirectStorageOpts
91         closed      bool
92         capacity    func() *int64
93 }
94
95 func (c *client) blobFlusherFunc() {
96         c.l.Lock()
97         defer c.l.Unlock()
98         c.flushBlobs()
99         if !c.closed {
100                 c.blobFlusher.Reset(c.opts.BlobFlushInterval)
101         }
102 }
103
104 func (c *client) flushBlobs() {
105         for key, b := range c.blobs {
106                 // Need the lock to prevent racing with the GC finalizers.
107                 b.Close()
108                 delete(c.blobs, key)
109         }
110 }
111
112 func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
113         t := torrent{c}
114         return storage.TorrentImpl{Piece: t.Piece, Close: t.Close, Capacity: &c.capacity}, nil
115 }
116
117 func (c *client) Close() (err error) {
118         c.l.Lock()
119         defer c.l.Unlock()
120         c.flushBlobs()
121         if c.opts.BlobFlushInterval != 0 {
122                 c.blobFlusher.Stop()
123         }
124         if !c.closed {
125                 c.closed = true
126                 err = c.conn.Close()
127                 c.conn = nil
128         }
129         return
130 }
131
132 type torrent struct {
133         c *client
134 }
135
136 func rowidForBlob(c conn, name string, length int64, create bool) (rowid int64, err error) {
137         rowidOk := false
138         err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
139                 if rowidOk {
140                         panic("expected at most one row")
141                 }
142                 // TODO: How do we know if we got this wrong?
143                 rowid = stmt.ColumnInt64(0)
144                 rowidOk = true
145                 return nil
146         }, name)
147         if err != nil {
148                 return
149         }
150         if rowidOk {
151                 return
152         }
153         if !create {
154                 err = errors.New("no existing row")
155                 return
156         }
157         err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
158         if err != nil {
159                 return
160         }
161         rowid = c.LastInsertRowID()
162         return
163 }
164
165 func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
166         t.c.l.Lock()
167         defer t.c.l.Unlock()
168         name := p.Hash().HexString()
169         return piece{
170                 name,
171                 p.Length(),
172                 t.c,
173         }
174 }
175
176 func (t torrent) Close() error {
177         return nil
178 }
179
180 type piece struct {
181         name   string
182         length int64
183         *client
184 }
185
186 func (p piece) doAtIoWithBlob(
187         atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
188         b []byte,
189         off int64,
190         create bool,
191 ) (n int, err error) {
192         p.l.Lock()
193         defer p.l.Unlock()
194         if p.opts.NoCacheBlobs {
195                 defer p.forgetBlob()
196         }
197         blob, err := p.getBlob(create)
198         if err != nil {
199                 err = fmt.Errorf("getting blob: %w", err)
200                 return
201         }
202         n, err = atIo(blob)(b, off)
203         if err == nil {
204                 return
205         }
206         var se sqlite.Error
207         if !errors.As(err, &se) {
208                 return
209         }
210         // "ABORT" occurs if the row the blob is on is modified elsewhere. "ERROR: invalid blob" occurs
211         // if the blob has been closed. We don't forget blobs that are closed by our GC finalizers,
212         // because they may be attached to names that have since moved on to another blob.
213         if se.Code != sqlite.SQLITE_ABORT && !(p.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
214                 return
215         }
216         p.forgetBlob()
217         // Try again, this time we're guaranteed to get a fresh blob, and so errors are no excuse. It
218         // might be possible to skip to this version if we don't cache blobs.
219         blob, err = p.getBlob(create)
220         if err != nil {
221                 err = fmt.Errorf("getting blob: %w", err)
222                 return
223         }
224         return atIo(blob)(b, off)
225 }
226
227 func (p piece) ReadAt(b []byte, off int64) (n int, err error) {
228         return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
229                 return blob.ReadAt
230         }, b, off, false)
231 }
232
233 func (p piece) WriteAt(b []byte, off int64) (n int, err error) {
234         return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
235                 return blob.WriteAt
236         }, b, off, true)
237 }
238
239 func (p piece) MarkComplete() error {
240         p.l.Lock()
241         defer p.l.Unlock()
242         err := sqlitex.Exec(p.conn, "update blob set verified=true where name=?", nil, p.name)
243         if err != nil {
244                 return err
245         }
246         changes := p.conn.Changes()
247         if changes != 1 {
248                 panic(changes)
249         }
250         return nil
251 }
252
253 func (p piece) forgetBlob() {
254         blob, ok := p.blobs[p.name]
255         if !ok {
256                 return
257         }
258         blob.Close()
259         delete(p.blobs, p.name)
260 }
261
262 func (p piece) MarkNotComplete() error {
263         p.l.Lock()
264         defer p.l.Unlock()
265         return sqlitex.Exec(p.conn, "update blob set verified=false where name=?", nil, p.name)
266 }
267
268 func (p piece) Completion() (ret storage.Completion) {
269         p.l.Lock()
270         defer p.l.Unlock()
271         err := sqlitex.Exec(p.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
272                 ret.Complete = stmt.ColumnInt(0) != 0
273                 return nil
274         }, p.name)
275         ret.Ok = err == nil
276         if err != nil {
277                 panic(err)
278         }
279         return
280 }
281
282 func (p piece) getBlob(create bool) (*sqlite.Blob, error) {
283         blob, ok := p.blobs[p.name]
284         if !ok {
285                 rowid, err := rowidForBlob(p.conn, p.name, p.length, create)
286                 if err != nil {
287                         return nil, fmt.Errorf("getting rowid for blob: %w", err)
288                 }
289                 blob, err = p.conn.OpenBlob("main", "blob", "data", rowid, true)
290                 if err != nil {
291                         panic(err)
292                 }
293                 if p.opts.GcBlobs {
294                         herp := new(byte)
295                         runtime.SetFinalizer(herp, func(*byte) {
296                                 p.l.Lock()
297                                 defer p.l.Unlock()
298                                 // Note there's no guarantee that the finalizer fired while this blob is the same
299                                 // one in the blob cache. It might be possible to rework this so that we check, or
300                                 // strip finalizers as appropriate.
301                                 blob.Close()
302                         })
303                 }
304                 p.blobs[p.name] = blob
305         }
306         return blob, nil
307 }