]> Sergey Matveev's repositories - btrtrc.git/blob - storage/sqlite/direct.go
Benchmark different mmap sizes and journal modes
[btrtrc.git] / storage / sqlite / direct.go
1 package sqliteStorage
2
3 import (
4         "errors"
5         "sync"
6
7         "crawshaw.io/sqlite"
8         "crawshaw.io/sqlite/sqlitex"
9         "github.com/anacrolix/torrent/metainfo"
10         "github.com/anacrolix/torrent/storage"
11 )
12
13 type NewDirectStorageOpts struct {
14         NewConnOpts
15         InitDbOpts
16         InitConnOpts
17 }
18
19 // A convenience function that creates a connection pool, resource provider, and a pieces storage
20 // ClientImpl and returns them all with a Close attached.
21 func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
22         conn, err := newConn(opts.NewConnOpts)
23         if err != nil {
24                 return
25         }
26         err = initConn(conn, opts.InitConnOpts)
27         if err != nil {
28                 conn.Close()
29                 return
30         }
31         err = initDatabase(conn, opts.InitDbOpts)
32         if err != nil {
33                 return
34         }
35         return &client{
36                 conn:  conn,
37                 blobs: make(map[string]*sqlite.Blob),
38         }, nil
39 }
40
41 type client struct {
42         l     sync.Mutex
43         conn  conn
44         blobs map[string]*sqlite.Blob
45 }
46
47 func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
48         return torrent{c}, nil
49 }
50
51 func (c *client) Close() error {
52         for _, b := range c.blobs {
53                 b.Close()
54         }
55         return c.conn.Close()
56 }
57
58 type torrent struct {
59         c *client
60 }
61
62 func rowidForBlob(c conn, name string, length int64) (rowid int64, err error) {
63         err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
64                 rowid = stmt.ColumnInt64(0)
65                 return nil
66         }, name)
67         if err != nil {
68                 return
69         }
70         if rowid != 0 {
71                 return
72         }
73         err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
74         if err != nil {
75                 return
76         }
77         rowid = c.LastInsertRowID()
78         return
79 }
80
81 func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
82         t.c.l.Lock()
83         defer t.c.l.Unlock()
84         name := p.Hash().HexString()
85         return piece{t.c.conn, &t.c.l, name, t.c.blobs, p.Length()}
86 }
87
88 func (t torrent) Close() error {
89         return nil
90 }
91
92 type piece struct {
93         conn   conn
94         l      *sync.Mutex
95         name   string
96         blobs  map[string]*sqlite.Blob
97         length int64
98 }
99
100 func (p2 piece) doAtIoWithBlob(
101         atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
102         p []byte,
103         off int64,
104 ) (n int, err error) {
105         p2.l.Lock()
106         defer p2.l.Unlock()
107         //defer p2.blobWouldExpire()
108         n, err = atIo(p2.getBlob())(p, off)
109         var se sqlite.Error
110         if !errors.As(err, &se) || se.Code != sqlite.SQLITE_ABORT {
111                 return
112         }
113         p2.blobWouldExpire()
114         return atIo(p2.getBlob())(p, off)
115 }
116
117 func (p2 piece) ReadAt(p []byte, off int64) (n int, err error) {
118         return p2.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
119                 return blob.ReadAt
120         }, p, off)
121 }
122
123 func (p2 piece) WriteAt(p []byte, off int64) (n int, err error) {
124         return p2.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
125                 return blob.WriteAt
126         }, p, off)
127 }
128
129 func (p2 piece) MarkComplete() error {
130         p2.l.Lock()
131         defer p2.l.Unlock()
132         err := sqlitex.Exec(p2.conn, "update blob set verified=true where name=?", nil, p2.name)
133         if err != nil {
134                 return err
135         }
136         changes := p2.conn.Changes()
137         if changes != 1 {
138                 panic(changes)
139         }
140         return nil
141 }
142
143 func (p2 piece) blobWouldExpire() {
144         blob, ok := p2.blobs[p2.name]
145         if !ok {
146                 return
147         }
148         blob.Close()
149         delete(p2.blobs, p2.name)
150 }
151
152 func (p2 piece) MarkNotComplete() error {
153         return sqlitex.Exec(p2.conn, "update blob set verified=false where name=?", nil, p2.name)
154 }
155
156 func (p2 piece) Completion() (ret storage.Completion) {
157         p2.l.Lock()
158         defer p2.l.Unlock()
159         err := sqlitex.Exec(p2.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
160                 ret.Complete = stmt.ColumnInt(0) != 0
161                 return nil
162         }, p2.name)
163         ret.Ok = err == nil
164         if err != nil {
165                 panic(err)
166         }
167         return
168 }
169
170 func (p2 piece) closeBlobIfExists() {
171         if b, ok := p2.blobs[p2.name]; ok {
172                 b.Close()
173                 delete(p2.blobs, p2.name)
174         }
175 }
176
177 func (p2 piece) getBlob() *sqlite.Blob {
178         blob, ok := p2.blobs[p2.name]
179         if !ok {
180                 rowid, err := rowidForBlob(p2.conn, p2.name, p2.length)
181                 if err != nil {
182                         panic(err)
183                 }
184                 blob, err = p2.conn.OpenBlob("main", "blob", "data", rowid, true)
185                 if err != nil {
186                         panic(err)
187                 }
188                 p2.blobs[p2.name] = blob
189         }
190         return blob
191 }