]> Sergey Matveev's repositories - btrtrc.git/blob - storage/sqlite/new.go
Implement sqlite directly without using piece resources
[btrtrc.git] / storage / sqlite / new.go
1 package sqliteStorage
2
3 import (
4         "sync"
5
6         "crawshaw.io/sqlite"
7         "crawshaw.io/sqlite/sqlitex"
8         "github.com/anacrolix/torrent/metainfo"
9         "github.com/anacrolix/torrent/storage"
10 )
11
12 type NewDirectStorageOpts struct {
13         NewPoolOpts
14         ProvOpts func(*ProviderOpts)
15 }
16
17 // A convenience function that creates a connection pool, resource provider, and a pieces storage
18 // ClientImpl and returns them all with a Close attached.
19 func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
20         conns, provOpts, err := NewPool(opts.NewPoolOpts)
21         if err != nil {
22                 return
23         }
24         if f := opts.ProvOpts; f != nil {
25                 f(&provOpts)
26         }
27         provOpts.BatchWrites = false
28         prov, err := NewProvider(conns, provOpts)
29         if err != nil {
30                 conns.Close()
31                 return
32         }
33         return &client{
34                 prov: prov,
35                 conn: prov.pool.Get(nil),
36         }, nil
37 }
38
39 type client struct {
40         l    sync.Mutex
41         prov *provider
42         conn conn
43         blob *sqlite.Blob
44 }
45
46 func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
47         return torrent{c}, nil
48 }
49
50 func (c *client) Close() error {
51         if c.blob != nil {
52                 c.blob.Close()
53         }
54         c.prov.pool.Put(c.conn)
55         return c.prov.Close()
56 }
57
58 type torrent struct {
59         c *client
60 }
61
62 func rowidForBlob(c conn, name string, length int64) (rowid int64, err error) {
63         err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
64                 rowid = stmt.ColumnInt64(0)
65                 return nil
66         }, name)
67         if err != nil {
68                 return
69         }
70         if rowid != 0 {
71                 return
72         }
73         err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
74         if err != nil {
75                 return
76         }
77         rowid = c.LastInsertRowID()
78         return
79 }
80
81 func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
82         t.c.l.Lock()
83         defer t.c.l.Unlock()
84         name := p.Hash().HexString()
85         return piece{t.c.conn, name, &t.c.l, p.Length(), &t.c.blob}
86 }
87
88 func (t torrent) Close() error {
89         return nil
90 }
91
92 type piece struct {
93         conn   conn
94         name   string
95         l      *sync.Mutex
96         length int64
97         blob   **sqlite.Blob
98 }
99
100 func (p2 piece) getBlob() *sqlite.Blob {
101         if *p2.blob != nil {
102                 err := (*p2.blob).Close()
103                 if err != nil {
104                         panic(err)
105                 }
106                 *p2.blob = nil
107         }
108         rowid, err := rowidForBlob(p2.conn, p2.name, p2.length)
109         if err != nil {
110                 panic(err)
111         }
112         *p2.blob, err = p2.conn.OpenBlob("main", "blob", "data", rowid, true)
113         if err != nil {
114                 panic(err)
115         }
116         return *p2.blob
117 }
118
119 func (p2 piece) ReadAt(p []byte, off int64) (n int, err error) {
120         p2.l.Lock()
121         defer p2.l.Unlock()
122         blob := p2.getBlob()
123         return blob.ReadAt(p, off)
124 }
125
126 func (p2 piece) WriteAt(p []byte, off int64) (n int, err error) {
127         p2.l.Lock()
128         defer p2.l.Unlock()
129         return p2.getBlob().WriteAt(p, off)
130 }
131
132 func (p2 piece) MarkComplete() error {
133         p2.l.Lock()
134         defer p2.l.Unlock()
135         err := sqlitex.Exec(p2.conn, "update blob set verified=true where name=?", nil, p2.name)
136         if err != nil {
137                 return err
138         }
139         changes := p2.conn.Changes()
140         if changes != 1 {
141                 panic(changes)
142         }
143         return nil
144 }
145
146 func (p2 piece) MarkNotComplete() error {
147         panic("implement me")
148 }
149
150 func (p2 piece) Completion() (ret storage.Completion) {
151         p2.l.Lock()
152         defer p2.l.Unlock()
153         err := sqlitex.Exec(p2.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
154                 ret.Complete = stmt.ColumnInt(0) != 0
155                 return nil
156         }, p2.name)
157         ret.Ok = err == nil
158         if err != nil {
159                 panic(err)
160         }
161         return
162 }