]> Sergey Matveev's repositories - btrtrc.git/blob - storage/sqlite/direct.go
Don't create blobs when reading
[btrtrc.git] / storage / sqlite / direct.go
1 package sqliteStorage
2
3 import (
4         "errors"
5         "fmt"
6         "runtime"
7         "sync"
8         "time"
9
10         "crawshaw.io/sqlite"
11         "crawshaw.io/sqlite/sqlitex"
12         "github.com/anacrolix/torrent/metainfo"
13         "github.com/anacrolix/torrent/storage"
14 )
15
16 type NewDirectStorageOpts struct {
17         NewConnOpts
18         InitDbOpts
19         InitConnOpts
20         GcBlobs           bool
21         CacheBlobs        bool
22         BlobFlushInterval time.Duration
23 }
24
25 // A convenience function that creates a connection pool, resource provider, and a pieces storage
26 // ClientImpl and returns them all with a Close attached.
27 func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
28         conn, err := newConn(opts.NewConnOpts)
29         if err != nil {
30                 return
31         }
32         err = initConn(conn, opts.InitConnOpts)
33         if err != nil {
34                 conn.Close()
35                 return
36         }
37         err = initDatabase(conn, opts.InitDbOpts)
38         if err != nil {
39                 return
40         }
41         cl := &client{
42                 conn:  conn,
43                 blobs: make(map[string]*sqlite.Blob),
44                 opts:  opts,
45         }
46         if opts.BlobFlushInterval != 0 {
47                 cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
48         }
49         return cl, nil
50 }
51
52 type client struct {
53         l           sync.Mutex
54         conn        conn
55         blobs       map[string]*sqlite.Blob
56         blobFlusher *time.Timer
57         opts        NewDirectStorageOpts
58         closed      bool
59 }
60
61 func (c *client) blobFlusherFunc() {
62         c.l.Lock()
63         defer c.l.Unlock()
64         c.flushBlobs()
65         if !c.closed {
66                 c.blobFlusher.Reset(c.opts.BlobFlushInterval)
67         }
68 }
69
70 func (c *client) flushBlobs() {
71         for key, b := range c.blobs {
72                 // Need the lock to prevent racing with the GC finalizers.
73                 b.Close()
74                 delete(c.blobs, key)
75         }
76 }
77
78 func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
79         return torrent{c}, nil
80 }
81
82 func (c *client) Close() error {
83         c.l.Lock()
84         defer c.l.Unlock()
85         c.flushBlobs()
86         c.closed = true
87         if c.opts.BlobFlushInterval != 0 {
88                 c.blobFlusher.Stop()
89         }
90         return c.conn.Close()
91 }
92
93 type torrent struct {
94         c *client
95 }
96
97 func rowidForBlob(c conn, name string, length int64, create bool) (rowid int64, err error) {
98         rowidOk := false
99         err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
100                 if rowidOk {
101                         panic("expected at most one row")
102                 }
103                 // TODO: How do we know if we got this wrong?
104                 rowid = stmt.ColumnInt64(0)
105                 rowidOk = true
106                 return nil
107         }, name)
108         if err != nil {
109                 return
110         }
111         if rowidOk {
112                 return
113         }
114         if !create {
115                 err = errors.New("no existing row")
116                 return
117         }
118         err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
119         if err != nil {
120                 return
121         }
122         rowid = c.LastInsertRowID()
123         return
124 }
125
126 func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
127         t.c.l.Lock()
128         defer t.c.l.Unlock()
129         name := p.Hash().HexString()
130         return piece{
131                 name,
132                 p.Length(),
133                 t.c,
134         }
135 }
136
137 func (t torrent) Close() error {
138         return nil
139 }
140
141 type piece struct {
142         name   string
143         length int64
144         *client
145 }
146
147 func (p piece) doAtIoWithBlob(
148         atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
149         b []byte,
150         off int64,
151         create bool,
152 ) (n int, err error) {
153         p.l.Lock()
154         defer p.l.Unlock()
155         if !p.opts.CacheBlobs {
156                 defer p.forgetBlob()
157         }
158         blob, err := p.getBlob(create)
159         if err != nil {
160                 err = fmt.Errorf("getting blob: %w", err)
161                 return
162         }
163         n, err = atIo(blob)(b, off)
164         if err == nil {
165                 return
166         }
167         var se sqlite.Error
168         if !errors.As(err, &se) {
169                 return
170         }
171         // "ABORT" occurs if the row the blob is on is modified elsewhere. "ERROR: invalid blob" occurs
172         // if the blob has been closed. We don't forget blobs that are closed by our GC finalizers,
173         // because they may be attached to names that have since moved on to another blob.
174         if se.Code != sqlite.SQLITE_ABORT && !(p.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
175                 return
176         }
177         p.forgetBlob()
178         // Try again, this time we're guaranteed to get a fresh blob, and so errors are no excuse. It
179         // might be possible to skip to this version if we don't cache blobs.
180         blob, err = p.getBlob(create)
181         if err != nil {
182                 err = fmt.Errorf("getting blob: %w", err)
183                 return
184         }
185         return atIo(blob)(b, off)
186 }
187
188 func (p piece) ReadAt(b []byte, off int64) (n int, err error) {
189         return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
190                 return blob.ReadAt
191         }, b, off, false)
192 }
193
194 func (p piece) WriteAt(b []byte, off int64) (n int, err error) {
195         return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
196                 return blob.WriteAt
197         }, b, off, true)
198 }
199
200 func (p piece) MarkComplete() error {
201         p.l.Lock()
202         defer p.l.Unlock()
203         err := sqlitex.Exec(p.conn, "update blob set verified=true where name=?", nil, p.name)
204         if err != nil {
205                 return err
206         }
207         changes := p.conn.Changes()
208         if changes != 1 {
209                 panic(changes)
210         }
211         return nil
212 }
213
214 func (p piece) forgetBlob() {
215         blob, ok := p.blobs[p.name]
216         if !ok {
217                 return
218         }
219         blob.Close()
220         delete(p.blobs, p.name)
221 }
222
223 func (p piece) MarkNotComplete() error {
224         return sqlitex.Exec(p.conn, "update blob set verified=false where name=?", nil, p.name)
225 }
226
227 func (p piece) Completion() (ret storage.Completion) {
228         p.l.Lock()
229         defer p.l.Unlock()
230         err := sqlitex.Exec(p.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
231                 ret.Complete = stmt.ColumnInt(0) != 0
232                 return nil
233         }, p.name)
234         ret.Ok = err == nil
235         if err != nil {
236                 panic(err)
237         }
238         return
239 }
240
241 func (p piece) getBlob(create bool) (*sqlite.Blob, error) {
242         blob, ok := p.blobs[p.name]
243         if !ok {
244                 rowid, err := rowidForBlob(p.conn, p.name, p.length, create)
245                 if err != nil {
246                         return nil, fmt.Errorf("getting rowid for blob: %w", err)
247                 }
248                 blob, err = p.conn.OpenBlob("main", "blob", "data", rowid, true)
249                 if err != nil {
250                         panic(err)
251                 }
252                 if p.opts.GcBlobs {
253                         herp := new(byte)
254                         runtime.SetFinalizer(herp, func(*byte) {
255                                 p.l.Lock()
256                                 defer p.l.Unlock()
257                                 // Note there's no guarantee that the finalizer fired while this blob is the same
258                                 // one in the blob cache. It might be possible to rework this so that we check, or
259                                 // strip finalizers as appropriate.
260                                 blob.Close()
261                         })
262                 }
263                 p.blobs[p.name] = blob
264         }
265         return blob, nil
266 }