]> Sergey Matveev's repositories - btrtrc.git/blob - storage/sqlite/direct.go
Remove unused method
[btrtrc.git] / storage / sqlite / direct.go
1 package sqliteStorage
2
3 import (
4         "errors"
5         "runtime"
6         "sync"
7         "time"
8
9         "crawshaw.io/sqlite"
10         "crawshaw.io/sqlite/sqlitex"
11         "github.com/anacrolix/torrent/metainfo"
12         "github.com/anacrolix/torrent/storage"
13 )
14
15 type NewDirectStorageOpts struct {
16         NewConnOpts
17         InitDbOpts
18         InitConnOpts
19         GcBlobs           bool
20         CacheBlobs        bool
21         BlobFlushInterval time.Duration
22 }
23
24 // A convenience function that creates a connection pool, resource provider, and a pieces storage
25 // ClientImpl and returns them all with a Close attached.
26 func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
27         conn, err := newConn(opts.NewConnOpts)
28         if err != nil {
29                 return
30         }
31         err = initConn(conn, opts.InitConnOpts)
32         if err != nil {
33                 conn.Close()
34                 return
35         }
36         err = initDatabase(conn, opts.InitDbOpts)
37         if err != nil {
38                 return
39         }
40         cl := &client{
41                 conn:  conn,
42                 blobs: make(map[string]*sqlite.Blob),
43                 opts:  opts,
44         }
45         if opts.BlobFlushInterval != 0 {
46                 cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
47         }
48         return cl, nil
49 }
50
51 type client struct {
52         l           sync.Mutex
53         conn        conn
54         blobs       map[string]*sqlite.Blob
55         blobFlusher *time.Timer
56         opts        NewDirectStorageOpts
57         closed      bool
58 }
59
60 func (c *client) blobFlusherFunc() {
61         c.l.Lock()
62         defer c.l.Unlock()
63         c.flushBlobs()
64         if !c.closed {
65                 c.blobFlusher.Reset(c.opts.BlobFlushInterval)
66         }
67 }
68
69 func (c *client) flushBlobs() {
70         for key, b := range c.blobs {
71                 // Need the lock to prevent racing with the GC finalizers.
72                 b.Close()
73                 delete(c.blobs, key)
74         }
75 }
76
77 func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
78         return torrent{c}, nil
79 }
80
81 func (c *client) Close() error {
82         c.l.Lock()
83         defer c.l.Unlock()
84         c.flushBlobs()
85         c.closed = true
86         if c.opts.BlobFlushInterval != 0 {
87                 c.blobFlusher.Stop()
88         }
89         return c.conn.Close()
90 }
91
92 type torrent struct {
93         c *client
94 }
95
96 func rowidForBlob(c conn, name string, length int64) (rowid int64, err error) {
97         err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
98                 rowid = stmt.ColumnInt64(0)
99                 return nil
100         }, name)
101         if err != nil {
102                 return
103         }
104         if rowid != 0 {
105                 return
106         }
107         err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
108         if err != nil {
109                 return
110         }
111         rowid = c.LastInsertRowID()
112         return
113 }
114
115 func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
116         t.c.l.Lock()
117         defer t.c.l.Unlock()
118         name := p.Hash().HexString()
119         return piece{
120                 name,
121                 p.Length(),
122                 t.c,
123         }
124 }
125
126 func (t torrent) Close() error {
127         return nil
128 }
129
130 type piece struct {
131         name   string
132         length int64
133         *client
134 }
135
136 func (p2 piece) doAtIoWithBlob(
137         atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
138         p []byte,
139         off int64,
140 ) (n int, err error) {
141         p2.l.Lock()
142         defer p2.l.Unlock()
143         if !p2.opts.CacheBlobs {
144                 defer p2.forgetBlob()
145         }
146         n, err = atIo(p2.getBlob())(p, off)
147         if err == nil {
148                 return
149         }
150         var se sqlite.Error
151         if !errors.As(err, &se) {
152                 return
153         }
154         if se.Code != sqlite.SQLITE_ABORT && !(p2.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
155                 return
156         }
157         p2.forgetBlob()
158         return atIo(p2.getBlob())(p, off)
159 }
160
161 func (p2 piece) ReadAt(p []byte, off int64) (n int, err error) {
162         return p2.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
163                 return blob.ReadAt
164         }, p, off)
165 }
166
167 func (p2 piece) WriteAt(p []byte, off int64) (n int, err error) {
168         return p2.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
169                 return blob.WriteAt
170         }, p, off)
171 }
172
173 func (p2 piece) MarkComplete() error {
174         p2.l.Lock()
175         defer p2.l.Unlock()
176         err := sqlitex.Exec(p2.conn, "update blob set verified=true where name=?", nil, p2.name)
177         if err != nil {
178                 return err
179         }
180         changes := p2.conn.Changes()
181         if changes != 1 {
182                 panic(changes)
183         }
184         return nil
185 }
186
187 func (p2 piece) forgetBlob() {
188         blob, ok := p2.blobs[p2.name]
189         if !ok {
190                 return
191         }
192         blob.Close()
193         delete(p2.blobs, p2.name)
194 }
195
196 func (p2 piece) MarkNotComplete() error {
197         return sqlitex.Exec(p2.conn, "update blob set verified=false where name=?", nil, p2.name)
198 }
199
200 func (p2 piece) Completion() (ret storage.Completion) {
201         p2.l.Lock()
202         defer p2.l.Unlock()
203         err := sqlitex.Exec(p2.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
204                 ret.Complete = stmt.ColumnInt(0) != 0
205                 return nil
206         }, p2.name)
207         ret.Ok = err == nil
208         if err != nil {
209                 panic(err)
210         }
211         return
212 }
213
214 func (p2 piece) getBlob() *sqlite.Blob {
215         blob, ok := p2.blobs[p2.name]
216         if !ok {
217                 rowid, err := rowidForBlob(p2.conn, p2.name, p2.length)
218                 if err != nil {
219                         panic(err)
220                 }
221                 blob, err = p2.conn.OpenBlob("main", "blob", "data", rowid, true)
222                 if err != nil {
223                         panic(err)
224                 }
225                 if p2.opts.GcBlobs {
226                         herp := new(byte)
227                         runtime.SetFinalizer(herp, func(*byte) {
228                                 p2.l.Lock()
229                                 defer p2.l.Unlock()
230                                 blob.Close()
231                         })
232                 }
233                 p2.blobs[p2.name] = blob
234         }
235         return blob
236 }