]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Implement sqlite directly without using piece resources
authorMatt Joiner <anacrolix@gmail.com>
Tue, 4 May 2021 09:51:42 +0000 (19:51 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 4 May 2021 09:51:42 +0000 (19:51 +1000)
go.sum
storage/sqlite/new.go [new file with mode: 0644]
storage/sqlite/sqlite-storage.go
test/init_test.go
test/transfer_test.go

diff --git a/go.sum b/go.sum
index e12648a141689bdc33ebee05c7465c37c7447d0c..7cbf393e307dfe4fa3c4cc07a3e409c92e311a55 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -268,6 +268,8 @@ github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM
 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
 github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
 github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
+github.com/getlantern/sqlite v0.3.3-0.20210215090556-4f83cf7731f0 h1:zvFSvII5rTbMZ3idAqSUjUCDgZFbWMKzxQot3/Y7nzA=
+github.com/getlantern/sqlite v0.3.3-0.20210215090556-4f83cf7731f0/go.mod h1:igAO5JulrQ1DbdZdtVq48mnZUBAPOeFzer7VhDWNtW4=
 github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
 github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
 github.com/gliderlabs/ssh v0.1.1 h1:j3L6gSLQalDETeEg/Jg0mGY0/y/N6zI2xX1978P0Uqw=
diff --git a/storage/sqlite/new.go b/storage/sqlite/new.go
new file mode 100644 (file)
index 0000000..99a5c33
--- /dev/null
@@ -0,0 +1,162 @@
+package sqliteStorage
+
+import (
+       "sync"
+
+       "crawshaw.io/sqlite"
+       "crawshaw.io/sqlite/sqlitex"
+       "github.com/anacrolix/torrent/metainfo"
+       "github.com/anacrolix/torrent/storage"
+)
+
+type NewDirectStorageOpts struct {
+       NewPoolOpts
+       ProvOpts func(*ProviderOpts)
+}
+
+// A convenience function that creates a connection pool, resource provider, and a pieces storage
+// ClientImpl and returns them all with a Close attached.
+func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
+       conns, provOpts, err := NewPool(opts.NewPoolOpts)
+       if err != nil {
+               return
+       }
+       if f := opts.ProvOpts; f != nil {
+               f(&provOpts)
+       }
+       provOpts.BatchWrites = false
+       prov, err := NewProvider(conns, provOpts)
+       if err != nil {
+               conns.Close()
+               return
+       }
+       return &client{
+               prov: prov,
+               conn: prov.pool.Get(nil),
+       }, nil
+}
+
+type client struct {
+       l    sync.Mutex
+       prov *provider
+       conn conn
+       blob *sqlite.Blob
+}
+
+func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
+       return torrent{c}, nil
+}
+
+func (c *client) Close() error {
+       if c.blob != nil {
+               c.blob.Close()
+       }
+       c.prov.pool.Put(c.conn)
+       return c.prov.Close()
+}
+
+type torrent struct {
+       c *client
+}
+
+func rowidForBlob(c conn, name string, length int64) (rowid int64, err error) {
+       err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
+               rowid = stmt.ColumnInt64(0)
+               return nil
+       }, name)
+       if err != nil {
+               return
+       }
+       if rowid != 0 {
+               return
+       }
+       err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
+       if err != nil {
+               return
+       }
+       rowid = c.LastInsertRowID()
+       return
+}
+
+func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
+       t.c.l.Lock()
+       defer t.c.l.Unlock()
+       name := p.Hash().HexString()
+       return piece{t.c.conn, name, &t.c.l, p.Length(), &t.c.blob}
+}
+
+func (t torrent) Close() error {
+       return nil
+}
+
+type piece struct {
+       conn   conn
+       name   string
+       l      *sync.Mutex
+       length int64
+       blob   **sqlite.Blob
+}
+
+func (p2 piece) getBlob() *sqlite.Blob {
+       if *p2.blob != nil {
+               err := (*p2.blob).Close()
+               if err != nil {
+                       panic(err)
+               }
+               *p2.blob = nil
+       }
+       rowid, err := rowidForBlob(p2.conn, p2.name, p2.length)
+       if err != nil {
+               panic(err)
+       }
+       *p2.blob, err = p2.conn.OpenBlob("main", "blob", "data", rowid, true)
+       if err != nil {
+               panic(err)
+       }
+       return *p2.blob
+}
+
+func (p2 piece) ReadAt(p []byte, off int64) (n int, err error) {
+       p2.l.Lock()
+       defer p2.l.Unlock()
+       blob := p2.getBlob()
+       return blob.ReadAt(p, off)
+}
+
+func (p2 piece) WriteAt(p []byte, off int64) (n int, err error) {
+       p2.l.Lock()
+       defer p2.l.Unlock()
+       return p2.getBlob().WriteAt(p, off)
+}
+
+func (p2 piece) MarkComplete() error {
+       p2.l.Lock()
+       defer p2.l.Unlock()
+       err := sqlitex.Exec(p2.conn, "update blob set verified=true where name=?", nil, p2.name)
+       if err != nil {
+               return err
+       }
+       changes := p2.conn.Changes()
+       if changes != 1 {
+               panic(changes)
+       }
+       return nil
+}
+
+func (p2 piece) MarkNotComplete() error {
+       panic("implement me")
+}
+
+func (p2 piece) Completion() (ret storage.Completion) {
+       p2.l.Lock()
+       defer p2.l.Unlock()
+       err := sqlitex.Exec(p2.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
+               ret.Complete = stmt.ColumnInt(0) != 0
+               return nil
+       }, p2.name)
+       ret.Ok = err == nil
+       if err != nil {
+               panic(err)
+       }
+       return
+}
index 4b5abb244067fcc950c62c9e215a144021928f90..49ea4d7495c8289c19907690096dbccc24fcfbe5 100644 (file)
@@ -68,6 +68,7 @@ func InitSchema(conn conn, pageSize int, triggers bool) error {
                        name text,
                        last_used timestamp default (datetime('now')),
                        data blob,
+                       verified bool,
                        primary key (name)
                );
                
index 38335ef8c6391f9144d98300d04ea0a912c41e12..b862d4baebde351a3c2d503ec71b9011187913a0 100644 (file)
@@ -1,5 +1,11 @@
 package test
 
 import (
+       "log"
+
        _ "github.com/anacrolix/envpprof"
 )
+
+func init() {
+       log.SetFlags(log.Flags() | log.Lshortfile)
+}
index f232b186ee7ce495b25af720366c432625f76945..0ae10060d63c5ba7d9c815a22c4ad108286f8366 100644 (file)
@@ -4,6 +4,7 @@ import (
        "fmt"
        "io"
        "io/ioutil"
+       "log"
        "os"
        "path/filepath"
        "runtime"
@@ -110,6 +111,7 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
                cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
        }
        cfg.Seed = false
+       cfg.Debug = true
        if ps.ConfigureLeecher.Config != nil {
                ps.ConfigureLeecher.Config(cfg)
        }
@@ -330,6 +332,20 @@ func TestClientTransferVarious(t *testing.T) {
                        Wrapper: fileCachePieceResourceStorage,
                }), 0},
                {"Boltdb", storage.NewBoltDB, 0},
+               {"SqliteDirect", func(s string) storage.ClientImplCloser {
+                       path := filepath.Join(s, "sqlite3.db")
+                       log.Print(path)
+                       cl, err := sqliteStorage.NewDirectStorage(sqliteStorage.NewDirectStorageOpts{
+                               NewPoolOpts: sqliteStorage.NewPoolOpts{
+                                       Path: path,
+                               },
+                               ProvOpts: nil,
+                       })
+                       if err != nil {
+                               panic(err)
+                       }
+                       return cl
+               }, 0},
                sqliteLeecherStorageTestCase(1),
                sqliteLeecherStorageTestCase(2),
                // This should use a number of connections equal to the number of CPUs
@@ -362,7 +378,7 @@ func TestClientTransferVarious(t *testing.T) {
                                                                        GOMAXPROCS:     ls.gomaxprocs,
                                                                })
                                                        })
-                                                       for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
+                                                       for _, readahead := range []int64{-1, 0, 1, 2, 9, 20} {
                                                                t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
                                                                        testClientTransfer(t, testClientTransferParams{
                                                                                SeederStorage:  ss.f,