]> Sergey Matveev's repositories - btrtrc.git/blob - storage/sqlite/direct.go
Merge branch 'request-strategy-rewrite'
[btrtrc.git] / storage / sqlite / direct.go
1 //go:build cgo
2 // +build cgo
3
4 package sqliteStorage
5
6 import (
7         "errors"
8         "fmt"
9         "runtime"
10         "sync"
11         "time"
12
13         "crawshaw.io/sqlite"
14         "crawshaw.io/sqlite/sqlitex"
15
16         "github.com/anacrolix/torrent/metainfo"
17         "github.com/anacrolix/torrent/storage"
18 )
19
20 type NewDirectStorageOpts struct {
21         NewConnOpts
22         InitDbOpts
23         InitConnOpts
24         GcBlobs           bool
25         NoCacheBlobs      bool
26         BlobFlushInterval time.Duration
27 }
28
29 // A convenience function that creates a connection pool, resource provider, and a pieces storage
30 // ClientImpl and returns them all with a Close attached.
31 func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
32         conn, err := newConn(opts.NewConnOpts)
33         if err != nil {
34                 return
35         }
36         if opts.PageSize == 0 {
37                 // The largest size sqlite supports. I think we want this to be the smallest piece size we
38                 // can expect, which is probably 1<<17.
39                 opts.PageSize = 1 << 16
40         }
41         err = initDatabase(conn, opts.InitDbOpts)
42         if err != nil {
43                 conn.Close()
44                 return
45         }
46         err = initConn(conn, opts.InitConnOpts)
47         if err != nil {
48                 conn.Close()
49                 return
50         }
51         if opts.BlobFlushInterval == 0 && !opts.GcBlobs {
52                 // This is influenced by typical busy timeouts, of 5-10s. We want to give other connections
53                 // a few chances at getting a transaction through.
54                 opts.BlobFlushInterval = time.Second
55         }
56         cl := &client{
57                 conn:  conn,
58                 blobs: make(map[string]*sqlite.Blob),
59                 opts:  opts,
60         }
61         if opts.BlobFlushInterval != 0 {
62                 cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
63         }
64         cl.capacity = cl.getCapacity
65         return cl, nil
66 }
67
68 func (cl *client) getCapacity() (ret *int64) {
69         cl.l.Lock()
70         defer cl.l.Unlock()
71         err := sqlitex.Exec(cl.conn, "select value from setting where name='capacity'", func(stmt *sqlite.Stmt) error {
72                 ret = new(int64)
73                 *ret = stmt.ColumnInt64(0)
74                 return nil
75         })
76         if err != nil {
77                 panic(err)
78         }
79         return
80 }
81
82 type client struct {
83         l           sync.Mutex
84         conn        conn
85         blobs       map[string]*sqlite.Blob
86         blobFlusher *time.Timer
87         opts        NewDirectStorageOpts
88         closed      bool
89         capacity    func() *int64
90 }
91
92 func (c *client) blobFlusherFunc() {
93         c.l.Lock()
94         defer c.l.Unlock()
95         c.flushBlobs()
96         if !c.closed {
97                 c.blobFlusher.Reset(c.opts.BlobFlushInterval)
98         }
99 }
100
101 func (c *client) flushBlobs() {
102         for key, b := range c.blobs {
103                 // Need the lock to prevent racing with the GC finalizers.
104                 b.Close()
105                 delete(c.blobs, key)
106         }
107 }
108
109 func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
110         t := torrent{c}
111         return storage.TorrentImpl{Piece: t.Piece, Close: t.Close, Capacity: &c.capacity}, nil
112 }
113
114 func (c *client) Close() error {
115         c.l.Lock()
116         defer c.l.Unlock()
117         c.flushBlobs()
118         c.closed = true
119         if c.opts.BlobFlushInterval != 0 {
120                 c.blobFlusher.Stop()
121         }
122         return c.conn.Close()
123 }
124
125 type torrent struct {
126         c *client
127 }
128
129 func rowidForBlob(c conn, name string, length int64, create bool) (rowid int64, err error) {
130         rowidOk := false
131         err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
132                 if rowidOk {
133                         panic("expected at most one row")
134                 }
135                 // TODO: How do we know if we got this wrong?
136                 rowid = stmt.ColumnInt64(0)
137                 rowidOk = true
138                 return nil
139         }, name)
140         if err != nil {
141                 return
142         }
143         if rowidOk {
144                 return
145         }
146         if !create {
147                 err = errors.New("no existing row")
148                 return
149         }
150         err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
151         if err != nil {
152                 return
153         }
154         rowid = c.LastInsertRowID()
155         return
156 }
157
158 func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
159         t.c.l.Lock()
160         defer t.c.l.Unlock()
161         name := p.Hash().HexString()
162         return piece{
163                 name,
164                 p.Length(),
165                 t.c,
166         }
167 }
168
169 func (t torrent) Close() error {
170         return nil
171 }
172
173 type piece struct {
174         name   string
175         length int64
176         *client
177 }
178
179 func (p piece) doAtIoWithBlob(
180         atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
181         b []byte,
182         off int64,
183         create bool,
184 ) (n int, err error) {
185         p.l.Lock()
186         defer p.l.Unlock()
187         if p.opts.NoCacheBlobs {
188                 defer p.forgetBlob()
189         }
190         blob, err := p.getBlob(create)
191         if err != nil {
192                 err = fmt.Errorf("getting blob: %w", err)
193                 return
194         }
195         n, err = atIo(blob)(b, off)
196         if err == nil {
197                 return
198         }
199         var se sqlite.Error
200         if !errors.As(err, &se) {
201                 return
202         }
203         // "ABORT" occurs if the row the blob is on is modified elsewhere. "ERROR: invalid blob" occurs
204         // if the blob has been closed. We don't forget blobs that are closed by our GC finalizers,
205         // because they may be attached to names that have since moved on to another blob.
206         if se.Code != sqlite.SQLITE_ABORT && !(p.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
207                 return
208         }
209         p.forgetBlob()
210         // Try again, this time we're guaranteed to get a fresh blob, and so errors are no excuse. It
211         // might be possible to skip to this version if we don't cache blobs.
212         blob, err = p.getBlob(create)
213         if err != nil {
214                 err = fmt.Errorf("getting blob: %w", err)
215                 return
216         }
217         return atIo(blob)(b, off)
218 }
219
220 func (p piece) ReadAt(b []byte, off int64) (n int, err error) {
221         return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
222                 return blob.ReadAt
223         }, b, off, false)
224 }
225
226 func (p piece) WriteAt(b []byte, off int64) (n int, err error) {
227         return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
228                 return blob.WriteAt
229         }, b, off, true)
230 }
231
232 func (p piece) MarkComplete() error {
233         p.l.Lock()
234         defer p.l.Unlock()
235         err := sqlitex.Exec(p.conn, "update blob set verified=true where name=?", nil, p.name)
236         if err != nil {
237                 return err
238         }
239         changes := p.conn.Changes()
240         if changes != 1 {
241                 panic(changes)
242         }
243         return nil
244 }
245
246 func (p piece) forgetBlob() {
247         blob, ok := p.blobs[p.name]
248         if !ok {
249                 return
250         }
251         blob.Close()
252         delete(p.blobs, p.name)
253 }
254
255 func (p piece) MarkNotComplete() error {
256         p.l.Lock()
257         defer p.l.Unlock()
258         return sqlitex.Exec(p.conn, "update blob set verified=false where name=?", nil, p.name)
259 }
260
261 func (p piece) Completion() (ret storage.Completion) {
262         p.l.Lock()
263         defer p.l.Unlock()
264         err := sqlitex.Exec(p.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
265                 ret.Complete = stmt.ColumnInt(0) != 0
266                 return nil
267         }, p.name)
268         ret.Ok = err == nil
269         if err != nil {
270                 panic(err)
271         }
272         return
273 }
274
275 func (p piece) getBlob(create bool) (*sqlite.Blob, error) {
276         blob, ok := p.blobs[p.name]
277         if !ok {
278                 rowid, err := rowidForBlob(p.conn, p.name, p.length, create)
279                 if err != nil {
280                         return nil, fmt.Errorf("getting rowid for blob: %w", err)
281                 }
282                 blob, err = p.conn.OpenBlob("main", "blob", "data", rowid, true)
283                 if err != nil {
284                         panic(err)
285                 }
286                 if p.opts.GcBlobs {
287                         herp := new(byte)
288                         runtime.SetFinalizer(herp, func(*byte) {
289                                 p.l.Lock()
290                                 defer p.l.Unlock()
291                                 // Note there's no guarantee that the finalizer fired while this blob is the same
292                                 // one in the blob cache. It might be possible to rework this so that we check, or
293                                 // strip finalizers as appropriate.
294                                 blob.Close()
295                         })
296                 }
297                 p.blobs[p.name] = blob
298         }
299         return blob, nil
300 }