]> Sergey Matveev's repositories - btrtrc.git/blob - storage/sqlite/direct.go
Begin extracting 'squirrel' from storage/sqlite
[btrtrc.git] / storage / sqlite / direct.go
1 //go:build cgo
2 // +build cgo
3
4 package sqliteStorage
5
6 import (
7         "errors"
8         "fmt"
9         "runtime"
10         "sync"
11         "time"
12
13         "crawshaw.io/sqlite"
14         "crawshaw.io/sqlite/sqlitex"
15
16         "github.com/anacrolix/torrent/metainfo"
17         "github.com/anacrolix/torrent/storage"
18 )
19
20 type NewDirectStorageOpts struct {
21         NewConnOpts
22         InitDbOpts
23         InitConnOpts
24         GcBlobs           bool
25         NoCacheBlobs      bool
26         BlobFlushInterval time.Duration
27 }
28
29 func NewSquirrelCache(opts NewDirectStorageOpts) (_ *SquirrelCache, err error) {
30         conn, err := newConn(opts.NewConnOpts)
31         if err != nil {
32                 return
33         }
34         if opts.PageSize == 0 {
35                 // The largest size sqlite supports. I think we want this to be the smallest SquirrelBlob size we
36                 // can expect, which is probably 1<<17.
37                 opts.PageSize = 1 << 16
38         }
39         err = initDatabase(conn, opts.InitDbOpts)
40         if err != nil {
41                 conn.Close()
42                 return
43         }
44         err = initConn(conn, opts.InitConnOpts)
45         if err != nil {
46                 conn.Close()
47                 return
48         }
49         if opts.BlobFlushInterval == 0 && !opts.GcBlobs {
50                 // This is influenced by typical busy timeouts, of 5-10s. We want to give other connections
51                 // a few chances at getting a transaction through.
52                 opts.BlobFlushInterval = time.Second
53         }
54         cl := &SquirrelCache{
55                 conn:  conn,
56                 blobs: make(map[string]*sqlite.Blob),
57                 opts:  opts,
58         }
59         // Avoid race with cl.blobFlusherFunc
60         cl.l.Lock()
61         defer cl.l.Unlock()
62         if opts.BlobFlushInterval != 0 {
63                 cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
64         }
65         cl.capacity = cl.getCapacity
66         return cl, nil
67 }
68
69 // A convenience function that creates a connection pool, resource provider, and a pieces storage
70 // ClientImpl and returns them all with a Close attached.
71 func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
72         cache, err := NewSquirrelCache(opts)
73         if err != nil {
74                 return
75         }
76         return &client{cache}, nil
77 }
78
79 func (cl *SquirrelCache) getCapacity() (ret *int64) {
80         cl.l.Lock()
81         defer cl.l.Unlock()
82         err := sqlitex.Exec(cl.conn, "select value from setting where name='capacity'", func(stmt *sqlite.Stmt) error {
83                 ret = new(int64)
84                 *ret = stmt.ColumnInt64(0)
85                 return nil
86         })
87         if err != nil {
88                 panic(err)
89         }
90         return
91 }
92
93 type SquirrelCache struct {
94         l           sync.Mutex
95         conn        conn
96         blobs       map[string]*sqlite.Blob
97         blobFlusher *time.Timer
98         opts        NewDirectStorageOpts
99         closed      bool
100         capacity    func() *int64
101 }
102
103 type client struct {
104         *SquirrelCache
105 }
106
107 func (c *SquirrelCache) blobFlusherFunc() {
108         c.l.Lock()
109         defer c.l.Unlock()
110         c.flushBlobs()
111         if !c.closed {
112                 c.blobFlusher.Reset(c.opts.BlobFlushInterval)
113         }
114 }
115
116 func (c *SquirrelCache) flushBlobs() {
117         for key, b := range c.blobs {
118                 // Need the lock to prevent racing with the GC finalizers.
119                 b.Close()
120                 delete(c.blobs, key)
121         }
122 }
123
124 func (c *SquirrelCache) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
125         t := torrent{c}
126         return storage.TorrentImpl{Piece: t.Piece, Close: t.Close, Capacity: &c.capacity}, nil
127 }
128
129 func (c *SquirrelCache) Close() (err error) {
130         c.l.Lock()
131         defer c.l.Unlock()
132         c.flushBlobs()
133         if c.opts.BlobFlushInterval != 0 {
134                 c.blobFlusher.Stop()
135         }
136         if !c.closed {
137                 c.closed = true
138                 err = c.conn.Close()
139                 c.conn = nil
140         }
141         return
142 }
143
144 type torrent struct {
145         c *SquirrelCache
146 }
147
148 func rowidForBlob(c conn, name string, length int64, create bool) (rowid int64, err error) {
149         rowidOk := false
150         err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
151                 if rowidOk {
152                         panic("expected at most one row")
153                 }
154                 // TODO: How do we know if we got this wrong?
155                 rowid = stmt.ColumnInt64(0)
156                 rowidOk = true
157                 return nil
158         }, name)
159         if err != nil {
160                 return
161         }
162         if rowidOk {
163                 return
164         }
165         if !create {
166                 err = errors.New("no existing row")
167                 return
168         }
169         err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
170         if err != nil {
171                 return
172         }
173         rowid = c.LastInsertRowID()
174         return
175 }
176
177 func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
178         name := p.Hash().HexString()
179         return piece{SquirrelBlob{
180                 name,
181                 p.Length(),
182                 t.c,
183         }}
184 }
185
186 func (t torrent) Close() error {
187         return nil
188 }
189
190 type SquirrelBlob struct {
191         name   string
192         length int64
193         *SquirrelCache
194 }
195
196 type piece struct {
197         SquirrelBlob
198 }
199
200 func (p SquirrelBlob) doAtIoWithBlob(
201         atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
202         b []byte,
203         off int64,
204         create bool,
205 ) (n int, err error) {
206         p.l.Lock()
207         defer p.l.Unlock()
208         if p.opts.NoCacheBlobs {
209                 defer p.forgetBlob()
210         }
211         blob, err := p.getBlob(create)
212         if err != nil {
213                 err = fmt.Errorf("getting blob: %w", err)
214                 return
215         }
216         n, err = atIo(blob)(b, off)
217         if err == nil {
218                 return
219         }
220         var se sqlite.Error
221         if !errors.As(err, &se) {
222                 return
223         }
224         // "ABORT" occurs if the row the blob is on is modified elsewhere. "ERROR: invalid blob" occurs
225         // if the blob has been closed. We don't forget blobs that are closed by our GC finalizers,
226         // because they may be attached to names that have since moved on to another blob.
227         if se.Code != sqlite.SQLITE_ABORT && !(p.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
228                 return
229         }
230         p.forgetBlob()
231         // Try again, this time we're guaranteed to get a fresh blob, and so errors are no excuse. It
232         // might be possible to skip to this version if we don't cache blobs.
233         blob, err = p.getBlob(create)
234         if err != nil {
235                 err = fmt.Errorf("getting blob: %w", err)
236                 return
237         }
238         return atIo(blob)(b, off)
239 }
240
241 func (p SquirrelBlob) ReadAt(b []byte, off int64) (n int, err error) {
242         return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
243                 return blob.ReadAt
244         }, b, off, false)
245 }
246
247 func (p SquirrelBlob) WriteAt(b []byte, off int64) (n int, err error) {
248         return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
249                 return blob.WriteAt
250         }, b, off, true)
251 }
252
253 func (p piece) MarkComplete() error {
254         p.l.Lock()
255         defer p.l.Unlock()
256         err := sqlitex.Exec(p.conn, "update blob set verified=true where name=?", nil, p.name)
257         if err != nil {
258                 return err
259         }
260         changes := p.conn.Changes()
261         if changes != 1 {
262                 panic(changes)
263         }
264         return nil
265 }
266
267 func (p SquirrelBlob) forgetBlob() {
268         blob, ok := p.blobs[p.name]
269         if !ok {
270                 return
271         }
272         blob.Close()
273         delete(p.blobs, p.name)
274 }
275
276 func (p piece) MarkNotComplete() error {
277         p.l.Lock()
278         defer p.l.Unlock()
279         return sqlitex.Exec(p.conn, "update blob set verified=false where name=?", nil, p.name)
280 }
281
282 func (p piece) Completion() (ret storage.Completion) {
283         p.l.Lock()
284         defer p.l.Unlock()
285         err := sqlitex.Exec(p.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
286                 ret.Complete = stmt.ColumnInt(0) != 0
287                 return nil
288         }, p.name)
289         ret.Ok = err == nil
290         if err != nil {
291                 panic(err)
292         }
293         return
294 }
295
296 func (p SquirrelBlob) getBlob(create bool) (*sqlite.Blob, error) {
297         blob, ok := p.blobs[p.name]
298         if !ok {
299                 rowid, err := rowidForBlob(p.conn, p.name, p.length, create)
300                 if err != nil {
301                         return nil, fmt.Errorf("getting rowid for blob: %w", err)
302                 }
303                 blob, err = p.conn.OpenBlob("main", "blob", "data", rowid, true)
304                 if err != nil {
305                         panic(err)
306                 }
307                 if p.opts.GcBlobs {
308                         herp := new(byte)
309                         runtime.SetFinalizer(herp, func(*byte) {
310                                 p.l.Lock()
311                                 defer p.l.Unlock()
312                                 // Note there's no guarantee that the finalizer fired while this blob is the same
313                                 // one in the blob cache. It might be possible to rework this so that we check, or
314                                 // strip finalizers as appropriate.
315                                 blob.Close()
316                         })
317                 }
318                 p.blobs[p.name] = blob
319         }
320         return blob, nil
321 }