]> Sergey Matveev's repositories - btrtrc.git/blob - storage/sqlite/direct.go
Set smarter defaults
[btrtrc.git] / storage / sqlite / direct.go
1 package sqliteStorage
2
3 import (
4         "errors"
5         "fmt"
6         "runtime"
7         "sync"
8         "time"
9
10         "crawshaw.io/sqlite"
11         "crawshaw.io/sqlite/sqlitex"
12         "github.com/anacrolix/torrent/metainfo"
13         "github.com/anacrolix/torrent/storage"
14 )
15
16 type NewDirectStorageOpts struct {
17         NewConnOpts
18         InitDbOpts
19         InitConnOpts
20         GcBlobs           bool
21         NoCacheBlobs      bool
22         BlobFlushInterval time.Duration
23 }
24
25 // A convenience function that creates a connection pool, resource provider, and a pieces storage
26 // ClientImpl and returns them all with a Close attached.
27 func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
28         conn, err := newConn(opts.NewConnOpts)
29         if err != nil {
30                 return
31         }
32         if opts.PageSize == 0 {
33                 // The largest size sqlite supports. I think we want this to be the smallest piece size we
34                 // can expect, which is probably 1<<17.
35                 opts.PageSize = 1 << 16
36         }
37         err = initDatabase(conn, opts.InitDbOpts)
38         if err != nil {
39                 conn.Close()
40                 return
41         }
42         err = initConn(conn, opts.InitConnOpts)
43         if err != nil {
44                 conn.Close()
45                 return
46         }
47         if opts.BlobFlushInterval == 0 && !opts.GcBlobs {
48                 // This is influenced by typical busy timeouts, of 5-10s. We want to give other connections
49                 // a few chances at getting a transaction through.
50                 opts.BlobFlushInterval = time.Second
51         }
52         cl := &client{
53                 conn:  conn,
54                 blobs: make(map[string]*sqlite.Blob),
55                 opts:  opts,
56         }
57         if opts.BlobFlushInterval != 0 {
58                 cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
59         }
60         return cl, nil
61 }
62
63 type client struct {
64         l           sync.Mutex
65         conn        conn
66         blobs       map[string]*sqlite.Blob
67         blobFlusher *time.Timer
68         opts        NewDirectStorageOpts
69         closed      bool
70 }
71
72 func (c *client) blobFlusherFunc() {
73         c.l.Lock()
74         defer c.l.Unlock()
75         c.flushBlobs()
76         if !c.closed {
77                 c.blobFlusher.Reset(c.opts.BlobFlushInterval)
78         }
79 }
80
81 func (c *client) flushBlobs() {
82         for key, b := range c.blobs {
83                 // Need the lock to prevent racing with the GC finalizers.
84                 b.Close()
85                 delete(c.blobs, key)
86         }
87 }
88
89 func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
90         return torrent{c}, nil
91 }
92
93 func (c *client) Close() error {
94         c.l.Lock()
95         defer c.l.Unlock()
96         c.flushBlobs()
97         c.closed = true
98         if c.opts.BlobFlushInterval != 0 {
99                 c.blobFlusher.Stop()
100         }
101         return c.conn.Close()
102 }
103
104 type torrent struct {
105         c *client
106 }
107
108 func rowidForBlob(c conn, name string, length int64, create bool) (rowid int64, err error) {
109         rowidOk := false
110         err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
111                 if rowidOk {
112                         panic("expected at most one row")
113                 }
114                 // TODO: How do we know if we got this wrong?
115                 rowid = stmt.ColumnInt64(0)
116                 rowidOk = true
117                 return nil
118         }, name)
119         if err != nil {
120                 return
121         }
122         if rowidOk {
123                 return
124         }
125         if !create {
126                 err = errors.New("no existing row")
127                 return
128         }
129         err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
130         if err != nil {
131                 return
132         }
133         rowid = c.LastInsertRowID()
134         return
135 }
136
137 func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
138         t.c.l.Lock()
139         defer t.c.l.Unlock()
140         name := p.Hash().HexString()
141         return piece{
142                 name,
143                 p.Length(),
144                 t.c,
145         }
146 }
147
148 func (t torrent) Close() error {
149         return nil
150 }
151
152 type piece struct {
153         name   string
154         length int64
155         *client
156 }
157
158 func (p piece) doAtIoWithBlob(
159         atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
160         b []byte,
161         off int64,
162         create bool,
163 ) (n int, err error) {
164         p.l.Lock()
165         defer p.l.Unlock()
166         if p.opts.NoCacheBlobs {
167                 defer p.forgetBlob()
168         }
169         blob, err := p.getBlob(create)
170         if err != nil {
171                 err = fmt.Errorf("getting blob: %w", err)
172                 return
173         }
174         n, err = atIo(blob)(b, off)
175         if err == nil {
176                 return
177         }
178         var se sqlite.Error
179         if !errors.As(err, &se) {
180                 return
181         }
182         // "ABORT" occurs if the row the blob is on is modified elsewhere. "ERROR: invalid blob" occurs
183         // if the blob has been closed. We don't forget blobs that are closed by our GC finalizers,
184         // because they may be attached to names that have since moved on to another blob.
185         if se.Code != sqlite.SQLITE_ABORT && !(p.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
186                 return
187         }
188         p.forgetBlob()
189         // Try again, this time we're guaranteed to get a fresh blob, and so errors are no excuse. It
190         // might be possible to skip to this version if we don't cache blobs.
191         blob, err = p.getBlob(create)
192         if err != nil {
193                 err = fmt.Errorf("getting blob: %w", err)
194                 return
195         }
196         return atIo(blob)(b, off)
197 }
198
199 func (p piece) ReadAt(b []byte, off int64) (n int, err error) {
200         return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
201                 return blob.ReadAt
202         }, b, off, false)
203 }
204
205 func (p piece) WriteAt(b []byte, off int64) (n int, err error) {
206         return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
207                 return blob.WriteAt
208         }, b, off, true)
209 }
210
211 func (p piece) MarkComplete() error {
212         p.l.Lock()
213         defer p.l.Unlock()
214         err := sqlitex.Exec(p.conn, "update blob set verified=true where name=?", nil, p.name)
215         if err != nil {
216                 return err
217         }
218         changes := p.conn.Changes()
219         if changes != 1 {
220                 panic(changes)
221         }
222         return nil
223 }
224
225 func (p piece) forgetBlob() {
226         blob, ok := p.blobs[p.name]
227         if !ok {
228                 return
229         }
230         blob.Close()
231         delete(p.blobs, p.name)
232 }
233
234 func (p piece) MarkNotComplete() error {
235         p.l.Lock()
236         defer p.l.Unlock()
237         return sqlitex.Exec(p.conn, "update blob set verified=false where name=?", nil, p.name)
238 }
239
240 func (p piece) Completion() (ret storage.Completion) {
241         p.l.Lock()
242         defer p.l.Unlock()
243         err := sqlitex.Exec(p.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
244                 ret.Complete = stmt.ColumnInt(0) != 0
245                 return nil
246         }, p.name)
247         ret.Ok = err == nil
248         if err != nil {
249                 panic(err)
250         }
251         return
252 }
253
254 func (p piece) getBlob(create bool) (*sqlite.Blob, error) {
255         blob, ok := p.blobs[p.name]
256         if !ok {
257                 rowid, err := rowidForBlob(p.conn, p.name, p.length, create)
258                 if err != nil {
259                         return nil, fmt.Errorf("getting rowid for blob: %w", err)
260                 }
261                 blob, err = p.conn.OpenBlob("main", "blob", "data", rowid, true)
262                 if err != nil {
263                         panic(err)
264                 }
265                 if p.opts.GcBlobs {
266                         herp := new(byte)
267                         runtime.SetFinalizer(herp, func(*byte) {
268                                 p.l.Lock()
269                                 defer p.l.Unlock()
270                                 // Note there's no guarantee that the finalizer fired while this blob is the same
271                                 // one in the blob cache. It might be possible to rework this so that we check, or
272                                 // strip finalizers as appropriate.
273                                 blob.Close()
274                         })
275                 }
276                 p.blobs[p.name] = blob
277         }
278         return blob, nil
279 }