storage/sqlite/sqlite-storage_test.go
package sqliteStorage

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"path/filepath"
	"sync"
	"testing"

	_ "github.com/anacrolix/envpprof"
	"github.com/anacrolix/torrent/storage"
	test_storage "github.com/anacrolix/torrent/storage/test"
	qt "github.com/frankban/quicktest"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

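// newConnsAndProv opens a connection pool backed by a temporary on-disk
// database and wraps it in a provider, registering the provider for cleanup
// when the test finishes.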
func newConnsAndProv(t *testing.T, opts NewPoolOpts) (ConnPool, *provider) {
	opts.Path = filepath.Join(t.TempDir(), "sqlite3.db")
	conns, provOpts, err := NewPool(opts)
	require.NoError(t, err)
	// sqlitex.Pool.Close doesn't like being called more than once. Let it slide for now.
	//t.Cleanup(func() { conns.Close() })
	prov, err := NewProvider(conns, provOpts)
	require.NoError(t, err)
	t.Cleanup(func() { prov.Close() })
	return conns, prov
}

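// TestTextBlobSize stores a value that begins with a NUL byte and checks that
// Stat reports the full six bytes, i.e. the value is not treated as text and
// truncated at the NUL.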
func TestTextBlobSize(t *testing.T) {
	_, prov := newConnsAndProv(t, NewPoolOpts{})
	a, err := prov.NewInstance("a")
	qt.Assert(t, err, qt.IsNil)
	err = a.Put(bytes.NewBufferString("\x00hello"))
	qt.Assert(t, err, qt.IsNil)
	fi, err := a.Stat()
	qt.Assert(t, err, qt.IsNil)
	assert.EqualValues(t, 6, fi.Size())
}

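// TestSimultaneousIncrementalBlob writes a blob once and then reads it back
// through two readers concurrently, checking that each reader sees the full
// contents.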
func TestSimultaneousIncrementalBlob(t *testing.T) {
	_, p := newConnsAndProv(t, NewPoolOpts{
		NumConns: 3,
	})
	a, err := p.NewInstance("a")
	require.NoError(t, err)
	const contents = "hello, world"
	require.NoError(t, a.Put(bytes.NewReader([]byte(contents))))
	rc1, err := a.Get()
	require.NoError(t, err)
	rc2, err := a.Get()
	require.NoError(t, err)
	var b1, b2 []byte
	var e1, e2 error
	var wg sync.WaitGroup
	doRead := func(b *[]byte, e *error, rc io.ReadCloser, n int) {
		defer wg.Done()
		defer rc.Close()
		*b, *e = ioutil.ReadAll(rc)
		// assert rather than require: require.NoError calls FailNow, which must
		// not be called from a goroutine other than the one running the test.
		assert.NoError(t, *e, n)
		assert.EqualValues(t, contents, *b)
	}
	wg.Add(2)
	go doRead(&b2, &e2, rc2, 2)
	go doRead(&b1, &e1, rc1, 1)
	wg.Wait()
}

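// BenchmarkMarkComplete benchmarks marking pieces complete against both the
// direct and the piece-oriented sqlite storage implementations, sweeping
// on-disk vs in-memory databases and unbatched vs batched writes.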
func BenchmarkMarkComplete(b *testing.B) {
	const pieceSize = test_storage.DefaultPieceSize
	const capacity = test_storage.DefaultNumPieces * pieceSize / 2
	c := qt.New(b)
	for _, storage := range []struct {
		name  string
		maker func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser
	}{
		{"SqliteDirect", func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser {
			ci, err := NewDirectStorage(NewDirectStorageOpts{
				NewPoolOpts: newPoolOpts,
				ProvOpts:    provOpts,
			})
			c.Assert(err, qt.IsNil)
			return ci
		}},
		{"SqlitePieceStorage", func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser {
			ci, err := NewPiecesStorage(NewPiecesStorageOpts{
				NewPoolOpts: newPoolOpts,
				ProvOpts:    provOpts,
			})
			c.Assert(err, qt.IsNil)
			return ci
		}},
	} {
		b.Run(storage.name, func(b *testing.B) {
			for _, memory := range []bool{false, true} {
				b.Run(fmt.Sprintf("Memory=%v", memory), func(b *testing.B) {
					for _, batchWrites := range []bool{false, true} {
						b.Run(fmt.Sprintf("BatchWrites=%v", batchWrites), func(b *testing.B) {
							dbPath := filepath.Join(b.TempDir(), "storage.db")
							//b.Logf("storage db path: %q", dbPath)
							newPoolOpts := NewPoolOpts{
								Path:                  dbPath,
								Capacity:              capacity,
								NoConcurrentBlobReads: false,
								PageSize:              1 << 14,
								Memory:                memory,
							}
							provOpts := func(opts *ProviderOpts) {
								opts.BatchWrites = batchWrites
							}
							ci := storage.maker(newPoolOpts, provOpts)
							defer ci.Close()
							test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity)
						})
					}
				})
			}
		})
	}
}