]> Sergey Matveev's repositories - btrtrc.git/blob - test/transfer_test.go
Implement sqlite directly without using piece resources
[btrtrc.git] / test / transfer_test.go
1 package test
2
3 import (
4         "fmt"
5         "io"
6         "io/ioutil"
7         "log"
8         "os"
9         "path/filepath"
10         "runtime"
11         "sync"
12         "testing"
13         "testing/iotest"
14         "time"
15
16         "github.com/anacrolix/missinggo/v2/filecache"
17         "github.com/anacrolix/torrent"
18         "github.com/anacrolix/torrent/internal/testutil"
19         "github.com/anacrolix/torrent/storage"
20         sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
21         "github.com/frankban/quicktest"
22         "golang.org/x/time/rate"
23
24         "github.com/stretchr/testify/assert"
25         "github.com/stretchr/testify/require"
26 )
27
28 type testClientTransferParams struct {
29         Responsive                 bool
30         Readahead                  int64
31         SetReadahead               bool
32         LeecherStorage             func(string) storage.ClientImplCloser
33         SeederStorage              func(string) storage.ClientImplCloser
34         SeederUploadRateLimiter    *rate.Limiter
35         LeecherDownloadRateLimiter *rate.Limiter
36         ConfigureSeeder            ConfigureClient
37         ConfigureLeecher           ConfigureClient
38         GOMAXPROCS                 int
39
40         LeecherStartsWithoutMetadata bool
41 }
42
43 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
44         pos, err := r.Seek(0, io.SeekStart)
45         assert.NoError(t, err)
46         assert.EqualValues(t, 0, pos)
47         quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
48 }
49
50 // Creates a seeder and a leecher, and ensures the data transfers when a read
51 // is attempted on the leecher.
52 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
53
54         prevGOMAXPROCS := runtime.GOMAXPROCS(ps.GOMAXPROCS)
55         newGOMAXPROCS := prevGOMAXPROCS
56         if ps.GOMAXPROCS > 0 {
57                 newGOMAXPROCS = ps.GOMAXPROCS
58         }
59         defer func() {
60                 quicktest.Check(t, runtime.GOMAXPROCS(prevGOMAXPROCS), quicktest.ContentEquals, newGOMAXPROCS)
61         }()
62
63         greetingTempDir, mi := testutil.GreetingTestTorrent()
64         defer os.RemoveAll(greetingTempDir)
65         // Create seeder and a Torrent.
66         cfg := torrent.TestingConfig(t)
67         cfg.Seed = true
68         // Some test instances don't like this being on, even when there's no cache involved.
69         cfg.DropMutuallyCompletePeers = false
70         if ps.SeederUploadRateLimiter != nil {
71                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
72         }
73         // cfg.ListenAddr = "localhost:4000"
74         if ps.SeederStorage != nil {
75                 storage := ps.SeederStorage(greetingTempDir)
76                 defer storage.Close()
77                 cfg.DefaultStorage = storage
78         } else {
79                 cfg.DataDir = greetingTempDir
80         }
81         if ps.ConfigureSeeder.Config != nil {
82                 ps.ConfigureSeeder.Config(cfg)
83         }
84         seeder, err := torrent.NewClient(cfg)
85         require.NoError(t, err)
86         if ps.ConfigureSeeder.Client != nil {
87                 ps.ConfigureSeeder.Client(seeder)
88         }
89         defer testutil.ExportStatusWriter(seeder, "s", t)()
90         seederTorrent, _, _ := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
91         // Run a Stats right after Closing the Client. This will trigger the Stats
92         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
93         defer seederTorrent.Stats()
94         defer seeder.Close()
95         seederTorrent.VerifyData()
96         // Create leecher and a Torrent.
97         leecherDataDir, err := ioutil.TempDir("", "")
98         require.NoError(t, err)
99         defer os.RemoveAll(leecherDataDir)
100         cfg = torrent.TestingConfig(t)
101         // See the seeder client config comment.
102         cfg.DropMutuallyCompletePeers = false
103         if ps.LeecherStorage == nil {
104                 cfg.DataDir = leecherDataDir
105         } else {
106                 storage := ps.LeecherStorage(leecherDataDir)
107                 defer storage.Close()
108                 cfg.DefaultStorage = storage
109         }
110         if ps.LeecherDownloadRateLimiter != nil {
111                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
112         }
113         cfg.Seed = false
114         cfg.Debug = true
115         if ps.ConfigureLeecher.Config != nil {
116                 ps.ConfigureLeecher.Config(cfg)
117         }
118         leecher, err := torrent.NewClient(cfg)
119         require.NoError(t, err)
120         defer leecher.Close()
121         if ps.ConfigureLeecher.Client != nil {
122                 ps.ConfigureLeecher.Client(leecher)
123         }
124         defer testutil.ExportStatusWriter(leecher, "l", t)()
125         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
126                 ret = torrent.TorrentSpecFromMetaInfo(mi)
127                 ret.ChunkSize = 2
128                 if ps.LeecherStartsWithoutMetadata {
129                         ret.InfoBytes = nil
130                 }
131                 return
132         }())
133         require.NoError(t, err)
134         assert.True(t, new)
135
136         //// This was used when observing coalescing of piece state changes.
137         //logPieceStateChanges(leecherTorrent)
138
139         // Now do some things with leecher and seeder.
140         added := leecherTorrent.AddClientPeer(seeder)
141         assert.False(t, leecherTorrent.Seeding())
142         // The leecher will use peers immediately if it doesn't have the metadata. Otherwise, they
143         // should be sitting idle until we demand data.
144         if !ps.LeecherStartsWithoutMetadata {
145                 assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers)
146         }
147         if ps.LeecherStartsWithoutMetadata {
148                 <-leecherTorrent.GotInfo()
149         }
150         r := leecherTorrent.NewReader()
151         defer r.Close()
152         go leecherTorrent.SetInfoBytes(mi.InfoBytes)
153         if ps.Responsive {
154                 r.SetResponsive()
155         }
156         if ps.SetReadahead {
157                 r.SetReadahead(ps.Readahead)
158         }
159         assertReadAllGreeting(t, r)
160         assert.NotEmpty(t, seederTorrent.PeerConns())
161         leecherPeerConns := leecherTorrent.PeerConns()
162         if cfg.DropMutuallyCompletePeers {
163                 // I don't think we can assume it will be empty already, due to timing.
164                 //assert.Empty(t, leecherPeerConns)
165         } else {
166                 assert.NotEmpty(t, leecherPeerConns)
167         }
168         foundSeeder := false
169         for _, pc := range leecherPeerConns {
170                 completed := pc.PeerPieces().Len()
171                 t.Logf("peer conn %v has %v completed pieces", pc, completed)
172                 if completed == leecherTorrent.Info().NumPieces() {
173                         foundSeeder = true
174                 }
175         }
176         if !foundSeeder {
177                 t.Errorf("didn't find seeder amongst leecher peer conns")
178         }
179
180         seederStats := seederTorrent.Stats()
181         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
182         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
183
184         leecherStats := leecherTorrent.Stats()
185         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
186         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
187
188         // Try reading through again for the cases where the torrent data size
189         // exceeds the size of the cache.
190         assertReadAllGreeting(t, r)
191 }
192
193 type fileCacheClientStorageFactoryParams struct {
194         Capacity    int64
195         SetCapacity bool
196         Wrapper     func(*filecache.Cache) storage.ClientImplCloser
197 }
198
199 func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
200         return func(dataDir string) storage.ClientImplCloser {
201                 fc, err := filecache.NewCache(dataDir)
202                 if err != nil {
203                         panic(err)
204                 }
205                 if ps.SetCapacity {
206                         fc.SetCapacity(ps.Capacity)
207                 }
208                 return ps.Wrapper(fc)
209         }
210 }
211
212 type storageFactory func(string) storage.ClientImplCloser
213
214 func TestClientTransferDefault(t *testing.T) {
215         testClientTransfer(t, testClientTransferParams{
216                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
217                         Wrapper: fileCachePieceResourceStorage,
218                 }),
219         })
220 }
221
222 func TestClientTransferDefaultNoMetadata(t *testing.T) {
223         testClientTransfer(t, testClientTransferParams{
224                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
225                         Wrapper: fileCachePieceResourceStorage,
226                 }),
227                 LeecherStartsWithoutMetadata: true,
228         })
229 }
230
231 func TestClientTransferRateLimitedUpload(t *testing.T) {
232         started := time.Now()
233         testClientTransfer(t, testClientTransferParams{
234                 // We are uploading 13 bytes (the length of the greeting torrent). The
235                 // chunks are 2 bytes in length. Then the smallest burst we can run
236                 // with is 2. Time taken is (13-burst)/rate.
237                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
238         })
239         require.True(t, time.Since(started) > time.Second)
240 }
241
242 func TestClientTransferRateLimitedDownload(t *testing.T) {
243         testClientTransfer(t, testClientTransferParams{
244                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
245         })
246 }
247
248 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImplCloser {
249         return struct {
250                 storage.ClientImpl
251                 io.Closer
252         }{
253                 storage.NewResourcePieces(fc.AsResourceProvider()),
254                 ioutil.NopCloser(nil),
255         }
256 }
257
258 func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
259         testClientTransfer(t, testClientTransferParams{
260                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
261                         SetCapacity: true,
262                         // Going below the piece length means it can't complete a piece so
263                         // that it can be hashed.
264                         Capacity: 5,
265                         Wrapper:  fileCachePieceResourceStorage,
266                 }),
267                 SetReadahead: setReadahead,
268                 // Can't readahead too far or the cache will thrash and drop data we
269                 // thought we had.
270                 Readahead: readahead,
271
272                 // These tests don't work well with more than 1 connection to the seeder.
273                 ConfigureLeecher: ConfigureClient{
274                         Config: func(cfg *torrent.ClientConfig) {
275                                 cfg.DropDuplicatePeerIds = true
276                                 //cfg.DisableIPv6 = true
277                                 //cfg.DisableUTP = true
278                         },
279                 },
280         })
281 }
282
283 func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
284         testClientTransferSmallCache(t, true, 5)
285 }
286
287 func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
288         testClientTransferSmallCache(t, true, 15)
289 }
290
291 func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
292         testClientTransferSmallCache(t, false, -1)
293 }
294
295 func sqliteClientStorageFactory(optsMaker func(dataDir string) sqliteStorage.NewPiecesStorageOpts) storageFactory {
296         return func(dataDir string) storage.ClientImplCloser {
297                 opts := optsMaker(dataDir)
298                 //log.Printf("opening sqlite db: %#v", opts)
299                 ret, err := sqliteStorage.NewPiecesStorage(opts)
300                 if err != nil {
301                         panic(err)
302                 }
303                 return ret
304         }
305 }
306
307 type leecherStorageTestCase struct {
308         name       string
309         f          storageFactory
310         gomaxprocs int
311 }
312
313 func sqliteLeecherStorageTestCase(numConns int) leecherStorageTestCase {
314         return leecherStorageTestCase{
315                 fmt.Sprintf("SqliteFile,NumConns=%v", numConns),
316                 sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPiecesStorageOpts {
317                         return sqliteStorage.NewPiecesStorageOpts{
318                                 NewPoolOpts: sqliteStorage.NewPoolOpts{
319                                         Path:     filepath.Join(dataDir, "sqlite.db"),
320                                         NumConns: numConns,
321                                 },
322                         }
323                 }),
324                 numConns,
325         }
326 }
327
328 func TestClientTransferVarious(t *testing.T) {
329         // Leecher storage
330         for _, ls := range []leecherStorageTestCase{
331                 {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
332                         Wrapper: fileCachePieceResourceStorage,
333                 }), 0},
334                 {"Boltdb", storage.NewBoltDB, 0},
335                 {"SqliteDirect", func(s string) storage.ClientImplCloser {
336                         path := filepath.Join(s, "sqlite3.db")
337                         log.Print(path)
338                         cl, err := sqliteStorage.NewDirectStorage(sqliteStorage.NewDirectStorageOpts{
339                                 NewPoolOpts: sqliteStorage.NewPoolOpts{
340                                         Path: path,
341                                 },
342                                 ProvOpts: nil,
343                         })
344                         if err != nil {
345                                 panic(err)
346                         }
347                         return cl
348                 }, 0},
349                 sqliteLeecherStorageTestCase(1),
350                 sqliteLeecherStorageTestCase(2),
351                 // This should use a number of connections equal to the number of CPUs
352                 sqliteLeecherStorageTestCase(0),
353                 {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPiecesStorageOpts {
354                         return sqliteStorage.NewPiecesStorageOpts{
355                                 NewPoolOpts: sqliteStorage.NewPoolOpts{
356                                         Memory: true,
357                                 },
358                         }
359                 }), 0},
360         } {
361                 t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {
362                         // Seeder storage
363                         for _, ss := range []struct {
364                                 name string
365                                 f    storageFactory
366                         }{
367                                 {"File", storage.NewFile},
368                                 {"Mmap", storage.NewMMap},
369                         } {
370                                 t.Run(fmt.Sprintf("%sSeederStorage", ss.name), func(t *testing.T) {
371                                         for _, responsive := range []bool{false, true} {
372                                                 t.Run(fmt.Sprintf("Responsive=%v", responsive), func(t *testing.T) {
373                                                         t.Run("NoReadahead", func(t *testing.T) {
374                                                                 testClientTransfer(t, testClientTransferParams{
375                                                                         Responsive:     responsive,
376                                                                         SeederStorage:  ss.f,
377                                                                         LeecherStorage: ls.f,
378                                                                         GOMAXPROCS:     ls.gomaxprocs,
379                                                                 })
380                                                         })
381                                                         for _, readahead := range []int64{-1, 0, 1, 2, 9, 20} {
382                                                                 t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
383                                                                         testClientTransfer(t, testClientTransferParams{
384                                                                                 SeederStorage:  ss.f,
385                                                                                 Responsive:     responsive,
386                                                                                 SetReadahead:   true,
387                                                                                 Readahead:      readahead,
388                                                                                 LeecherStorage: ls.f,
389                                                                                 GOMAXPROCS:     ls.gomaxprocs,
390                                                                         })
391                                                                 })
392                                                         }
393                                                 })
394                                         }
395                                 })
396                         }
397                 })
398         }
399 }
400
401 // Check that after completing leeching, a leecher transitions to a seeding
402 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
403 func TestSeedAfterDownloading(t *testing.T) {
404         greetingTempDir, mi := testutil.GreetingTestTorrent()
405         defer os.RemoveAll(greetingTempDir)
406
407         cfg := torrent.TestingConfig(t)
408         cfg.Seed = true
409         cfg.DataDir = greetingTempDir
410         seeder, err := torrent.NewClient(cfg)
411         require.NoError(t, err)
412         defer seeder.Close()
413         defer testutil.ExportStatusWriter(seeder, "s", t)()
414         seederTorrent, ok, err := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
415         require.NoError(t, err)
416         assert.True(t, ok)
417         seederTorrent.VerifyData()
418
419         cfg = torrent.TestingConfig(t)
420         cfg.Seed = true
421         cfg.DataDir, err = ioutil.TempDir("", "")
422         require.NoError(t, err)
423         defer os.RemoveAll(cfg.DataDir)
424         leecher, err := torrent.NewClient(cfg)
425         require.NoError(t, err)
426         defer leecher.Close()
427         defer testutil.ExportStatusWriter(leecher, "l", t)()
428
429         cfg = torrent.TestingConfig(t)
430         cfg.Seed = false
431         cfg.DataDir, err = ioutil.TempDir("", "")
432         require.NoError(t, err)
433         defer os.RemoveAll(cfg.DataDir)
434         leecherLeecher, _ := torrent.NewClient(cfg)
435         require.NoError(t, err)
436         defer leecherLeecher.Close()
437         defer testutil.ExportStatusWriter(leecherLeecher, "ll", t)()
438         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
439                 ret = torrent.TorrentSpecFromMetaInfo(mi)
440                 ret.ChunkSize = 2
441                 return
442         }())
443         require.NoError(t, err)
444         assert.True(t, ok)
445         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
446                 ret = torrent.TorrentSpecFromMetaInfo(mi)
447                 ret.ChunkSize = 3
448                 return
449         }())
450         require.NoError(t, err)
451         assert.True(t, ok)
452         // Simultaneously DownloadAll in Leecher, and read the contents
453         // consecutively in LeecherLeecher. This non-deterministically triggered a
454         // case where the leecher wouldn't unchoke the LeecherLeecher.
455         var wg sync.WaitGroup
456         wg.Add(1)
457         go func() {
458                 defer wg.Done()
459                 r := llg.NewReader()
460                 defer r.Close()
461                 quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
462         }()
463         done := make(chan struct{})
464         defer close(done)
465         go leecherGreeting.AddClientPeer(seeder)
466         go leecherGreeting.AddClientPeer(leecherLeecher)
467         wg.Add(1)
468         go func() {
469                 defer wg.Done()
470                 leecherGreeting.DownloadAll()
471                 leecher.WaitAll()
472         }()
473         wg.Wait()
474 }
475
476 type ConfigureClient struct {
477         Config func(*torrent.ClientConfig)
478         Client func(*torrent.Client)
479 }