]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add a sqlite data storage implementation
authorMatt Joiner <anacrolix@gmail.com>
Sun, 11 Oct 2020 01:58:27 +0000 (12:58 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Sun, 11 Oct 2020 01:58:27 +0000 (12:58 +1100)
storage/sqlite/sqlite-storage.go [new file with mode: 0644]
test/transfer_test.go

diff --git a/storage/sqlite/sqlite-storage.go b/storage/sqlite/sqlite-storage.go
new file mode 100644 (file)
index 0000000..09c2edc
--- /dev/null
@@ -0,0 +1,204 @@
+package sqliteStorage
+
+import (
+       "bytes"
+       "errors"
+       "io"
+       "log"
+       "os"
+       "sync"
+       "time"
+
+       "crawshaw.io/sqlite"
+       "crawshaw.io/sqlite/sqlitex"
+       "github.com/anacrolix/missinggo/v2/resource"
+)
+
+type conn = *sqlite.Conn
+
+func initConn(conn conn) error {
+       return sqlitex.ExecScript(conn, `
+create table if not exists blobs(name, data, primary key (name));
+`)
+}
+
+func NewProvider(conn *sqlite.Conn) (*provider, error) {
+       err := initConn(conn)
+       return &provider{conn: conn}, err
+}
+
+type provider struct {
+       mu   sync.Mutex
+       conn conn
+}
+
+func (p *provider) NewInstance(s string) (resource.Instance, error) {
+       return instance{s, p}, nil
+}
+
+type instance struct {
+       location string
+       p        *provider
+}
+
+func (i instance) withConn(with func(conn conn)) {
+       i.lockConn()
+       defer i.unlockConn()
+       with(i.p.conn)
+}
+
+func (i instance) lockConn() {
+       i.p.mu.Lock()
+}
+
+func (i instance) unlockConn() {
+       i.p.mu.Unlock()
+}
+
+func (i instance) Readdirnames() (names []string, err error) {
+       prefix := i.location + "/"
+       i.withConn(func(conn conn) {
+               err = sqlitex.Exec(conn, "select name from blobs where name like ?", func(stmt *sqlite.Stmt) error {
+                       names = append(names, stmt.ColumnText(0)[len(prefix):])
+                       return nil
+               }, prefix+"%")
+       })
+       log.Printf("readdir %q gave %q", i.location, names)
+       return
+}
+
+func (i instance) getBlobRowid(conn conn) (rowid int64, err error) {
+       rows := 0
+       err = sqlitex.Exec(conn, "select rowid from blobs where name=?", func(stmt *sqlite.Stmt) error {
+               rowid = stmt.ColumnInt64(0)
+               rows++
+               return nil
+       }, i.location)
+       if err != nil {
+               return
+       }
+       if rows == 1 {
+               return
+       }
+       if rows == 0 {
+               err = errors.New("blob not found")
+               return
+       }
+       panic(rows)
+}
+
+type connBlob struct {
+       *sqlite.Blob
+       onClose func()
+}
+
+func (me connBlob) Close() error {
+       err := me.Blob.Close()
+       me.onClose()
+       return err
+}
+
+func (i instance) Get() (ret io.ReadCloser, err error) {
+       i.lockConn()
+       blob, err := i.openBlob(i.p.conn, false)
+       if err != nil {
+               i.unlockConn()
+               return
+       }
+       var once sync.Once
+       return connBlob{blob, func() {
+               once.Do(i.unlockConn)
+       }}, nil
+}
+
+func (i instance) openBlob(conn conn, write bool) (*sqlite.Blob, error) {
+       rowid, err := i.getBlobRowid(conn)
+       if err != nil {
+               return nil, err
+       }
+       return conn.OpenBlob("main", "blobs", "data", rowid, write)
+}
+
+func (i instance) Put(reader io.Reader) (err error) {
+       var buf bytes.Buffer
+       _, err = io.Copy(&buf, reader)
+       if err != nil {
+               return err
+       }
+       i.withConn(func(conn conn) {
+               err = sqlitex.Exec(conn, "insert or replace into blobs(name, data) values(?, ?)", nil, i.location, buf.Bytes())
+       })
+       return
+}
+
+type fileInfo struct {
+       size int64
+}
+
+func (f fileInfo) Name() string {
+       panic("implement me")
+}
+
+func (f fileInfo) Size() int64 {
+       return f.size
+}
+
+func (f fileInfo) Mode() os.FileMode {
+       panic("implement me")
+}
+
+func (f fileInfo) ModTime() time.Time {
+       panic("implement me")
+}
+
+func (f fileInfo) IsDir() bool {
+       panic("implement me")
+}
+
+func (f fileInfo) Sys() interface{} {
+       panic("implement me")
+}
+
+func (i instance) Stat() (ret os.FileInfo, err error) {
+       i.withConn(func(conn conn) {
+               var blob *sqlite.Blob
+               blob, err = i.openBlob(conn, false)
+               if err != nil {
+                       return
+               }
+               defer blob.Close()
+               ret = fileInfo{blob.Size()}
+       })
+       return
+}
+
+func (i instance) ReadAt(p []byte, off int64) (n int, err error) {
+       i.withConn(func(conn conn) {
+               var blob *sqlite.Blob
+               blob, err = i.openBlob(conn, false)
+               if err != nil {
+                       return
+               }
+               defer blob.Close()
+               if off >= blob.Size() {
+                       err = io.EOF
+                       return
+               }
+               if off+int64(len(p)) > blob.Size() {
+                       p = p[:blob.Size()-off]
+               }
+               n, err = blob.ReadAt(p, off)
+       })
+       return
+}
+
+func (i instance) WriteAt(bytes []byte, i2 int64) (int, error) {
+       panic("implement me")
+}
+
+func (i instance) Delete() (err error) {
+       i.withConn(func(conn conn) {
+               err = sqlitex.Exec(conn, "delete from blobs where name=?", nil, i.location)
+       })
+       return
+}
index f549f913676fb38be29ac27b040334c5d5f0df6d..3cd788c3b6b2372c4c837c7a45cef8c2e1a4ca1a 100644 (file)
@@ -4,15 +4,19 @@ import (
        "fmt"
        "io"
        "io/ioutil"
+       "log"
        "os"
+       "path/filepath"
        "sync"
        "testing"
        "time"
 
+       "crawshaw.io/sqlite"
        "github.com/anacrolix/missinggo/v2/filecache"
        "github.com/anacrolix/torrent"
        "github.com/anacrolix/torrent/internal/testutil"
        "github.com/anacrolix/torrent/storage"
+       sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
        "golang.org/x/time/rate"
 
        "github.com/stretchr/testify/assert"
@@ -277,6 +281,25 @@ func TestClientTransferVarious(t *testing.T) {
                        Wrapper: fileCachePieceResourceStorage,
                })},
                {"Boltdb", storage.NewBoltDB},
+               {"Sqlite", func(dataDir string) storage.ClientImplCloser {
+                       path := filepath.Join(dataDir, "sqlite.db")
+                       log.Printf("creating sqlite db at %q", path)
+                       conn, err := sqlite.OpenConn(fmt.Sprintf("file:%s", path), 0)
+                       if err != nil {
+                               panic(err)
+                       }
+                       prov, err := sqliteStorage.NewProvider(conn)
+                       if err != nil {
+                               panic(err)
+                       }
+                       return struct {
+                               storage.ClientImpl
+                               io.Closer
+                       }{
+                               storage.NewResourcePieces(prov),
+                               conn,
+                       }
+               }},
        } {
                t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {
                        // Seeder storage