]> Sergey Matveev's repositories - btrtrc.git/blob - storage/sqlite/direct.go
Rework lots of option handling
[btrtrc.git] / storage / sqlite / direct.go
1 package sqliteStorage
2
3 import (
4         "errors"
5         "sync"
6
7         "crawshaw.io/sqlite"
8         "crawshaw.io/sqlite/sqlitex"
9         "github.com/anacrolix/torrent/metainfo"
10         "github.com/anacrolix/torrent/storage"
11 )
12
13 type NewDirectStorageOpts struct {
14         NewConnOpts
15         InitDbOpts
16 }
17
18 // A convenience function that creates a connection pool, resource provider, and a pieces storage
19 // ClientImpl and returns them all with a Close attached.
20 func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
21         conn, err := newConn(opts.NewConnOpts)
22         if err != nil {
23                 return
24         }
25         journalMode := "delete"
26         if opts.Memory {
27                 journalMode = "off"
28         }
29         err = initConn(conn, InitConnOpts{
30                 SetJournalMode: journalMode,
31                 MmapSizeOk:     true,
32                 MmapSize:       1 << 25,
33         })
34         if err != nil {
35                 return
36         }
37         err = initDatabase(conn, opts.InitDbOpts)
38         if err != nil {
39                 return
40         }
41         return &client{
42                 conn:  conn,
43                 blobs: make(map[string]*sqlite.Blob),
44         }, nil
45 }
46
47 type client struct {
48         l     sync.Mutex
49         conn  conn
50         blobs map[string]*sqlite.Blob
51 }
52
53 func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
54         return torrent{c}, nil
55 }
56
57 func (c *client) Close() error {
58         for _, b := range c.blobs {
59                 b.Close()
60         }
61         return c.conn.Close()
62 }
63
64 type torrent struct {
65         c *client
66 }
67
68 func rowidForBlob(c conn, name string, length int64) (rowid int64, err error) {
69         err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
70                 rowid = stmt.ColumnInt64(0)
71                 return nil
72         }, name)
73         if err != nil {
74                 return
75         }
76         if rowid != 0 {
77                 return
78         }
79         err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
80         if err != nil {
81                 return
82         }
83         rowid = c.LastInsertRowID()
84         return
85 }
86
87 func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
88         t.c.l.Lock()
89         defer t.c.l.Unlock()
90         name := p.Hash().HexString()
91         return piece{t.c.conn, &t.c.l, name, t.c.blobs, p.Length()}
92 }
93
94 func (t torrent) Close() error {
95         return nil
96 }
97
98 type piece struct {
99         conn   conn
100         l      *sync.Mutex
101         name   string
102         blobs  map[string]*sqlite.Blob
103         length int64
104 }
105
106 func (p2 piece) doAtIoWithBlob(
107         atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
108         p []byte,
109         off int64,
110 ) (n int, err error) {
111         p2.l.Lock()
112         defer p2.l.Unlock()
113         //defer p2.blobWouldExpire()
114         n, err = atIo(p2.getBlob())(p, off)
115         var se sqlite.Error
116         if !errors.As(err, &se) || se.Code != sqlite.SQLITE_ABORT {
117                 return
118         }
119         p2.blobWouldExpire()
120         return atIo(p2.getBlob())(p, off)
121 }
122
123 func (p2 piece) ReadAt(p []byte, off int64) (n int, err error) {
124         return p2.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
125                 return blob.ReadAt
126         }, p, off)
127 }
128
129 func (p2 piece) WriteAt(p []byte, off int64) (n int, err error) {
130         return p2.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
131                 return blob.WriteAt
132         }, p, off)
133 }
134
135 func (p2 piece) MarkComplete() error {
136         p2.l.Lock()
137         defer p2.l.Unlock()
138         err := sqlitex.Exec(p2.conn, "update blob set verified=true where name=?", nil, p2.name)
139         if err != nil {
140                 return err
141         }
142         changes := p2.conn.Changes()
143         if changes != 1 {
144                 panic(changes)
145         }
146         return nil
147 }
148
149 func (p2 piece) blobWouldExpire() {
150         blob, ok := p2.blobs[p2.name]
151         if !ok {
152                 return
153         }
154         blob.Close()
155         delete(p2.blobs, p2.name)
156 }
157
158 func (p2 piece) MarkNotComplete() error {
159         return sqlitex.Exec(p2.conn, "update blob set verified=false where name=?", nil, p2.name)
160 }
161
162 func (p2 piece) Completion() (ret storage.Completion) {
163         p2.l.Lock()
164         defer p2.l.Unlock()
165         err := sqlitex.Exec(p2.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
166                 ret.Complete = stmt.ColumnInt(0) != 0
167                 return nil
168         }, p2.name)
169         ret.Ok = err == nil
170         if err != nil {
171                 panic(err)
172         }
173         return
174 }
175
176 func (p2 piece) closeBlobIfExists() {
177         if b, ok := p2.blobs[p2.name]; ok {
178                 b.Close()
179                 delete(p2.blobs, p2.name)
180         }
181 }
182
183 func (p2 piece) getBlob() *sqlite.Blob {
184         blob, ok := p2.blobs[p2.name]
185         if !ok {
186                 rowid, err := rowidForBlob(p2.conn, p2.name, p2.length)
187                 if err != nil {
188                         panic(err)
189                 }
190                 blob, err = p2.conn.OpenBlob("main", "blob", "data", rowid, true)
191                 if err != nil {
192                         panic(err)
193                 }
194                 p2.blobs[p2.name] = blob
195         }
196         return blob
197 }