]> Sergey Matveev's repositories - btrtrc.git/blob - test/transfer_test.go
Mark client transfer test as parallel
[btrtrc.git] / test / transfer_test.go
1 package test
2
3 import (
4         "fmt"
5         "io"
6         "io/ioutil"
7         "os"
8         "path/filepath"
9         "runtime"
10         "sync"
11         "testing"
12         "testing/iotest"
13         "time"
14
15         "github.com/anacrolix/missinggo/v2/bitmap"
16         "github.com/anacrolix/missinggo/v2/filecache"
17         "github.com/anacrolix/torrent"
18         "github.com/anacrolix/torrent/internal/testutil"
19         "github.com/anacrolix/torrent/storage"
20         sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
21         "github.com/frankban/quicktest"
22         "golang.org/x/time/rate"
23
24         "github.com/stretchr/testify/assert"
25         "github.com/stretchr/testify/require"
26 )
27
28 type testClientTransferParams struct {
29         Responsive                 bool
30         Readahead                  int64
31         SetReadahead               bool
32         LeecherStorage             func(string) storage.ClientImplCloser
33         SeederStorage              func(string) storage.ClientImplCloser
34         SeederUploadRateLimiter    *rate.Limiter
35         LeecherDownloadRateLimiter *rate.Limiter
36         ConfigureSeeder            ConfigureClient
37         ConfigureLeecher           ConfigureClient
38         GOMAXPROCS                 int
39
40         LeecherStartsWithoutMetadata bool
41 }
42
43 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
44         pos, err := r.Seek(0, io.SeekStart)
45         assert.NoError(t, err)
46         assert.EqualValues(t, 0, pos)
47         quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
48 }
49
50 // Creates a seeder and a leecher, and ensures the data transfers when a read
51 // is attempted on the leecher.
52 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
53         t.Parallel()
54
55         prevGOMAXPROCS := runtime.GOMAXPROCS(ps.GOMAXPROCS)
56         newGOMAXPROCS := prevGOMAXPROCS
57         if ps.GOMAXPROCS > 0 {
58                 newGOMAXPROCS = ps.GOMAXPROCS
59         }
60         defer func() {
61                 quicktest.Check(t, runtime.GOMAXPROCS(prevGOMAXPROCS), quicktest.ContentEquals, newGOMAXPROCS)
62         }()
63
64         greetingTempDir, mi := testutil.GreetingTestTorrent()
65         defer os.RemoveAll(greetingTempDir)
66         // Create seeder and a Torrent.
67         cfg := torrent.TestingConfig(t)
68         cfg.Seed = true
69         // Some test instances don't like this being on, even when there's no cache involved.
70         cfg.DropMutuallyCompletePeers = false
71         if ps.SeederUploadRateLimiter != nil {
72                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
73         }
74         // cfg.ListenAddr = "localhost:4000"
75         if ps.SeederStorage != nil {
76                 storage := ps.SeederStorage(greetingTempDir)
77                 defer storage.Close()
78                 cfg.DefaultStorage = storage
79         } else {
80                 cfg.DataDir = greetingTempDir
81         }
82         if ps.ConfigureSeeder.Config != nil {
83                 ps.ConfigureSeeder.Config(cfg)
84         }
85         seeder, err := torrent.NewClient(cfg)
86         require.NoError(t, err)
87         if ps.ConfigureSeeder.Client != nil {
88                 ps.ConfigureSeeder.Client(seeder)
89         }
90         defer testutil.ExportStatusWriter(seeder, "s", t)()
91         seederTorrent, _, _ := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
92         // Run a Stats right after Closing the Client. This will trigger the Stats
93         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
94         defer seederTorrent.Stats()
95         defer seeder.Close()
96         seederTorrent.VerifyData()
97         // Create leecher and a Torrent.
98         leecherDataDir, err := ioutil.TempDir("", "")
99         require.NoError(t, err)
100         defer os.RemoveAll(leecherDataDir)
101         cfg = torrent.TestingConfig(t)
102         // See the seeder client config comment.
103         cfg.DropMutuallyCompletePeers = false
104         if ps.LeecherStorage == nil {
105                 cfg.DataDir = leecherDataDir
106         } else {
107                 storage := ps.LeecherStorage(leecherDataDir)
108                 defer storage.Close()
109                 cfg.DefaultStorage = storage
110         }
111         if ps.LeecherDownloadRateLimiter != nil {
112                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
113         }
114         cfg.Seed = false
115         //cfg.Debug = true
116         if ps.ConfigureLeecher.Config != nil {
117                 ps.ConfigureLeecher.Config(cfg)
118         }
119         leecher, err := torrent.NewClient(cfg)
120         require.NoError(t, err)
121         defer leecher.Close()
122         if ps.ConfigureLeecher.Client != nil {
123                 ps.ConfigureLeecher.Client(leecher)
124         }
125         defer testutil.ExportStatusWriter(leecher, "l", t)()
126         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
127                 ret = torrent.TorrentSpecFromMetaInfo(mi)
128                 ret.ChunkSize = 2
129                 if ps.LeecherStartsWithoutMetadata {
130                         ret.InfoBytes = nil
131                 }
132                 return
133         }())
134         require.NoError(t, err)
135         assert.True(t, new)
136
137         //// This was used when observing coalescing of piece state changes.
138         //logPieceStateChanges(leecherTorrent)
139
140         // Now do some things with leecher and seeder.
141         added := leecherTorrent.AddClientPeer(seeder)
142         assert.False(t, leecherTorrent.Seeding())
143         // The leecher will use peers immediately if it doesn't have the metadata. Otherwise, they
144         // should be sitting idle until we demand data.
145         if !ps.LeecherStartsWithoutMetadata {
146                 assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers)
147         }
148         if ps.LeecherStartsWithoutMetadata {
149                 <-leecherTorrent.GotInfo()
150         }
151         r := leecherTorrent.NewReader()
152         defer r.Close()
153         go leecherTorrent.SetInfoBytes(mi.InfoBytes)
154         if ps.Responsive {
155                 r.SetResponsive()
156         }
157         if ps.SetReadahead {
158                 r.SetReadahead(ps.Readahead)
159         }
160         assertReadAllGreeting(t, r)
161         assert.NotEmpty(t, seederTorrent.PeerConns())
162         leecherPeerConns := leecherTorrent.PeerConns()
163         if cfg.DropMutuallyCompletePeers {
164                 // I don't think we can assume it will be empty already, due to timing.
165                 //assert.Empty(t, leecherPeerConns)
166         } else {
167                 assert.NotEmpty(t, leecherPeerConns)
168         }
169         foundSeeder := false
170         for _, pc := range leecherPeerConns {
171                 completed := pc.PeerPieces().Len()
172                 t.Logf("peer conn %v has %v completed pieces", pc, completed)
173                 if completed == bitmap.BitRange(leecherTorrent.Info().NumPieces()) {
174                         foundSeeder = true
175                 }
176         }
177         if !foundSeeder {
178                 t.Errorf("didn't find seeder amongst leecher peer conns")
179         }
180
181         seederStats := seederTorrent.Stats()
182         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
183         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
184
185         leecherStats := leecherTorrent.Stats()
186         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
187         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
188
189         // Try reading through again for the cases where the torrent data size
190         // exceeds the size of the cache.
191         assertReadAllGreeting(t, r)
192 }
193
194 type fileCacheClientStorageFactoryParams struct {
195         Capacity    int64
196         SetCapacity bool
197 }
198
199 func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
200         return func(dataDir string) storage.ClientImplCloser {
201                 fc, err := filecache.NewCache(dataDir)
202                 if err != nil {
203                         panic(err)
204                 }
205                 var sharedCapacity *int64
206                 if ps.SetCapacity {
207                         sharedCapacity = &ps.Capacity
208                         fc.SetCapacity(ps.Capacity)
209                 }
210                 return struct {
211                         storage.ClientImpl
212                         io.Closer
213                 }{
214                         storage.NewResourcePiecesOpts(
215                                 fc.AsResourceProvider(),
216                                 storage.ResourcePiecesOpts{
217                                         Capacity: sharedCapacity,
218                                 }),
219                         ioutil.NopCloser(nil),
220                 }
221         }
222 }
223
224 type storageFactory func(string) storage.ClientImplCloser
225
226 func TestClientTransferDefault(t *testing.T) {
227         testClientTransfer(t, testClientTransferParams{
228                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}),
229         })
230 }
231
232 func TestClientTransferDefaultNoMetadata(t *testing.T) {
233         testClientTransfer(t, testClientTransferParams{
234                 LeecherStorage:               newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}),
235                 LeecherStartsWithoutMetadata: true,
236         })
237 }
238
239 func TestClientTransferRateLimitedUpload(t *testing.T) {
240         started := time.Now()
241         testClientTransfer(t, testClientTransferParams{
242                 // We are uploading 13 bytes (the length of the greeting torrent). The
243                 // chunks are 2 bytes in length. Then the smallest burst we can run
244                 // with is 2. Time taken is (13-burst)/rate.
245                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
246         })
247         require.True(t, time.Since(started) > time.Second)
248 }
249
250 func TestClientTransferRateLimitedDownload(t *testing.T) {
251         testClientTransfer(t, testClientTransferParams{
252                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
253         })
254 }
255
256 func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
257         testClientTransfer(t, testClientTransferParams{
258                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
259                         SetCapacity: true,
260                         // Going below the piece length means it can't complete a piece so
261                         // that it can be hashed.
262                         Capacity: 5,
263                 }),
264                 SetReadahead: setReadahead,
265                 // Can't readahead too far or the cache will thrash and drop data we
266                 // thought we had.
267                 Readahead: readahead,
268
269                 // These tests don't work well with more than 1 connection to the seeder.
270                 ConfigureLeecher: ConfigureClient{
271                         Config: func(cfg *torrent.ClientConfig) {
272                                 cfg.DropDuplicatePeerIds = true
273                                 //cfg.DisableIPv6 = true
274                                 //cfg.DisableUTP = true
275                         },
276                 },
277         })
278 }
279
280 func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
281         testClientTransferSmallCache(t, true, 5)
282 }
283
284 func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
285         testClientTransferSmallCache(t, true, 15)
286 }
287
288 func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
289         testClientTransferSmallCache(t, false, -1)
290 }
291
292 type leecherStorageTestCase struct {
293         name       string
294         f          storageFactory
295         gomaxprocs int
296 }
297
298 func TestClientTransferVarious(t *testing.T) {
299         // Leecher storage
300         for _, ls := range []leecherStorageTestCase{
301                 {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}), 0},
302                 {"Boltdb", storage.NewBoltDB, 0},
303                 {"SqliteDirect", func(s string) storage.ClientImplCloser {
304                         path := filepath.Join(s, "sqlite3.db")
305                         var opts sqliteStorage.NewDirectStorageOpts
306                         opts.Path = path
307                         cl, err := sqliteStorage.NewDirectStorage(opts)
308                         if err != nil {
309                                 panic(err)
310                         }
311                         return cl
312                 }, 0},
313         } {
314                 t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {
315                         // Seeder storage
316                         for _, ss := range []struct {
317                                 name string
318                                 f    storageFactory
319                         }{
320                                 {"File", storage.NewFile},
321                                 {"Mmap", storage.NewMMap},
322                         } {
323                                 t.Run(fmt.Sprintf("%sSeederStorage", ss.name), func(t *testing.T) {
324                                         for _, responsive := range []bool{false, true} {
325                                                 t.Run(fmt.Sprintf("Responsive=%v", responsive), func(t *testing.T) {
326                                                         t.Run("NoReadahead", func(t *testing.T) {
327                                                                 testClientTransfer(t, testClientTransferParams{
328                                                                         Responsive:     responsive,
329                                                                         SeederStorage:  ss.f,
330                                                                         LeecherStorage: ls.f,
331                                                                         GOMAXPROCS:     ls.gomaxprocs,
332                                                                 })
333                                                         })
334                                                         for _, readahead := range []int64{-1, 0, 1, 2, 9, 20} {
335                                                                 t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
336                                                                         testClientTransfer(t, testClientTransferParams{
337                                                                                 SeederStorage:  ss.f,
338                                                                                 Responsive:     responsive,
339                                                                                 SetReadahead:   true,
340                                                                                 Readahead:      readahead,
341                                                                                 LeecherStorage: ls.f,
342                                                                                 GOMAXPROCS:     ls.gomaxprocs,
343                                                                         })
344                                                                 })
345                                                         }
346                                                 })
347                                         }
348                                 })
349                         }
350                 })
351         }
352 }
353
354 // Check that after completing leeching, a leecher transitions to a seeding
355 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
356 func TestSeedAfterDownloading(t *testing.T) {
357         greetingTempDir, mi := testutil.GreetingTestTorrent()
358         defer os.RemoveAll(greetingTempDir)
359
360         cfg := torrent.TestingConfig(t)
361         cfg.Seed = true
362         cfg.DataDir = greetingTempDir
363         seeder, err := torrent.NewClient(cfg)
364         require.NoError(t, err)
365         defer seeder.Close()
366         defer testutil.ExportStatusWriter(seeder, "s", t)()
367         seederTorrent, ok, err := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
368         require.NoError(t, err)
369         assert.True(t, ok)
370         seederTorrent.VerifyData()
371
372         cfg = torrent.TestingConfig(t)
373         cfg.Seed = true
374         cfg.DataDir, err = ioutil.TempDir("", "")
375         require.NoError(t, err)
376         defer os.RemoveAll(cfg.DataDir)
377         leecher, err := torrent.NewClient(cfg)
378         require.NoError(t, err)
379         defer leecher.Close()
380         defer testutil.ExportStatusWriter(leecher, "l", t)()
381
382         cfg = torrent.TestingConfig(t)
383         cfg.Seed = false
384         cfg.DataDir, err = ioutil.TempDir("", "")
385         require.NoError(t, err)
386         defer os.RemoveAll(cfg.DataDir)
387         leecherLeecher, _ := torrent.NewClient(cfg)
388         require.NoError(t, err)
389         defer leecherLeecher.Close()
390         defer testutil.ExportStatusWriter(leecherLeecher, "ll", t)()
391         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
392                 ret = torrent.TorrentSpecFromMetaInfo(mi)
393                 ret.ChunkSize = 2
394                 return
395         }())
396         require.NoError(t, err)
397         assert.True(t, ok)
398         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
399                 ret = torrent.TorrentSpecFromMetaInfo(mi)
400                 ret.ChunkSize = 3
401                 return
402         }())
403         require.NoError(t, err)
404         assert.True(t, ok)
405         // Simultaneously DownloadAll in Leecher, and read the contents
406         // consecutively in LeecherLeecher. This non-deterministically triggered a
407         // case where the leecher wouldn't unchoke the LeecherLeecher.
408         var wg sync.WaitGroup
409         wg.Add(1)
410         go func() {
411                 defer wg.Done()
412                 r := llg.NewReader()
413                 defer r.Close()
414                 quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
415         }()
416         done := make(chan struct{})
417         defer close(done)
418         go leecherGreeting.AddClientPeer(seeder)
419         go leecherGreeting.AddClientPeer(leecherLeecher)
420         wg.Add(1)
421         go func() {
422                 defer wg.Done()
423                 leecherGreeting.DownloadAll()
424                 leecher.WaitAll()
425         }()
426         wg.Wait()
427 }
428
429 type ConfigureClient struct {
430         Config func(*torrent.ClientConfig)
431         Client func(*torrent.Client)
432 }