]> Sergey Matveev's repositories - btrtrc.git/blob - storage/sqlite/direct.go
Set page size before initializing connections
[btrtrc.git] / storage / sqlite / direct.go
1 package sqliteStorage
2
3 import (
4         "errors"
5         "fmt"
6         "runtime"
7         "sync"
8         "time"
9
10         "crawshaw.io/sqlite"
11         "crawshaw.io/sqlite/sqlitex"
12         "github.com/anacrolix/torrent/metainfo"
13         "github.com/anacrolix/torrent/storage"
14 )
15
16 type NewDirectStorageOpts struct {
17         NewConnOpts
18         InitDbOpts
19         InitConnOpts
20         GcBlobs           bool
21         CacheBlobs        bool
22         BlobFlushInterval time.Duration
23 }
24
25 // A convenience function that creates a connection pool, resource provider, and a pieces storage
26 // ClientImpl and returns them all with a Close attached.
27 func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
28         conn, err := newConn(opts.NewConnOpts)
29         if err != nil {
30                 return
31         }
32         err = initDatabase(conn, opts.InitDbOpts)
33         if err != nil {
34                 conn.Close()
35                 return
36         }
37         err = initConn(conn, opts.InitConnOpts)
38         if err != nil {
39                 conn.Close()
40                 return
41         }
42         cl := &client{
43                 conn:  conn,
44                 blobs: make(map[string]*sqlite.Blob),
45                 opts:  opts,
46         }
47         if opts.BlobFlushInterval != 0 {
48                 cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
49         }
50         return cl, nil
51 }
52
53 type client struct {
54         l           sync.Mutex
55         conn        conn
56         blobs       map[string]*sqlite.Blob
57         blobFlusher *time.Timer
58         opts        NewDirectStorageOpts
59         closed      bool
60 }
61
62 func (c *client) blobFlusherFunc() {
63         c.l.Lock()
64         defer c.l.Unlock()
65         c.flushBlobs()
66         if !c.closed {
67                 c.blobFlusher.Reset(c.opts.BlobFlushInterval)
68         }
69 }
70
71 func (c *client) flushBlobs() {
72         for key, b := range c.blobs {
73                 // Need the lock to prevent racing with the GC finalizers.
74                 b.Close()
75                 delete(c.blobs, key)
76         }
77 }
78
79 func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
80         return torrent{c}, nil
81 }
82
83 func (c *client) Close() error {
84         c.l.Lock()
85         defer c.l.Unlock()
86         c.flushBlobs()
87         c.closed = true
88         if c.opts.BlobFlushInterval != 0 {
89                 c.blobFlusher.Stop()
90         }
91         return c.conn.Close()
92 }
93
94 type torrent struct {
95         c *client
96 }
97
98 func rowidForBlob(c conn, name string, length int64, create bool) (rowid int64, err error) {
99         rowidOk := false
100         err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
101                 if rowidOk {
102                         panic("expected at most one row")
103                 }
104                 // TODO: How do we know if we got this wrong?
105                 rowid = stmt.ColumnInt64(0)
106                 rowidOk = true
107                 return nil
108         }, name)
109         if err != nil {
110                 return
111         }
112         if rowidOk {
113                 return
114         }
115         if !create {
116                 err = errors.New("no existing row")
117                 return
118         }
119         err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
120         if err != nil {
121                 return
122         }
123         rowid = c.LastInsertRowID()
124         return
125 }
126
127 func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
128         t.c.l.Lock()
129         defer t.c.l.Unlock()
130         name := p.Hash().HexString()
131         return piece{
132                 name,
133                 p.Length(),
134                 t.c,
135         }
136 }
137
138 func (t torrent) Close() error {
139         return nil
140 }
141
142 type piece struct {
143         name   string
144         length int64
145         *client
146 }
147
148 func (p piece) doAtIoWithBlob(
149         atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
150         b []byte,
151         off int64,
152         create bool,
153 ) (n int, err error) {
154         p.l.Lock()
155         defer p.l.Unlock()
156         if !p.opts.CacheBlobs {
157                 defer p.forgetBlob()
158         }
159         blob, err := p.getBlob(create)
160         if err != nil {
161                 err = fmt.Errorf("getting blob: %w", err)
162                 return
163         }
164         n, err = atIo(blob)(b, off)
165         if err == nil {
166                 return
167         }
168         var se sqlite.Error
169         if !errors.As(err, &se) {
170                 return
171         }
172         // "ABORT" occurs if the row the blob is on is modified elsewhere. "ERROR: invalid blob" occurs
173         // if the blob has been closed. We don't forget blobs that are closed by our GC finalizers,
174         // because they may be attached to names that have since moved on to another blob.
175         if se.Code != sqlite.SQLITE_ABORT && !(p.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
176                 return
177         }
178         p.forgetBlob()
179         // Try again, this time we're guaranteed to get a fresh blob, and so errors are no excuse. It
180         // might be possible to skip to this version if we don't cache blobs.
181         blob, err = p.getBlob(create)
182         if err != nil {
183                 err = fmt.Errorf("getting blob: %w", err)
184                 return
185         }
186         return atIo(blob)(b, off)
187 }
188
189 func (p piece) ReadAt(b []byte, off int64) (n int, err error) {
190         return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
191                 return blob.ReadAt
192         }, b, off, false)
193 }
194
195 func (p piece) WriteAt(b []byte, off int64) (n int, err error) {
196         return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
197                 return blob.WriteAt
198         }, b, off, true)
199 }
200
201 func (p piece) MarkComplete() error {
202         p.l.Lock()
203         defer p.l.Unlock()
204         err := sqlitex.Exec(p.conn, "update blob set verified=true where name=?", nil, p.name)
205         if err != nil {
206                 return err
207         }
208         changes := p.conn.Changes()
209         if changes != 1 {
210                 panic(changes)
211         }
212         return nil
213 }
214
215 func (p piece) forgetBlob() {
216         blob, ok := p.blobs[p.name]
217         if !ok {
218                 return
219         }
220         blob.Close()
221         delete(p.blobs, p.name)
222 }
223
224 func (p piece) MarkNotComplete() error {
225         p.l.Lock()
226         defer p.l.Unlock()
227         return sqlitex.Exec(p.conn, "update blob set verified=false where name=?", nil, p.name)
228 }
229
230 func (p piece) Completion() (ret storage.Completion) {
231         p.l.Lock()
232         defer p.l.Unlock()
233         err := sqlitex.Exec(p.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
234                 ret.Complete = stmt.ColumnInt(0) != 0
235                 return nil
236         }, p.name)
237         ret.Ok = err == nil
238         if err != nil {
239                 panic(err)
240         }
241         return
242 }
243
244 func (p piece) getBlob(create bool) (*sqlite.Blob, error) {
245         blob, ok := p.blobs[p.name]
246         if !ok {
247                 rowid, err := rowidForBlob(p.conn, p.name, p.length, create)
248                 if err != nil {
249                         return nil, fmt.Errorf("getting rowid for blob: %w", err)
250                 }
251                 blob, err = p.conn.OpenBlob("main", "blob", "data", rowid, true)
252                 if err != nil {
253                         panic(err)
254                 }
255                 if p.opts.GcBlobs {
256                         herp := new(byte)
257                         runtime.SetFinalizer(herp, func(*byte) {
258                                 p.l.Lock()
259                                 defer p.l.Unlock()
260                                 // Note there's no guarantee that the finalizer fired while this blob is the same
261                                 // one in the blob cache. It might be possible to rework this so that we check, or
262                                 // strip finalizers as appropriate.
263                                 blob.Close()
264                         })
265                 }
266                 p.blobs[p.name] = blob
267         }
268         return blob, nil
269 }