Sergey Matveev's repositories - btrtrc.git/blob - test/transfer_test.go
Merge branch 'sqlite-direct'
[btrtrc.git] / test / transfer_test.go
1 package test
2
3 import (
4         "fmt"
5         "io"
6         "io/ioutil"
7         "os"
8         "path/filepath"
9         "runtime"
10         "sync"
11         "testing"
12         "testing/iotest"
13         "time"
14
15         "github.com/anacrolix/missinggo/v2/filecache"
16         "github.com/anacrolix/torrent"
17         "github.com/anacrolix/torrent/internal/testutil"
18         "github.com/anacrolix/torrent/storage"
19         sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
20         "github.com/frankban/quicktest"
21         "golang.org/x/time/rate"
22
23         "github.com/stretchr/testify/assert"
24         "github.com/stretchr/testify/require"
25 )
26
27 type testClientTransferParams struct {
28         Responsive                 bool
29         Readahead                  int64
30         SetReadahead               bool
31         LeecherStorage             func(string) storage.ClientImplCloser
32         SeederStorage              func(string) storage.ClientImplCloser
33         SeederUploadRateLimiter    *rate.Limiter
34         LeecherDownloadRateLimiter *rate.Limiter
35         ConfigureSeeder            ConfigureClient
36         ConfigureLeecher           ConfigureClient
37         GOMAXPROCS                 int
38
39         LeecherStartsWithoutMetadata bool
40 }
41
42 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
43         pos, err := r.Seek(0, io.SeekStart)
44         assert.NoError(t, err)
45         assert.EqualValues(t, 0, pos)
46         quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
47 }
48
49 // Creates a seeder and a leecher, and ensures the data transfers when a read
50 // is attempted on the leecher.
51 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
52
53         prevGOMAXPROCS := runtime.GOMAXPROCS(ps.GOMAXPROCS)
54         newGOMAXPROCS := prevGOMAXPROCS
55         if ps.GOMAXPROCS > 0 {
56                 newGOMAXPROCS = ps.GOMAXPROCS
57         }
58         defer func() {
59                 quicktest.Check(t, runtime.GOMAXPROCS(prevGOMAXPROCS), quicktest.ContentEquals, newGOMAXPROCS)
60         }()
61
62         greetingTempDir, mi := testutil.GreetingTestTorrent()
63         defer os.RemoveAll(greetingTempDir)
64         // Create seeder and a Torrent.
65         cfg := torrent.TestingConfig(t)
66         cfg.Seed = true
67         // Some test instances don't like this being on, even when there's no cache involved.
68         cfg.DropMutuallyCompletePeers = false
69         if ps.SeederUploadRateLimiter != nil {
70                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
71         }
72         // cfg.ListenAddr = "localhost:4000"
73         if ps.SeederStorage != nil {
74                 storage := ps.SeederStorage(greetingTempDir)
75                 defer storage.Close()
76                 cfg.DefaultStorage = storage
77         } else {
78                 cfg.DataDir = greetingTempDir
79         }
80         if ps.ConfigureSeeder.Config != nil {
81                 ps.ConfigureSeeder.Config(cfg)
82         }
83         seeder, err := torrent.NewClient(cfg)
84         require.NoError(t, err)
85         if ps.ConfigureSeeder.Client != nil {
86                 ps.ConfigureSeeder.Client(seeder)
87         }
88         defer testutil.ExportStatusWriter(seeder, "s", t)()
89         seederTorrent, _, _ := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
90         // Run a Stats right after Closing the Client. This will trigger the Stats
91         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
92         defer seederTorrent.Stats()
93         defer seeder.Close()
94         seederTorrent.VerifyData()
95         // Create leecher and a Torrent.
96         leecherDataDir, err := ioutil.TempDir("", "")
97         require.NoError(t, err)
98         defer os.RemoveAll(leecherDataDir)
99         cfg = torrent.TestingConfig(t)
100         // See the seeder client config comment.
101         cfg.DropMutuallyCompletePeers = false
102         if ps.LeecherStorage == nil {
103                 cfg.DataDir = leecherDataDir
104         } else {
105                 storage := ps.LeecherStorage(leecherDataDir)
106                 defer storage.Close()
107                 cfg.DefaultStorage = storage
108         }
109         if ps.LeecherDownloadRateLimiter != nil {
110                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
111         }
112         cfg.Seed = false
113         //cfg.Debug = true
114         if ps.ConfigureLeecher.Config != nil {
115                 ps.ConfigureLeecher.Config(cfg)
116         }
117         leecher, err := torrent.NewClient(cfg)
118         require.NoError(t, err)
119         defer leecher.Close()
120         if ps.ConfigureLeecher.Client != nil {
121                 ps.ConfigureLeecher.Client(leecher)
122         }
123         defer testutil.ExportStatusWriter(leecher, "l", t)()
124         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
125                 ret = torrent.TorrentSpecFromMetaInfo(mi)
126                 ret.ChunkSize = 2
127                 if ps.LeecherStartsWithoutMetadata {
128                         ret.InfoBytes = nil
129                 }
130                 return
131         }())
132         require.NoError(t, err)
133         assert.True(t, new)
134
135         //// This was used when observing coalescing of piece state changes.
136         //logPieceStateChanges(leecherTorrent)
137
138         // Now do some things with leecher and seeder.
139         added := leecherTorrent.AddClientPeer(seeder)
140         assert.False(t, leecherTorrent.Seeding())
141         // The leecher will use peers immediately if it doesn't have the metadata. Otherwise, they
142         // should be sitting idle until we demand data.
143         if !ps.LeecherStartsWithoutMetadata {
144                 assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers)
145         }
146         if ps.LeecherStartsWithoutMetadata {
147                 <-leecherTorrent.GotInfo()
148         }
149         r := leecherTorrent.NewReader()
150         defer r.Close()
151         go leecherTorrent.SetInfoBytes(mi.InfoBytes)
152         if ps.Responsive {
153                 r.SetResponsive()
154         }
155         if ps.SetReadahead {
156                 r.SetReadahead(ps.Readahead)
157         }
158         assertReadAllGreeting(t, r)
159         assert.NotEmpty(t, seederTorrent.PeerConns())
160         leecherPeerConns := leecherTorrent.PeerConns()
161         if cfg.DropMutuallyCompletePeers {
162                 // I don't think we can assume it will be empty already, due to timing.
163                 //assert.Empty(t, leecherPeerConns)
164         } else {
165                 assert.NotEmpty(t, leecherPeerConns)
166         }
167         foundSeeder := false
168         for _, pc := range leecherPeerConns {
169                 completed := pc.PeerPieces().Len()
170                 t.Logf("peer conn %v has %v completed pieces", pc, completed)
171                 if completed == leecherTorrent.Info().NumPieces() {
172                         foundSeeder = true
173                 }
174         }
175         if !foundSeeder {
176                 t.Errorf("didn't find seeder amongst leecher peer conns")
177         }
178
179         seederStats := seederTorrent.Stats()
180         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
181         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
182
183         leecherStats := leecherTorrent.Stats()
184         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
185         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
186
187         // Try reading through again for the cases where the torrent data size
188         // exceeds the size of the cache.
189         assertReadAllGreeting(t, r)
190 }
191
192 type fileCacheClientStorageFactoryParams struct {
193         Capacity    int64
194         SetCapacity bool
195         Wrapper     func(*filecache.Cache) storage.ClientImplCloser
196 }
197
198 func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
199         return func(dataDir string) storage.ClientImplCloser {
200                 fc, err := filecache.NewCache(dataDir)
201                 if err != nil {
202                         panic(err)
203                 }
204                 if ps.SetCapacity {
205                         fc.SetCapacity(ps.Capacity)
206                 }
207                 return ps.Wrapper(fc)
208         }
209 }
210
211 type storageFactory func(string) storage.ClientImplCloser
212
213 func TestClientTransferDefault(t *testing.T) {
214         testClientTransfer(t, testClientTransferParams{
215                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
216                         Wrapper: fileCachePieceResourceStorage,
217                 }),
218         })
219 }
220
221 func TestClientTransferDefaultNoMetadata(t *testing.T) {
222         testClientTransfer(t, testClientTransferParams{
223                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
224                         Wrapper: fileCachePieceResourceStorage,
225                 }),
226                 LeecherStartsWithoutMetadata: true,
227         })
228 }
229
230 func TestClientTransferRateLimitedUpload(t *testing.T) {
231         started := time.Now()
232         testClientTransfer(t, testClientTransferParams{
233                 // We are uploading 13 bytes (the length of the greeting torrent). The
234                 // chunks are 2 bytes in length. Then the smallest burst we can run
235                 // with is 2. Time taken is (13-burst)/rate.
236                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
237         })
238         require.True(t, time.Since(started) > time.Second)
239 }
240
241 func TestClientTransferRateLimitedDownload(t *testing.T) {
242         testClientTransfer(t, testClientTransferParams{
243                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
244         })
245 }
246
247 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImplCloser {
248         return struct {
249                 storage.ClientImpl
250                 io.Closer
251         }{
252                 storage.NewResourcePieces(fc.AsResourceProvider()),
253                 ioutil.NopCloser(nil),
254         }
255 }
256
257 func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
258         testClientTransfer(t, testClientTransferParams{
259                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
260                         SetCapacity: true,
261                         // Going below the piece length means it can't complete a piece so
262                         // that it can be hashed.
263                         Capacity: 5,
264                         Wrapper:  fileCachePieceResourceStorage,
265                 }),
266                 SetReadahead: setReadahead,
267                 // Can't readahead too far or the cache will thrash and drop data we
268                 // thought we had.
269                 Readahead: readahead,
270
271                 // These tests don't work well with more than 1 connection to the seeder.
272                 ConfigureLeecher: ConfigureClient{
273                         Config: func(cfg *torrent.ClientConfig) {
274                                 cfg.DropDuplicatePeerIds = true
275                                 //cfg.DisableIPv6 = true
276                                 //cfg.DisableUTP = true
277                         },
278                 },
279         })
280 }
281
282 func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
283         testClientTransferSmallCache(t, true, 5)
284 }
285
286 func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
287         testClientTransferSmallCache(t, true, 15)
288 }
289
290 func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
291         testClientTransferSmallCache(t, false, -1)
292 }
293
294 func sqliteClientStorageFactory(optsMaker func(dataDir string) sqliteStorage.NewPiecesStorageOpts) storageFactory {
295         return func(dataDir string) storage.ClientImplCloser {
296                 opts := optsMaker(dataDir)
297                 //log.Printf("opening sqlite db: %#v", opts)
298                 ret, err := sqliteStorage.NewPiecesStorage(opts)
299                 if err != nil {
300                         panic(err)
301                 }
302                 return ret
303         }
304 }
305
306 type leecherStorageTestCase struct {
307         name       string
308         f          storageFactory
309         gomaxprocs int
310 }
311
312 func sqliteLeecherStorageTestCase(numConns int) leecherStorageTestCase {
313         return leecherStorageTestCase{
314                 fmt.Sprintf("SqliteFile,NumConns=%v", numConns),
315                 sqliteClientStorageFactory(func(dataDir string) (opts sqliteStorage.NewPiecesStorageOpts) {
316                         opts.Path = filepath.Join(dataDir, "sqlite.db")
317                         opts.NumConns = numConns
318                         return
319                 }),
320                 numConns,
321         }
322 }
323
324 func TestClientTransferVarious(t *testing.T) {
325         // Leecher storage
326         for _, ls := range []leecherStorageTestCase{
327                 {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
328                         Wrapper: fileCachePieceResourceStorage,
329                 }), 0},
330                 {"Boltdb", storage.NewBoltDB, 0},
331                 {"SqliteDirect", func(s string) storage.ClientImplCloser {
332                         path := filepath.Join(s, "sqlite3.db")
333                         var opts sqliteStorage.NewDirectStorageOpts
334                         opts.Path = path
335                         cl, err := sqliteStorage.NewDirectStorage(opts)
336                         if err != nil {
337                                 panic(err)
338                         }
339                         return cl
340                 }, 0},
341                 sqliteLeecherStorageTestCase(1),
342                 sqliteLeecherStorageTestCase(2),
343                 // This should use a number of connections equal to the number of CPUs
344                 sqliteLeecherStorageTestCase(0),
345                 {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) (opts sqliteStorage.NewPiecesStorageOpts) {
346                         opts.Memory = true
347                         return
348                 }), 0},
349         } {
350                 t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {
351                         // Seeder storage
352                         for _, ss := range []struct {
353                                 name string
354                                 f    storageFactory
355                         }{
356                                 {"File", storage.NewFile},
357                                 {"Mmap", storage.NewMMap},
358                         } {
359                                 t.Run(fmt.Sprintf("%sSeederStorage", ss.name), func(t *testing.T) {
360                                         for _, responsive := range []bool{false, true} {
361                                                 t.Run(fmt.Sprintf("Responsive=%v", responsive), func(t *testing.T) {
362                                                         t.Run("NoReadahead", func(t *testing.T) {
363                                                                 testClientTransfer(t, testClientTransferParams{
364                                                                         Responsive:     responsive,
365                                                                         SeederStorage:  ss.f,
366                                                                         LeecherStorage: ls.f,
367                                                                         GOMAXPROCS:     ls.gomaxprocs,
368                                                                 })
369                                                         })
370                                                         for _, readahead := range []int64{-1, 0, 1, 2, 9, 20} {
371                                                                 t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
372                                                                         testClientTransfer(t, testClientTransferParams{
373                                                                                 SeederStorage:  ss.f,
374                                                                                 Responsive:     responsive,
375                                                                                 SetReadahead:   true,
376                                                                                 Readahead:      readahead,
377                                                                                 LeecherStorage: ls.f,
378                                                                                 GOMAXPROCS:     ls.gomaxprocs,
379                                                                         })
380                                                                 })
381                                                         }
382                                                 })
383                                         }
384                                 })
385                         }
386                 })
387         }
388 }
389
390 // Check that after completing leeching, a leecher transitions to a seeding
391 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
392 func TestSeedAfterDownloading(t *testing.T) {
393         greetingTempDir, mi := testutil.GreetingTestTorrent()
394         defer os.RemoveAll(greetingTempDir)
395
396         cfg := torrent.TestingConfig(t)
397         cfg.Seed = true
398         cfg.DataDir = greetingTempDir
399         seeder, err := torrent.NewClient(cfg)
400         require.NoError(t, err)
401         defer seeder.Close()
402         defer testutil.ExportStatusWriter(seeder, "s", t)()
403         seederTorrent, ok, err := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
404         require.NoError(t, err)
405         assert.True(t, ok)
406         seederTorrent.VerifyData()
407
408         cfg = torrent.TestingConfig(t)
409         cfg.Seed = true
410         cfg.DataDir, err = ioutil.TempDir("", "")
411         require.NoError(t, err)
412         defer os.RemoveAll(cfg.DataDir)
413         leecher, err := torrent.NewClient(cfg)
414         require.NoError(t, err)
415         defer leecher.Close()
416         defer testutil.ExportStatusWriter(leecher, "l", t)()
417
418         cfg = torrent.TestingConfig(t)
419         cfg.Seed = false
420         cfg.DataDir, err = ioutil.TempDir("", "")
421         require.NoError(t, err)
422         defer os.RemoveAll(cfg.DataDir)
423         leecherLeecher, _ := torrent.NewClient(cfg)
424         require.NoError(t, err)
425         defer leecherLeecher.Close()
426         defer testutil.ExportStatusWriter(leecherLeecher, "ll", t)()
427         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
428                 ret = torrent.TorrentSpecFromMetaInfo(mi)
429                 ret.ChunkSize = 2
430                 return
431         }())
432         require.NoError(t, err)
433         assert.True(t, ok)
434         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
435                 ret = torrent.TorrentSpecFromMetaInfo(mi)
436                 ret.ChunkSize = 3
437                 return
438         }())
439         require.NoError(t, err)
440         assert.True(t, ok)
441         // Simultaneously DownloadAll in Leecher, and read the contents
442         // consecutively in LeecherLeecher. This non-deterministically triggered a
443         // case where the leecher wouldn't unchoke the LeecherLeecher.
444         var wg sync.WaitGroup
445         wg.Add(1)
446         go func() {
447                 defer wg.Done()
448                 r := llg.NewReader()
449                 defer r.Close()
450                 quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
451         }()
452         done := make(chan struct{})
453         defer close(done)
454         go leecherGreeting.AddClientPeer(seeder)
455         go leecherGreeting.AddClientPeer(leecherLeecher)
456         wg.Add(1)
457         go func() {
458                 defer wg.Done()
459                 leecherGreeting.DownloadAll()
460                 leecher.WaitAll()
461         }()
462         wg.Wait()
463 }
464
465 type ConfigureClient struct {
466         Config func(*torrent.ClientConfig)
467         Client func(*torrent.Client)
468 }