]> Sergey Matveev's repositories - btrtrc.git/blob - storage/sqlite/new.go
Rework to use a pool of blobs
[btrtrc.git] / storage / sqlite / new.go
1 package sqliteStorage
2
3 import (
4         "sync"
5
6         "crawshaw.io/sqlite"
7         "crawshaw.io/sqlite/sqlitex"
8         "github.com/anacrolix/torrent/metainfo"
9         "github.com/anacrolix/torrent/storage"
10 )
11
12 type NewDirectStorageOpts struct {
13         NewPoolOpts
14         ProvOpts func(*ProviderOpts)
15 }
16
17 // A convenience function that creates a connection pool, resource provider, and a pieces storage
18 // ClientImpl and returns them all with a Close attached.
19 func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
20         conns, provOpts, err := NewPool(opts.NewPoolOpts)
21         if err != nil {
22                 return
23         }
24         if f := opts.ProvOpts; f != nil {
25                 f(&provOpts)
26         }
27         provOpts.BatchWrites = false
28         prov, err := NewProvider(conns, provOpts)
29         if err != nil {
30                 conns.Close()
31                 return
32         }
33         return &client{
34                 prov:  prov,
35                 conn:  prov.pool.Get(nil),
36                 blobs: make(map[string]*sqlite.Blob),
37         }, nil
38 }
39
40 type client struct {
41         l     sync.Mutex
42         prov  *provider
43         conn  conn
44         blobs map[string]*sqlite.Blob
45 }
46
47 func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
48         return torrent{c}, nil
49 }
50
51 func (c *client) Close() error {
52         for _, b := range c.blobs {
53                 b.Close()
54         }
55         c.prov.pool.Put(c.conn)
56         return c.prov.Close()
57 }
58
59 type torrent struct {
60         c *client
61 }
62
63 func rowidForBlob(c conn, name string, length int64) (rowid int64, err error) {
64         err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
65                 rowid = stmt.ColumnInt64(0)
66                 return nil
67         }, name)
68         if err != nil {
69                 return
70         }
71         if rowid != 0 {
72                 return
73         }
74         err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
75         if err != nil {
76                 return
77         }
78         rowid = c.LastInsertRowID()
79         return
80 }
81
82 func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
83         t.c.l.Lock()
84         defer t.c.l.Unlock()
85         name := p.Hash().HexString()
86         return piece{t.c.conn, &t.c.l, name, t.c.blobs, p.Length()}
87 }
88
89 func (t torrent) Close() error {
90         return nil
91 }
92
93 type piece struct {
94         conn   conn
95         l      *sync.Mutex
96         name   string
97         blobs  map[string]*sqlite.Blob
98         length int64
99 }
100
101 func (p2 piece) ReadAt(p []byte, off int64) (n int, err error) {
102         p2.l.Lock()
103         defer p2.l.Unlock()
104         blob := p2.getBlob()
105         return blob.ReadAt(p, off)
106 }
107
108 func (p2 piece) WriteAt(p []byte, off int64) (n int, err error) {
109         p2.l.Lock()
110         defer p2.l.Unlock()
111         return p2.getBlob().WriteAt(p, off)
112 }
113
114 func (p2 piece) MarkComplete() error {
115         p2.l.Lock()
116         defer p2.l.Unlock()
117         err := sqlitex.Exec(p2.conn, "update blob set verified=true where name=?", nil, p2.name)
118         if err != nil {
119                 return err
120         }
121         changes := p2.conn.Changes()
122         if changes != 1 {
123                 panic(changes)
124         }
125         p2.blobWouldExpire()
126         return nil
127 }
128
129 func (p2 piece) blobWouldExpire() {
130         blob, ok := p2.blobs[p2.name]
131         if !ok {
132                 return
133         }
134         blob.Close()
135         delete(p2.blobs, p2.name)
136 }
137
138 func (p2 piece) MarkNotComplete() error {
139         p2.blobWouldExpire()
140         return sqlitex.Exec(p2.conn, "update blob set verified=false where name=?", nil, p2.name)
141 }
142
143 func (p2 piece) Completion() (ret storage.Completion) {
144         p2.l.Lock()
145         defer p2.l.Unlock()
146         err := sqlitex.Exec(p2.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
147                 ret.Complete = stmt.ColumnInt(0) != 0
148                 return nil
149         }, p2.name)
150         ret.Ok = err == nil
151         if err != nil {
152                 panic(err)
153         }
154         return
155 }
156
157 func (p2 piece) closeBlobIfExists() {
158         if b, ok := p2.blobs[p2.name]; ok {
159                 b.Close()
160                 delete(p2.blobs, p2.name)
161         }
162 }
163
164 func (p2 piece) getBlob() *sqlite.Blob {
165         blob, ok := p2.blobs[p2.name]
166         if !ok {
167                 rowid, err := rowidForBlob(p2.conn, p2.name, p2.length)
168                 if err != nil {
169                         panic(err)
170                 }
171                 blob, err = p2.conn.OpenBlob("main", "blob", "data", rowid, true)
172                 if err != nil {
173                         panic(err)
174                 }
175                 p2.blobs[p2.name] = blob
176         }
177         return blob
178 }