]> Sergey Matveev's repositories - btrtrc.git/blob - storage/sqlite/direct.go
Progress testing without cgo a bit
[btrtrc.git] / storage / sqlite / direct.go
1 //go:build cgo
2 // +build cgo
3
4 package sqliteStorage
5
6 import (
7         "errors"
8         "fmt"
9         "runtime"
10         "sync"
11         "time"
12
13         "crawshaw.io/sqlite"
14         "crawshaw.io/sqlite/sqlitex"
15
16         "github.com/anacrolix/torrent/metainfo"
17         "github.com/anacrolix/torrent/storage"
18 )
19
20 type NewDirectStorageOpts struct {
21         NewConnOpts
22         InitDbOpts
23         InitConnOpts
24         GcBlobs           bool
25         NoCacheBlobs      bool
26         BlobFlushInterval time.Duration
27 }
28
29 // A convenience function that creates a connection pool, resource provider, and a pieces storage
30 // ClientImpl and returns them all with a Close attached.
31 func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
32         conn, err := newConn(opts.NewConnOpts)
33         if err != nil {
34                 return
35         }
36         if opts.PageSize == 0 {
37                 // The largest size sqlite supports. I think we want this to be the smallest piece size we
38                 // can expect, which is probably 1<<17.
39                 opts.PageSize = 1 << 16
40         }
41         err = initDatabase(conn, opts.InitDbOpts)
42         if err != nil {
43                 conn.Close()
44                 return
45         }
46         err = initConn(conn, opts.InitConnOpts)
47         if err != nil {
48                 conn.Close()
49                 return
50         }
51         if opts.BlobFlushInterval == 0 && !opts.GcBlobs {
52                 // This is influenced by typical busy timeouts, of 5-10s. We want to give other connections
53                 // a few chances at getting a transaction through.
54                 opts.BlobFlushInterval = time.Second
55         }
56         cl := &client{
57                 conn:  conn,
58                 blobs: make(map[string]*sqlite.Blob),
59                 opts:  opts,
60         }
61         if opts.BlobFlushInterval != 0 {
62                 cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
63         }
64         return cl, nil
65 }
66
67 type client struct {
68         l           sync.Mutex
69         conn        conn
70         blobs       map[string]*sqlite.Blob
71         blobFlusher *time.Timer
72         opts        NewDirectStorageOpts
73         closed      bool
74 }
75
76 func (c *client) blobFlusherFunc() {
77         c.l.Lock()
78         defer c.l.Unlock()
79         c.flushBlobs()
80         if !c.closed {
81                 c.blobFlusher.Reset(c.opts.BlobFlushInterval)
82         }
83 }
84
85 func (c *client) flushBlobs() {
86         for key, b := range c.blobs {
87                 // Need the lock to prevent racing with the GC finalizers.
88                 b.Close()
89                 delete(c.blobs, key)
90         }
91 }
92
93 func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
94         return torrent{c}, nil
95 }
96
97 func (c *client) Close() error {
98         c.l.Lock()
99         defer c.l.Unlock()
100         c.flushBlobs()
101         c.closed = true
102         if c.opts.BlobFlushInterval != 0 {
103                 c.blobFlusher.Stop()
104         }
105         return c.conn.Close()
106 }
107
108 type torrent struct {
109         c *client
110 }
111
112 func rowidForBlob(c conn, name string, length int64, create bool) (rowid int64, err error) {
113         rowidOk := false
114         err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
115                 if rowidOk {
116                         panic("expected at most one row")
117                 }
118                 // TODO: How do we know if we got this wrong?
119                 rowid = stmt.ColumnInt64(0)
120                 rowidOk = true
121                 return nil
122         }, name)
123         if err != nil {
124                 return
125         }
126         if rowidOk {
127                 return
128         }
129         if !create {
130                 err = errors.New("no existing row")
131                 return
132         }
133         err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
134         if err != nil {
135                 return
136         }
137         rowid = c.LastInsertRowID()
138         return
139 }
140
141 func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
142         t.c.l.Lock()
143         defer t.c.l.Unlock()
144         name := p.Hash().HexString()
145         return piece{
146                 name,
147                 p.Length(),
148                 t.c,
149         }
150 }
151
152 func (t torrent) Close() error {
153         return nil
154 }
155
156 type piece struct {
157         name   string
158         length int64
159         *client
160 }
161
162 func (p piece) doAtIoWithBlob(
163         atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
164         b []byte,
165         off int64,
166         create bool,
167 ) (n int, err error) {
168         p.l.Lock()
169         defer p.l.Unlock()
170         if p.opts.NoCacheBlobs {
171                 defer p.forgetBlob()
172         }
173         blob, err := p.getBlob(create)
174         if err != nil {
175                 err = fmt.Errorf("getting blob: %w", err)
176                 return
177         }
178         n, err = atIo(blob)(b, off)
179         if err == nil {
180                 return
181         }
182         var se sqlite.Error
183         if !errors.As(err, &se) {
184                 return
185         }
186         // "ABORT" occurs if the row the blob is on is modified elsewhere. "ERROR: invalid blob" occurs
187         // if the blob has been closed. We don't forget blobs that are closed by our GC finalizers,
188         // because they may be attached to names that have since moved on to another blob.
189         if se.Code != sqlite.SQLITE_ABORT && !(p.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
190                 return
191         }
192         p.forgetBlob()
193         // Try again, this time we're guaranteed to get a fresh blob, and so errors are no excuse. It
194         // might be possible to skip to this version if we don't cache blobs.
195         blob, err = p.getBlob(create)
196         if err != nil {
197                 err = fmt.Errorf("getting blob: %w", err)
198                 return
199         }
200         return atIo(blob)(b, off)
201 }
202
203 func (p piece) ReadAt(b []byte, off int64) (n int, err error) {
204         return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
205                 return blob.ReadAt
206         }, b, off, false)
207 }
208
209 func (p piece) WriteAt(b []byte, off int64) (n int, err error) {
210         return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
211                 return blob.WriteAt
212         }, b, off, true)
213 }
214
215 func (p piece) MarkComplete() error {
216         p.l.Lock()
217         defer p.l.Unlock()
218         err := sqlitex.Exec(p.conn, "update blob set verified=true where name=?", nil, p.name)
219         if err != nil {
220                 return err
221         }
222         changes := p.conn.Changes()
223         if changes != 1 {
224                 panic(changes)
225         }
226         return nil
227 }
228
229 func (p piece) forgetBlob() {
230         blob, ok := p.blobs[p.name]
231         if !ok {
232                 return
233         }
234         blob.Close()
235         delete(p.blobs, p.name)
236 }
237
238 func (p piece) MarkNotComplete() error {
239         p.l.Lock()
240         defer p.l.Unlock()
241         return sqlitex.Exec(p.conn, "update blob set verified=false where name=?", nil, p.name)
242 }
243
244 func (p piece) Completion() (ret storage.Completion) {
245         p.l.Lock()
246         defer p.l.Unlock()
247         err := sqlitex.Exec(p.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
248                 ret.Complete = stmt.ColumnInt(0) != 0
249                 return nil
250         }, p.name)
251         ret.Ok = err == nil
252         if err != nil {
253                 panic(err)
254         }
255         return
256 }
257
258 func (p piece) getBlob(create bool) (*sqlite.Blob, error) {
259         blob, ok := p.blobs[p.name]
260         if !ok {
261                 rowid, err := rowidForBlob(p.conn, p.name, p.length, create)
262                 if err != nil {
263                         return nil, fmt.Errorf("getting rowid for blob: %w", err)
264                 }
265                 blob, err = p.conn.OpenBlob("main", "blob", "data", rowid, true)
266                 if err != nil {
267                         panic(err)
268                 }
269                 if p.opts.GcBlobs {
270                         herp := new(byte)
271                         runtime.SetFinalizer(herp, func(*byte) {
272                                 p.l.Lock()
273                                 defer p.l.Unlock()
274                                 // Note there's no guarantee that the finalizer fired while this blob is the same
275                                 // one in the blob cache. It might be possible to rework this so that we check, or
276                                 // strip finalizers as appropriate.
277                                 blob.Close()
278                         })
279                 }
280                 p.blobs[p.name] = blob
281         }
282         return blob, nil
283 }