From: Matt Joiner
Date: Tue, 4 May 2021 09:51:42 +0000 (+1000)
Subject: Implement sqlite directly without using piece resources
X-Git-Tag: v1.28.0~18^2~13
X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=afea28091f1975b075d6a25999b90268bfc5b5b8;p=btrtrc.git

Implement sqlite directly without using piece resources
---

diff --git a/go.sum b/go.sum
index e12648a1..7cbf393e 100644
--- a/go.sum
+++ b/go.sum
@@ -268,6 +268,8 @@ github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM
 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
 github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
 github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
+github.com/getlantern/sqlite v0.3.3-0.20210215090556-4f83cf7731f0 h1:zvFSvII5rTbMZ3idAqSUjUCDgZFbWMKzxQot3/Y7nzA=
+github.com/getlantern/sqlite v0.3.3-0.20210215090556-4f83cf7731f0/go.mod h1:igAO5JulrQ1DbdZdtVq48mnZUBAPOeFzer7VhDWNtW4=
 github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
 github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
 github.com/gliderlabs/ssh v0.1.1 h1:j3L6gSLQalDETeEg/Jg0mGY0/y/N6zI2xX1978P0Uqw=
diff --git a/storage/sqlite/new.go b/storage/sqlite/new.go
new file mode 100644
index 00000000..99a5c33a
--- /dev/null
+++ b/storage/sqlite/new.go
@@ -0,0 +1,162 @@
+package sqliteStorage
+
+import (
+	"sync"
+
+	"crawshaw.io/sqlite"
+	"crawshaw.io/sqlite/sqlitex"
+	"github.com/anacrolix/torrent/metainfo"
+	"github.com/anacrolix/torrent/storage"
+)
+
+type NewDirectStorageOpts struct {
+	NewPoolOpts
+	ProvOpts func(*ProviderOpts)
+}
+
+// A convenience function that creates a connection pool, resource provider, and a pieces storage
+// ClientImpl and returns them all with a Close attached.
+func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
+	conns, provOpts, err := NewPool(opts.NewPoolOpts)
+	if err != nil {
+		return
+	}
+	if f := opts.ProvOpts; f != nil {
+		f(&provOpts)
+	}
+	provOpts.BatchWrites = false
+	prov, err := NewProvider(conns, provOpts)
+	if err != nil {
+		conns.Close()
+		return
+	}
+	return &client{
+		prov: prov,
+		conn: prov.pool.Get(nil),
+	}, nil
+}
+
+type client struct {
+	l    sync.Mutex
+	prov *provider
+	conn conn
+	blob *sqlite.Blob
+}
+
+func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
+	return torrent{c}, nil
+}
+
+func (c *client) Close() error {
+	if c.blob != nil {
+		c.blob.Close()
+	}
+	c.prov.pool.Put(c.conn)
+	return c.prov.Close()
+}
+
+type torrent struct {
+	c *client
+}
+
+func rowidForBlob(c conn, name string, length int64) (rowid int64, err error) {
+	err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
+		rowid = stmt.ColumnInt64(0)
+		return nil
+	}, name)
+	if err != nil {
+		return
+	}
+	if rowid != 0 {
+		return
+	}
+	err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
+	if err != nil {
+		return
+	}
+	rowid = c.LastInsertRowID()
+	return
+}
+
+func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
+	t.c.l.Lock()
+	defer t.c.l.Unlock()
+	name := p.Hash().HexString()
+	return piece{t.c.conn, name, &t.c.l, p.Length(), &t.c.blob}
+}
+
+func (t torrent) Close() error {
+	return nil
+}
+
+type piece struct {
+	conn   conn
+	name   string
+	l      *sync.Mutex
+	length int64
+	blob   **sqlite.Blob
+}
+
+func (p2 piece) getBlob() *sqlite.Blob {
+	if *p2.blob != nil {
+		err := (*p2.blob).Close()
+		if err != nil {
+			panic(err)
+		}
+		*p2.blob = nil
+	}
+	rowid, err := rowidForBlob(p2.conn, p2.name, p2.length)
+	if err != nil {
+		panic(err)
+	}
+	*p2.blob, err = p2.conn.OpenBlob("main", "blob", "data", rowid, true)
+	if err != nil {
+		panic(err)
+	}
+	return *p2.blob
+}
+
+func (p2 piece) ReadAt(p []byte, off int64) (n int, err error) {
+	p2.l.Lock()
+	defer p2.l.Unlock()
+	blob := p2.getBlob()
+	return blob.ReadAt(p, off)
+}
+
+func (p2 piece) WriteAt(p []byte, off int64) (n int, err error) {
+	p2.l.Lock()
+	defer p2.l.Unlock()
+	return p2.getBlob().WriteAt(p, off)
+}
+
+func (p2 piece) MarkComplete() error {
+	p2.l.Lock()
+	defer p2.l.Unlock()
+	err := sqlitex.Exec(p2.conn, "update blob set verified=true where name=?", nil, p2.name)
+	if err != nil {
+		return err
+	}
+	changes := p2.conn.Changes()
+	if changes != 1 {
+		panic(changes)
+	}
+	return nil
+}
+
+func (p2 piece) MarkNotComplete() error {
+	panic("implement me")
+}
+
+func (p2 piece) Completion() (ret storage.Completion) {
+	p2.l.Lock()
+	defer p2.l.Unlock()
+	err := sqlitex.Exec(p2.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
+		ret.Complete = stmt.ColumnInt(0) != 0
+		return nil
+	}, p2.name)
+	ret.Ok = err == nil
+	if err != nil {
+		panic(err)
+	}
+	return
+}
diff --git a/storage/sqlite/sqlite-storage.go b/storage/sqlite/sqlite-storage.go
index 4b5abb24..49ea4d74 100644
--- a/storage/sqlite/sqlite-storage.go
+++ b/storage/sqlite/sqlite-storage.go
@@ -68,6 +68,7 @@ func InitSchema(conn conn, pageSize int, triggers bool) error {
 			name text,
 			last_used timestamp default (datetime('now')),
 			data blob,
+			verified bool,
 			primary key (name)
 		);
diff --git a/test/init_test.go b/test/init_test.go
index 38335ef8..b862d4ba 100644
--- a/test/init_test.go
+++ b/test/init_test.go
@@ -1,5 +1,11 @@
 package test
 
 import (
+	"log"
+
 	_ "github.com/anacrolix/envpprof"
 )
+
+func init() {
+	log.SetFlags(log.Flags() | log.Lshortfile)
+}
diff --git a/test/transfer_test.go b/test/transfer_test.go
index f232b186..0ae10060 100644
--- a/test/transfer_test.go
+++ b/test/transfer_test.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
+	"log"
 	"os"
 	"path/filepath"
 	"runtime"
@@ -110,6 +111,7 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
 		cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
 	}
 	cfg.Seed = false
+	cfg.Debug = true
 	if ps.ConfigureLeecher.Config != nil {
 		ps.ConfigureLeecher.Config(cfg)
 	}
@@ -330,6 +332,20 @@ func TestClientTransferVarious(t *testing.T) {
 			Wrapper: fileCachePieceResourceStorage,
 		}), 0},
 		{"Boltdb", storage.NewBoltDB, 0},
+		{"SqliteDirect", func(s string) storage.ClientImplCloser {
+			path := filepath.Join(s, "sqlite3.db")
+			log.Print(path)
+			cl, err := sqliteStorage.NewDirectStorage(sqliteStorage.NewDirectStorageOpts{
+				NewPoolOpts: sqliteStorage.NewPoolOpts{
+					Path: path,
+				},
+				ProvOpts: nil,
+			})
+			if err != nil {
+				panic(err)
+			}
+			return cl
+		}, 0},
 		sqliteLeecherStorageTestCase(1),
 		sqliteLeecherStorageTestCase(2),
 		// This should use a number of connections equal to the number of CPUs
@@ -362,7 +378,7 @@ func TestClientTransferVarious(t *testing.T) {
 				GOMAXPROCS: ls.gomaxprocs,
 			})
 		})
-		for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
+		for _, readahead := range []int64{-1, 0, 1, 2, 9, 20} {
 			t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
 				testClientTransfer(t, testClientTransferParams{
 					SeederStorage: ss.f,
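
Usage note (not part of the patch): a minimal sketch of how the NewDirectStorage backend added here might be wired into a torrent client. It assumes the library's NewDefaultClientConfig helper and the ClientConfig.DefaultStorage field; the database path and the program structure are illustrative, not taken from this commit.

package main

import (
	"log"
	"os"
	"path/filepath"

	"github.com/anacrolix/torrent"
	sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
)

func main() {
	// Illustrative database location; any writable path works.
	dbPath := filepath.Join(os.TempDir(), "torrent.db")

	// Open the direct sqlite-backed storage. Piece data goes straight into the
	// blob table via incremental blob I/O, and MarkComplete/Completion use the
	// verified column added by this patch.
	storageCl, err := sqliteStorage.NewDirectStorage(sqliteStorage.NewDirectStorageOpts{
		NewPoolOpts: sqliteStorage.NewPoolOpts{Path: dbPath},
		ProvOpts:    nil,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer storageCl.Close()

	// Use it as the client's default piece storage.
	cfg := torrent.NewDefaultClientConfig()
	cfg.DefaultStorage = storageCl
	cl, err := torrent.NewClient(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
	// Torrents added to cl now read and write pieces through the sqlite database.
}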