Sergey Matveev's repositories - btrtrc.git/blob - test/transfer_test.go
Performance fiddling on sqlite storage
[btrtrc.git] / test / transfer_test.go
1 package test
2
3 import (
4         "fmt"
5         "io"
6         "io/ioutil"
7         "log"
8         "os"
9         "path/filepath"
10         "sync"
11         "testing"
12         "time"
13
14         "crawshaw.io/sqlite"
15         "crawshaw.io/sqlite/sqlitex"
16         "github.com/anacrolix/missinggo/v2/filecache"
17         "github.com/anacrolix/torrent"
18         "github.com/anacrolix/torrent/internal/testutil"
19         "github.com/anacrolix/torrent/storage"
20         sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
21         "golang.org/x/time/rate"
22
23         "github.com/stretchr/testify/assert"
24         "github.com/stretchr/testify/require"
25 )
26
// testClientTransferParams selects the variations exercised by
// testClientTransfer. The zero value runs a transfer with directory-backed
// storage on both sides and no rate limiting.
type testClientTransferParams struct {
	// Responsive, when true, makes the leecher's reader call SetResponsive.
	Responsive                 bool
	// Readahead is passed to the reader's SetReadahead, but only when
	// SetReadahead is true.
	Readahead                  int64
	// SetReadahead gates whether Readahead is applied to the reader at all.
	SetReadahead               bool
	// LeecherStorage, if non-nil, is called with the leecher's temporary data
	// directory and its result becomes the leecher's default storage. When nil
	// the directory is used as the client's DataDir instead.
	LeecherStorage             func(string) storage.ClientImplCloser
	// SeederStorage, if non-nil, is called with the greeting torrent's
	// directory and its result becomes the seeder's default storage. When nil
	// the directory is used as the client's DataDir instead.
	SeederStorage              func(string) storage.ClientImplCloser
	// SeederUploadRateLimiter, if non-nil, replaces the seeder's upload limiter.
	SeederUploadRateLimiter    *rate.Limiter
	// LeecherDownloadRateLimiter, if non-nil, replaces the leecher's download
	// limiter.
	LeecherDownloadRateLimiter *rate.Limiter
	// ConfigureSeeder holds optional hooks applied to the seeder's config and
	// client.
	ConfigureSeeder            ConfigureClient
	// ConfigureLeecher holds optional hooks applied to the leecher's config and
	// client.
	ConfigureLeecher           ConfigureClient

	// LeecherStartsWithoutMetadata, when true, strips the info bytes from the
	// spec given to the leecher; the test then waits for GotInfo before reading.
	LeecherStartsWithoutMetadata bool
}
40
41 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
42         pos, err := r.Seek(0, io.SeekStart)
43         assert.NoError(t, err)
44         assert.EqualValues(t, 0, pos)
45         _greeting, err := ioutil.ReadAll(r)
46         assert.NoError(t, err)
47         assert.EqualValues(t, testutil.GreetingFileContents, string(_greeting))
48 }
49
50 // Creates a seeder and a leecher, and ensures the data transfers when a read
51 // is attempted on the leecher.
52 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
53         greetingTempDir, mi := testutil.GreetingTestTorrent()
54         defer os.RemoveAll(greetingTempDir)
55         // Create seeder and a Torrent.
56         cfg := torrent.TestingConfig()
57         cfg.Seed = true
58         if ps.SeederUploadRateLimiter != nil {
59                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
60         }
61         // cfg.ListenAddr = "localhost:4000"
62         if ps.SeederStorage != nil {
63                 storage := ps.SeederStorage(greetingTempDir)
64                 defer storage.Close()
65                 cfg.DefaultStorage = storage
66         } else {
67                 cfg.DataDir = greetingTempDir
68         }
69         if ps.ConfigureSeeder.Config != nil {
70                 ps.ConfigureSeeder.Config(cfg)
71         }
72         seeder, err := torrent.NewClient(cfg)
73         require.NoError(t, err)
74         if ps.ConfigureSeeder.Client != nil {
75                 ps.ConfigureSeeder.Client(seeder)
76         }
77         defer testutil.ExportStatusWriter(seeder, "s", t)()
78         seederTorrent, _, _ := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
79         // Run a Stats right after Closing the Client. This will trigger the Stats
80         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
81         defer seederTorrent.Stats()
82         defer seeder.Close()
83         seederTorrent.VerifyData()
84         // Create leecher and a Torrent.
85         leecherDataDir, err := ioutil.TempDir("", "")
86         require.NoError(t, err)
87         defer os.RemoveAll(leecherDataDir)
88         cfg = torrent.TestingConfig()
89         if ps.LeecherStorage == nil {
90                 cfg.DataDir = leecherDataDir
91         } else {
92                 storage := ps.LeecherStorage(leecherDataDir)
93                 defer storage.Close()
94                 cfg.DefaultStorage = storage
95         }
96         if ps.LeecherDownloadRateLimiter != nil {
97                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
98         }
99         cfg.Seed = false
100         if ps.ConfigureLeecher.Config != nil {
101                 ps.ConfigureLeecher.Config(cfg)
102         }
103         leecher, err := torrent.NewClient(cfg)
104         require.NoError(t, err)
105         defer leecher.Close()
106         if ps.ConfigureLeecher.Client != nil {
107                 ps.ConfigureLeecher.Client(leecher)
108         }
109         defer testutil.ExportStatusWriter(leecher, "l", t)()
110         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
111                 ret = torrent.TorrentSpecFromMetaInfo(mi)
112                 ret.ChunkSize = 2
113                 if ps.LeecherStartsWithoutMetadata {
114                         ret.InfoBytes = nil
115                 }
116                 return
117         }())
118         require.NoError(t, err)
119         assert.True(t, new)
120
121         //// This was used when observing coalescing of piece state changes.
122         //logPieceStateChanges(leecherTorrent)
123
124         // Now do some things with leecher and seeder.
125         added := leecherTorrent.AddClientPeer(seeder)
126         assert.False(t, leecherTorrent.Seeding())
127         // The leecher will use peers immediately if it doesn't have the metadata. Otherwise, they
128         // should be sitting idle until we demand data.
129         if !ps.LeecherStartsWithoutMetadata {
130                 assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers)
131         }
132         if ps.LeecherStartsWithoutMetadata {
133                 <-leecherTorrent.GotInfo()
134         }
135         r := leecherTorrent.NewReader()
136         defer r.Close()
137         go leecherTorrent.SetInfoBytes(mi.InfoBytes)
138         if ps.Responsive {
139                 r.SetResponsive()
140         }
141         if ps.SetReadahead {
142                 r.SetReadahead(ps.Readahead)
143         }
144         assertReadAllGreeting(t, r)
145         assert.NotEmpty(t, seederTorrent.PeerConns())
146         leecherPeerConns := leecherTorrent.PeerConns()
147         assert.NotEmpty(t, leecherPeerConns)
148         foundSeeder := false
149         for _, pc := range leecherPeerConns {
150                 completed := pc.PeerPieces().Len()
151                 t.Logf("peer conn %v has %v completed pieces", pc, completed)
152                 if completed == leecherTorrent.Info().NumPieces() {
153                         foundSeeder = true
154                 }
155         }
156         if !foundSeeder {
157                 t.Errorf("didn't find seeder amongst leecher peer conns")
158         }
159
160         seederStats := seederTorrent.Stats()
161         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
162         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
163
164         leecherStats := leecherTorrent.Stats()
165         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
166         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
167
168         // Try reading through again for the cases where the torrent data size
169         // exceeds the size of the cache.
170         assertReadAllGreeting(t, r)
171 }
172
// fileCacheClientStorageFactoryParams configures the storage factories built
// by newFileCacheClientStorageFactory.
type fileCacheClientStorageFactoryParams struct {
	// Capacity is applied to the file cache via SetCapacity, but only when
	// SetCapacity is true.
	Capacity    int64
	// SetCapacity gates whether Capacity is applied to the cache at all.
	SetCapacity bool
	// Wrapper turns the prepared file cache into the client storage returned
	// by the factory. It is called unconditionally, so it must be non-nil.
	Wrapper     func(*filecache.Cache) storage.ClientImplCloser
}
178
179 func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
180         return func(dataDir string) storage.ClientImplCloser {
181                 fc, err := filecache.NewCache(dataDir)
182                 if err != nil {
183                         panic(err)
184                 }
185                 if ps.SetCapacity {
186                         fc.SetCapacity(ps.Capacity)
187                 }
188                 return ps.Wrapper(fc)
189         }
190 }
191
// storageFactory builds a client storage implementation from the string it is
// given; the tests in this file pass a data directory path.
type storageFactory func(string) storage.ClientImplCloser
193
194 func TestClientTransferDefault(t *testing.T) {
195         testClientTransfer(t, testClientTransferParams{
196                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
197                         Wrapper: fileCachePieceResourceStorage,
198                 }),
199         })
200 }
201
202 func TestClientTransferDefaultNoMetadata(t *testing.T) {
203         testClientTransfer(t, testClientTransferParams{
204                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
205                         Wrapper: fileCachePieceResourceStorage,
206                 }),
207                 LeecherStartsWithoutMetadata: true,
208         })
209 }
210
211 func TestClientTransferRateLimitedUpload(t *testing.T) {
212         started := time.Now()
213         testClientTransfer(t, testClientTransferParams{
214                 // We are uploading 13 bytes (the length of the greeting torrent). The
215                 // chunks are 2 bytes in length. Then the smallest burst we can run
216                 // with is 2. Time taken is (13-burst)/rate.
217                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
218         })
219         require.True(t, time.Since(started) > time.Second)
220 }
221
222 func TestClientTransferRateLimitedDownload(t *testing.T) {
223         testClientTransfer(t, testClientTransferParams{
224                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
225         })
226 }
227
228 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImplCloser {
229         return struct {
230                 storage.ClientImpl
231                 io.Closer
232         }{
233                 storage.NewResourcePieces(fc.AsResourceProvider()),
234                 ioutil.NopCloser(nil),
235         }
236 }
237
238 func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
239         testClientTransfer(t, testClientTransferParams{
240                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
241                         SetCapacity: true,
242                         // Going below the piece length means it can't complete a piece so
243                         // that it can be hashed.
244                         Capacity: 5,
245                         Wrapper:  fileCachePieceResourceStorage,
246                 }),
247                 SetReadahead: setReadahead,
248                 // Can't readahead too far or the cache will thrash and drop data we
249                 // thought we had.
250                 Readahead: readahead,
251
252                 // These tests don't work well with more than 1 connection to the seeder.
253                 ConfigureLeecher: ConfigureClient{
254                         Config: func(cfg *torrent.ClientConfig) {
255                                 cfg.DropDuplicatePeerIds = true
256                                 //cfg.DisableIPv6 = true
257                                 //cfg.DisableUTP = true
258                         },
259                 },
260         })
261 }
262
263 func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
264         testClientTransferSmallCache(t, true, 5)
265 }
266
267 func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
268         testClientTransferSmallCache(t, true, 15)
269 }
270
271 func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
272         testClientTransferSmallCache(t, false, -1)
273 }
274
275 func sqliteClientStorageFactory(connPathMaker func(dataDir string) string) storageFactory {
276         return func(dataDir string) storage.ClientImplCloser {
277                 path := connPathMaker(dataDir)
278                 log.Printf("opening sqlite db at %q", path)
279                 if true {
280                         conn, err := sqlite.OpenConn(path, 0)
281                         if err != nil {
282                                 panic(err)
283                         }
284                         prov, err := sqliteStorage.NewProvider(conn)
285                         if err != nil {
286                                 panic(err)
287                         }
288                         return struct {
289                                 storage.ClientImpl
290                                 io.Closer
291                         }{
292                                 storage.NewResourcePieces(prov),
293                                 conn,
294                         }
295                 } else {
296                         // Test pool implementation for SQLITE_BUSY when we want SQLITE_LOCKED (so the
297                         // crawshaw.io/sqlite unlock notify handler kicks in for us).
298                         const poolSize = 1
299                         pool, err := sqlitex.Open(path, 0, poolSize)
300                         if err != nil {
301                                 panic(err)
302                         }
303                         prov, err := sqliteStorage.NewProviderPool(pool, poolSize, false)
304                         if err != nil {
305                                 panic(err)
306                         }
307                         return struct {
308                                 storage.ClientImpl
309                                 io.Closer
310                         }{
311                                 storage.NewResourcePieces(prov),
312                                 pool,
313                         }
314                 }
315         }
316 }
317
318 func TestClientTransferVarious(t *testing.T) {
319         // Leecher storage
320         for _, ls := range []struct {
321                 name string
322                 f    storageFactory
323         }{
324                 {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
325                         Wrapper: fileCachePieceResourceStorage,
326                 })},
327                 {"Boltdb", storage.NewBoltDB},
328                 {"SqliteFile", sqliteClientStorageFactory(func(dataDir string) string {
329                         return "file:" + filepath.Join(dataDir, "sqlite.db")
330                 })},
331                 {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) string {
332                         return "file:memory:?mode=memory&cache=shared"
333                 })},
334         } {
335                 t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {
336                         // Seeder storage
337                         for _, ss := range []struct {
338                                 name string
339                                 f    storageFactory
340                         }{
341                                 {"File", storage.NewFile},
342                                 {"Mmap", storage.NewMMap},
343                         } {
344                                 t.Run(fmt.Sprintf("%sSeederStorage", ss.name), func(t *testing.T) {
345                                         for _, responsive := range []bool{false, true} {
346                                                 t.Run(fmt.Sprintf("Responsive=%v", responsive), func(t *testing.T) {
347                                                         t.Run("NoReadahead", func(t *testing.T) {
348                                                                 testClientTransfer(t, testClientTransferParams{
349                                                                         Responsive:     responsive,
350                                                                         SeederStorage:  ss.f,
351                                                                         LeecherStorage: ls.f,
352                                                                 })
353                                                         })
354                                                         for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
355                                                                 t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
356                                                                         testClientTransfer(t, testClientTransferParams{
357                                                                                 SeederStorage:  ss.f,
358                                                                                 Responsive:     responsive,
359                                                                                 SetReadahead:   true,
360                                                                                 Readahead:      readahead,
361                                                                                 LeecherStorage: ls.f,
362                                                                         })
363                                                                 })
364                                                         }
365                                                 })
366                                         }
367                                 })
368                         }
369                 })
370         }
371 }
372
373 // Check that after completing leeching, a leecher transitions to a seeding
374 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
375 func TestSeedAfterDownloading(t *testing.T) {
376         greetingTempDir, mi := testutil.GreetingTestTorrent()
377         defer os.RemoveAll(greetingTempDir)
378
379         cfg := torrent.TestingConfig()
380         cfg.Seed = true
381         cfg.DataDir = greetingTempDir
382         seeder, err := torrent.NewClient(cfg)
383         require.NoError(t, err)
384         defer seeder.Close()
385         defer testutil.ExportStatusWriter(seeder, "s", t)()
386         seederTorrent, ok, err := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
387         require.NoError(t, err)
388         assert.True(t, ok)
389         seederTorrent.VerifyData()
390
391         cfg = torrent.TestingConfig()
392         cfg.Seed = true
393         cfg.DataDir, err = ioutil.TempDir("", "")
394         require.NoError(t, err)
395         defer os.RemoveAll(cfg.DataDir)
396         leecher, err := torrent.NewClient(cfg)
397         require.NoError(t, err)
398         defer leecher.Close()
399         defer testutil.ExportStatusWriter(leecher, "l", t)()
400
401         cfg = torrent.TestingConfig()
402         cfg.Seed = false
403         cfg.DataDir, err = ioutil.TempDir("", "")
404         require.NoError(t, err)
405         defer os.RemoveAll(cfg.DataDir)
406         leecherLeecher, _ := torrent.NewClient(cfg)
407         require.NoError(t, err)
408         defer leecherLeecher.Close()
409         defer testutil.ExportStatusWriter(leecherLeecher, "ll", t)()
410         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
411                 ret = torrent.TorrentSpecFromMetaInfo(mi)
412                 ret.ChunkSize = 2
413                 return
414         }())
415         require.NoError(t, err)
416         assert.True(t, ok)
417         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
418                 ret = torrent.TorrentSpecFromMetaInfo(mi)
419                 ret.ChunkSize = 3
420                 return
421         }())
422         require.NoError(t, err)
423         assert.True(t, ok)
424         // Simultaneously DownloadAll in Leecher, and read the contents
425         // consecutively in LeecherLeecher. This non-deterministically triggered a
426         // case where the leecher wouldn't unchoke the LeecherLeecher.
427         var wg sync.WaitGroup
428         wg.Add(1)
429         go func() {
430                 defer wg.Done()
431                 r := llg.NewReader()
432                 defer r.Close()
433                 b, err := ioutil.ReadAll(r)
434                 require.NoError(t, err)
435                 assert.EqualValues(t, testutil.GreetingFileContents, b)
436         }()
437         done := make(chan struct{})
438         defer close(done)
439         go leecherGreeting.AddClientPeer(seeder)
440         go leecherGreeting.AddClientPeer(leecherLeecher)
441         wg.Add(1)
442         go func() {
443                 defer wg.Done()
444                 leecherGreeting.DownloadAll()
445                 leecher.WaitAll()
446         }()
447         wg.Wait()
448 }
449
// ConfigureClient bundles optional hooks for customising a client used by
// testClientTransfer. Either field may be nil, in which case it is skipped.
type ConfigureClient struct {
	// Config is called with the client's config before the client is created.
	Config func(*torrent.ClientConfig)
	// Client is called with the client right after it has been created.
	Client func(*torrent.Client)
}