]> Sergey Matveev's repositories - btrtrc.git/blob - storage/sqlite/direct.go
Rename new.go
[btrtrc.git] / storage / sqlite / direct.go
1 package sqliteStorage
2
3 import (
4         "errors"
5         "sync"
6
7         "crawshaw.io/sqlite"
8         "crawshaw.io/sqlite/sqlitex"
9         "github.com/anacrolix/torrent/metainfo"
10         "github.com/anacrolix/torrent/storage"
11 )
12
13 type NewDirectStorageOpts struct {
14         NewPoolOpts
15         ProvOpts func(*ProviderOpts)
16 }
17
18 // A convenience function that creates a connection pool, resource provider, and a pieces storage
19 // ClientImpl and returns them all with a Close attached.
20 func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
21         conns, provOpts, err := NewPool(opts.NewPoolOpts)
22         if err != nil {
23                 return
24         }
25         if f := opts.ProvOpts; f != nil {
26                 f(&provOpts)
27         }
28         provOpts.BatchWrites = false
29         prov, err := NewProvider(conns, provOpts)
30         if err != nil {
31                 conns.Close()
32                 return
33         }
34         return &client{
35                 prov:  prov,
36                 conn:  prov.pool.Get(nil),
37                 blobs: make(map[string]*sqlite.Blob),
38         }, nil
39 }
40
41 type client struct {
42         l     sync.Mutex
43         prov  *provider
44         conn  conn
45         blobs map[string]*sqlite.Blob
46 }
47
48 func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
49         return torrent{c}, nil
50 }
51
52 func (c *client) Close() error {
53         for _, b := range c.blobs {
54                 b.Close()
55         }
56         c.prov.pool.Put(c.conn)
57         return c.prov.Close()
58 }
59
60 type torrent struct {
61         c *client
62 }
63
64 func rowidForBlob(c conn, name string, length int64) (rowid int64, err error) {
65         err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
66                 rowid = stmt.ColumnInt64(0)
67                 return nil
68         }, name)
69         if err != nil {
70                 return
71         }
72         if rowid != 0 {
73                 return
74         }
75         err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
76         if err != nil {
77                 return
78         }
79         rowid = c.LastInsertRowID()
80         return
81 }
82
83 func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
84         t.c.l.Lock()
85         defer t.c.l.Unlock()
86         name := p.Hash().HexString()
87         return piece{t.c.conn, &t.c.l, name, t.c.blobs, p.Length()}
88 }
89
90 func (t torrent) Close() error {
91         return nil
92 }
93
94 type piece struct {
95         conn   conn
96         l      *sync.Mutex
97         name   string
98         blobs  map[string]*sqlite.Blob
99         length int64
100 }
101
102 func (p2 piece) doAtIoWithBlob(
103         atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
104         p []byte,
105         off int64,
106 ) (n int, err error) {
107         p2.l.Lock()
108         defer p2.l.Unlock()
109         n, err = atIo(p2.getBlob())(p, off)
110         var se sqlite.Error
111         if !errors.As(err, &se) || se.Code != sqlite.SQLITE_ABORT {
112                 return
113         }
114         p2.blobWouldExpire()
115         return atIo(p2.getBlob())(p, off)
116 }
117
118 func (p2 piece) ReadAt(p []byte, off int64) (n int, err error) {
119         return p2.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
120                 return blob.ReadAt
121         }, p, off)
122 }
123
124 func (p2 piece) WriteAt(p []byte, off int64) (n int, err error) {
125         return p2.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
126                 return blob.WriteAt
127         }, p, off)
128 }
129
130 func (p2 piece) MarkComplete() error {
131         p2.l.Lock()
132         defer p2.l.Unlock()
133         err := sqlitex.Exec(p2.conn, "update blob set verified=true where name=?", nil, p2.name)
134         if err != nil {
135                 return err
136         }
137         changes := p2.conn.Changes()
138         if changes != 1 {
139                 panic(changes)
140         }
141         return nil
142 }
143
144 func (p2 piece) blobWouldExpire() {
145         blob, ok := p2.blobs[p2.name]
146         if !ok {
147                 return
148         }
149         blob.Close()
150         delete(p2.blobs, p2.name)
151 }
152
153 func (p2 piece) MarkNotComplete() error {
154         return sqlitex.Exec(p2.conn, "update blob set verified=false where name=?", nil, p2.name)
155 }
156
157 func (p2 piece) Completion() (ret storage.Completion) {
158         p2.l.Lock()
159         defer p2.l.Unlock()
160         err := sqlitex.Exec(p2.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
161                 ret.Complete = stmt.ColumnInt(0) != 0
162                 return nil
163         }, p2.name)
164         ret.Ok = err == nil
165         if err != nil {
166                 panic(err)
167         }
168         return
169 }
170
171 func (p2 piece) closeBlobIfExists() {
172         if b, ok := p2.blobs[p2.name]; ok {
173                 b.Close()
174                 delete(p2.blobs, p2.name)
175         }
176 }
177
178 func (p2 piece) getBlob() *sqlite.Blob {
179         blob, ok := p2.blobs[p2.name]
180         if !ok {
181                 rowid, err := rowidForBlob(p2.conn, p2.name, p2.length)
182                 if err != nil {
183                         panic(err)
184                 }
185                 blob, err = p2.conn.OpenBlob("main", "blob", "data", rowid, true)
186                 if err != nil {
187                         panic(err)
188                 }
189                 p2.blobs[p2.name] = blob
190         }
191         return blob
192 }