]> Sergey Matveev's repositories - btrtrc.git/blob - test/transfer_test.go
Merge branch 'master' into go1.18
[btrtrc.git] / test / transfer_test.go
1 package test
2
3 import (
4         "fmt"
5         "io"
6         "io/ioutil"
7         "os"
8         "path/filepath"
9         "runtime"
10         "sync"
11         "testing"
12         "testing/iotest"
13         "time"
14
15         "github.com/anacrolix/missinggo/v2/bitmap"
16         "github.com/anacrolix/missinggo/v2/filecache"
17         "github.com/anacrolix/torrent"
18         "github.com/anacrolix/torrent/internal/testutil"
19         "github.com/anacrolix/torrent/storage"
20         sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
21         "github.com/frankban/quicktest"
22         "golang.org/x/time/rate"
23
24         "github.com/stretchr/testify/assert"
25         "github.com/stretchr/testify/require"
26 )
27
// testClientTransferParams configures a single run of testClientTransfer.
//
//   - Responsive: when true, SetResponsive is called on the leecher's reader.
//   - Readahead / SetReadahead: when SetReadahead is true, Readahead is passed
//     to the leecher reader's SetReadahead; otherwise the reader is left alone.
//   - LeecherStorage / SeederStorage: optional storage factories, invoked with
//     the respective data directory. When nil, the client config's DataDir is
//     pointed at that directory instead.
//   - SeederUploadRateLimiter / LeecherDownloadRateLimiter: when non-nil,
//     installed as the seeder's UploadRateLimiter and the leecher's
//     DownloadRateLimiter respectively.
//   - ConfigureSeeder / ConfigureLeecher: optional hooks applied to the client
//     config before the client is created, and to the client afterwards.
//   - GOMAXPROCS: passed to runtime.GOMAXPROCS at the start of the test; the
//     previous value is restored when the test returns.
//   - LeecherStartsWithoutMetadata: when true, the leecher's torrent spec is
//     added with its InfoBytes cleared.
type testClientTransferParams struct {
	Responsive     bool
	Readahead      int64
	SetReadahead   bool
	LeecherStorage func(string) storage.ClientImplCloser
	// TODO: Use a generic option type. This is the capacity of the leecher storage for determining
	// whether it's possible for the leecher to be Complete. 0 currently means no limit.
	LeecherStorageCapacity     int64
	SeederStorage              func(string) storage.ClientImplCloser
	SeederUploadRateLimiter    *rate.Limiter
	LeecherDownloadRateLimiter *rate.Limiter
	ConfigureSeeder            ConfigureClient
	ConfigureLeecher           ConfigureClient
	GOMAXPROCS                 int

	LeecherStartsWithoutMetadata bool
}
45
46 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
47         pos, err := r.Seek(0, io.SeekStart)
48         assert.NoError(t, err)
49         assert.EqualValues(t, 0, pos)
50         quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
51 }
52
53 // Creates a seeder and a leecher, and ensures the data transfers when a read
54 // is attempted on the leecher.
55 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
56         t.Parallel()
57
58         prevGOMAXPROCS := runtime.GOMAXPROCS(ps.GOMAXPROCS)
59         newGOMAXPROCS := prevGOMAXPROCS
60         if ps.GOMAXPROCS > 0 {
61                 newGOMAXPROCS = ps.GOMAXPROCS
62         }
63         defer func() {
64                 quicktest.Check(t, runtime.GOMAXPROCS(prevGOMAXPROCS), quicktest.ContentEquals, newGOMAXPROCS)
65         }()
66
67         greetingTempDir, mi := testutil.GreetingTestTorrent()
68         defer os.RemoveAll(greetingTempDir)
69         // Create seeder and a Torrent.
70         cfg := torrent.TestingConfig(t)
71         // cfg.Debug = true
72         cfg.Seed = true
73         // Some test instances don't like this being on, even when there's no cache involved.
74         cfg.DropMutuallyCompletePeers = false
75         if ps.SeederUploadRateLimiter != nil {
76                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
77         }
78         // cfg.ListenAddr = "localhost:4000"
79         if ps.SeederStorage != nil {
80                 storage := ps.SeederStorage(greetingTempDir)
81                 defer storage.Close()
82                 cfg.DefaultStorage = storage
83         } else {
84                 cfg.DataDir = greetingTempDir
85         }
86         if ps.ConfigureSeeder.Config != nil {
87                 ps.ConfigureSeeder.Config(cfg)
88         }
89         seeder, err := torrent.NewClient(cfg)
90         require.NoError(t, err)
91         if ps.ConfigureSeeder.Client != nil {
92                 ps.ConfigureSeeder.Client(seeder)
93         }
94         defer testutil.ExportStatusWriter(seeder, "s", t)()
95         seederTorrent, _, _ := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
96         // Run a Stats right after Closing the Client. This will trigger the Stats
97         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
98         defer seederTorrent.Stats()
99         defer seeder.Close()
100         // Adding a torrent and setting the info should trigger piece checks for everything
101         // automatically. Wait until the seed Torrent agrees that everything is available.
102         <-seederTorrent.Complete.On()
103         // Create leecher and a Torrent.
104         leecherDataDir := t.TempDir()
105         cfg = torrent.TestingConfig(t)
106         // See the seeder client config comment.
107         cfg.DropMutuallyCompletePeers = false
108         if ps.LeecherStorage == nil {
109                 cfg.DataDir = leecherDataDir
110         } else {
111                 storage := ps.LeecherStorage(leecherDataDir)
112                 defer storage.Close()
113                 cfg.DefaultStorage = storage
114         }
115         if ps.LeecherDownloadRateLimiter != nil {
116                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
117         }
118         cfg.Seed = false
119         // cfg.Debug = true
120         if ps.ConfigureLeecher.Config != nil {
121                 ps.ConfigureLeecher.Config(cfg)
122         }
123         leecher, err := torrent.NewClient(cfg)
124         require.NoError(t, err)
125         defer leecher.Close()
126         if ps.ConfigureLeecher.Client != nil {
127                 ps.ConfigureLeecher.Client(leecher)
128         }
129         defer testutil.ExportStatusWriter(leecher, "l", t)()
130         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
131                 ret = torrent.TorrentSpecFromMetaInfo(mi)
132                 ret.ChunkSize = 2
133                 if ps.LeecherStartsWithoutMetadata {
134                         ret.InfoBytes = nil
135                 }
136                 return
137         }())
138         require.NoError(t, err)
139         assert.False(t, leecherTorrent.Complete.Bool())
140         assert.True(t, new)
141
142         //// This was used when observing coalescing of piece state changes.
143         //logPieceStateChanges(leecherTorrent)
144
145         // Now do some things with leecher and seeder.
146         added := leecherTorrent.AddClientPeer(seeder)
147         assert.False(t, leecherTorrent.Seeding())
148         // The leecher will use peers immediately if it doesn't have the metadata. Otherwise, they
149         // should be sitting idle until we demand data.
150         if !ps.LeecherStartsWithoutMetadata {
151                 assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers)
152         }
153         if ps.LeecherStartsWithoutMetadata {
154                 <-leecherTorrent.GotInfo()
155         }
156         r := leecherTorrent.NewReader()
157         defer r.Close()
158         go leecherTorrent.SetInfoBytes(mi.InfoBytes)
159         if ps.Responsive {
160                 r.SetResponsive()
161         }
162         if ps.SetReadahead {
163                 r.SetReadahead(ps.Readahead)
164         }
165         assertReadAllGreeting(t, r)
166         info, err := mi.UnmarshalInfo()
167         require.NoError(t, err)
168         canComplete := ps.LeecherStorageCapacity == 0 || ps.LeecherStorageCapacity >= info.TotalLength()
169         if !canComplete {
170                 // Reading from a cache doesn't refresh older pieces until we fail to read those, so we need
171                 // to force a refresh since we just read the contents from start to finish.
172                 go leecherTorrent.VerifyData()
173         }
174         if canComplete {
175                 <-leecherTorrent.Complete.On()
176         } else {
177                 <-leecherTorrent.Complete.Off()
178         }
179         assert.NotEmpty(t, seederTorrent.PeerConns())
180         leecherPeerConns := leecherTorrent.PeerConns()
181         if cfg.DropMutuallyCompletePeers {
182                 // I don't think we can assume it will be empty already, due to timing.
183                 // assert.Empty(t, leecherPeerConns)
184         } else {
185                 assert.NotEmpty(t, leecherPeerConns)
186         }
187         foundSeeder := false
188         for _, pc := range leecherPeerConns {
189                 completed := pc.PeerPieces().GetCardinality()
190                 t.Logf("peer conn %v has %v completed pieces", pc, completed)
191                 if completed == bitmap.BitRange(leecherTorrent.Info().NumPieces()) {
192                         foundSeeder = true
193                 }
194         }
195         if !foundSeeder {
196                 t.Errorf("didn't find seeder amongst leecher peer conns")
197         }
198
199         seederStats := seederTorrent.Stats()
200         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
201         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
202
203         leecherStats := leecherTorrent.Stats()
204         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
205         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
206
207         // Try reading through again for the cases where the torrent data size
208         // exceeds the size of the cache.
209         assertReadAllGreeting(t, r)
210 }
211
// fileCacheClientStorageFactoryParams holds the options for
// newFileCacheClientStorageFactory. Capacity is only applied to the file cache
// (and passed on to the piece storage) when SetCapacity is true.
type fileCacheClientStorageFactoryParams struct {
	Capacity    int64
	SetCapacity bool
}
216
217 func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
218         return func(dataDir string) storage.ClientImplCloser {
219                 fc, err := filecache.NewCache(dataDir)
220                 if err != nil {
221                         panic(err)
222                 }
223                 var sharedCapacity *int64
224                 if ps.SetCapacity {
225                         sharedCapacity = &ps.Capacity
226                         fc.SetCapacity(ps.Capacity)
227                 }
228                 return struct {
229                         storage.ClientImpl
230                         io.Closer
231                 }{
232                         storage.NewResourcePiecesOpts(
233                                 fc.AsResourceProvider(),
234                                 storage.ResourcePiecesOpts{
235                                         Capacity: sharedCapacity,
236                                 }),
237                         ioutil.NopCloser(nil),
238                 }
239         }
240 }
241
// storageFactory builds a client storage implementation from a string
// argument; the tests in this file pass it a data directory path.
type storageFactory func(string) storage.ClientImplCloser
243
244 func TestClientTransferDefault(t *testing.T) {
245         testClientTransfer(t, testClientTransferParams{
246                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}),
247         })
248 }
249
250 func TestClientTransferDefaultNoMetadata(t *testing.T) {
251         testClientTransfer(t, testClientTransferParams{
252                 LeecherStorage:               newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}),
253                 LeecherStartsWithoutMetadata: true,
254         })
255 }
256
257 func TestClientTransferRateLimitedUpload(t *testing.T) {
258         started := time.Now()
259         testClientTransfer(t, testClientTransferParams{
260                 // We are uploading 13 bytes (the length of the greeting torrent). The
261                 // chunks are 2 bytes in length. Then the smallest burst we can run
262                 // with is 2. Time taken is (13-burst)/rate.
263                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
264         })
265         require.True(t, time.Since(started) > time.Second)
266 }
267
268 func TestClientTransferRateLimitedDownload(t *testing.T) {
269         testClientTransfer(t, testClientTransferParams{
270                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
271                 ConfigureSeeder: ConfigureClient{
272                         Config: func(cfg *torrent.ClientConfig) {
273                                 // If we send too many keep alives, we consume all the leechers available download
274                                 // rate. The default isn't exposed, but a minute is pretty reasonable.
275                                 cfg.KeepAliveTimeout = time.Minute
276                         },
277                 },
278         })
279 }
280
281 func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
282         testClientTransfer(t, testClientTransferParams{
283                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
284                         SetCapacity: true,
285                         // Going below the piece length means it can't complete a piece so
286                         // that it can be hashed.
287                         Capacity: 5,
288                 }),
289                 LeecherStorageCapacity: 5,
290                 SetReadahead:           setReadahead,
291                 // Can't readahead too far or the cache will thrash and drop data we
292                 // thought we had.
293                 Readahead: readahead,
294
295                 // These tests don't work well with more than 1 connection to the seeder.
296                 ConfigureLeecher: ConfigureClient{
297                         Config: func(cfg *torrent.ClientConfig) {
298                                 cfg.DropDuplicatePeerIds = true
299                                 // cfg.DisableIPv6 = true
300                                 // cfg.DisableUTP = true
301                         },
302                 },
303         })
304 }
305
306 func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
307         testClientTransferSmallCache(t, true, 5)
308 }
309
310 func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
311         testClientTransferSmallCache(t, true, 15)
312 }
313
314 func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
315         testClientTransferSmallCache(t, false, -1)
316 }
317
// leecherStorageTestCase is one leecher storage variant exercised by
// TestClientTransferVarious: name labels the subtest, f builds the storage,
// and gomaxprocs is forwarded as the GOMAXPROCS transfer parameter.
type leecherStorageTestCase struct {
	name       string
	f          storageFactory
	gomaxprocs int
}
323
324 func TestClientTransferVarious(t *testing.T) {
325         // Leecher storage
326         for _, ls := range []leecherStorageTestCase{
327                 {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}), 0},
328                 {"Boltdb", storage.NewBoltDB, 0},
329                 {"SqliteDirect", func(s string) storage.ClientImplCloser {
330                         path := filepath.Join(s, "sqlite3.db")
331                         var opts sqliteStorage.NewDirectStorageOpts
332                         opts.Path = path
333                         cl, err := sqliteStorage.NewDirectStorage(opts)
334                         if err != nil {
335                                 panic(err)
336                         }
337                         return cl
338                 }, 0},
339         } {
340                 t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {
341                         // Seeder storage
342                         for _, ss := range []struct {
343                                 name string
344                                 f    storageFactory
345                         }{
346                                 {"File", storage.NewFile},
347                                 {"Mmap", storage.NewMMap},
348                         } {
349                                 t.Run(fmt.Sprintf("%sSeederStorage", ss.name), func(t *testing.T) {
350                                         for _, responsive := range []bool{false, true} {
351                                                 t.Run(fmt.Sprintf("Responsive=%v", responsive), func(t *testing.T) {
352                                                         t.Run("NoReadahead", func(t *testing.T) {
353                                                                 testClientTransfer(t, testClientTransferParams{
354                                                                         Responsive:     responsive,
355                                                                         SeederStorage:  ss.f,
356                                                                         LeecherStorage: ls.f,
357                                                                         GOMAXPROCS:     ls.gomaxprocs,
358                                                                 })
359                                                         })
360                                                         for _, readahead := range []int64{-1, 0, 1, 2, 9, 20} {
361                                                                 t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
362                                                                         testClientTransfer(t, testClientTransferParams{
363                                                                                 SeederStorage:  ss.f,
364                                                                                 Responsive:     responsive,
365                                                                                 SetReadahead:   true,
366                                                                                 Readahead:      readahead,
367                                                                                 LeecherStorage: ls.f,
368                                                                                 GOMAXPROCS:     ls.gomaxprocs,
369                                                                         })
370                                                                 })
371                                                         }
372                                                 })
373                                         }
374                                 })
375                         }
376                 })
377         }
378 }
379
380 // Check that after completing leeching, a leecher transitions to a seeding
381 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
382 func TestSeedAfterDownloading(t *testing.T) {
383         greetingTempDir, mi := testutil.GreetingTestTorrent()
384         defer os.RemoveAll(greetingTempDir)
385
386         cfg := torrent.TestingConfig(t)
387         cfg.Seed = true
388         cfg.DataDir = greetingTempDir
389         seeder, err := torrent.NewClient(cfg)
390         require.NoError(t, err)
391         defer seeder.Close()
392         defer testutil.ExportStatusWriter(seeder, "s", t)()
393         seederTorrent, ok, err := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
394         require.NoError(t, err)
395         assert.True(t, ok)
396         seederTorrent.VerifyData()
397
398         cfg = torrent.TestingConfig(t)
399         cfg.Seed = true
400         cfg.DataDir = t.TempDir()
401         leecher, err := torrent.NewClient(cfg)
402         require.NoError(t, err)
403         defer leecher.Close()
404         defer testutil.ExportStatusWriter(leecher, "l", t)()
405
406         cfg = torrent.TestingConfig(t)
407         cfg.Seed = false
408         cfg.DataDir = t.TempDir()
409         leecherLeecher, _ := torrent.NewClient(cfg)
410         require.NoError(t, err)
411         defer leecherLeecher.Close()
412         defer testutil.ExportStatusWriter(leecherLeecher, "ll", t)()
413         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
414                 ret = torrent.TorrentSpecFromMetaInfo(mi)
415                 ret.ChunkSize = 2
416                 return
417         }())
418         require.NoError(t, err)
419         assert.True(t, ok)
420         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
421                 ret = torrent.TorrentSpecFromMetaInfo(mi)
422                 ret.ChunkSize = 3
423                 return
424         }())
425         require.NoError(t, err)
426         assert.True(t, ok)
427         // Simultaneously DownloadAll in Leecher, and read the contents
428         // consecutively in LeecherLeecher. This non-deterministically triggered a
429         // case where the leecher wouldn't unchoke the LeecherLeecher.
430         var wg sync.WaitGroup
431         wg.Add(1)
432         go func() {
433                 defer wg.Done()
434                 r := llg.NewReader()
435                 defer r.Close()
436                 quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
437         }()
438         done := make(chan struct{})
439         defer close(done)
440         go leecherGreeting.AddClientPeer(seeder)
441         go leecherGreeting.AddClientPeer(leecherLeecher)
442         wg.Add(1)
443         go func() {
444                 defer wg.Done()
445                 leecherGreeting.DownloadAll()
446                 leecher.WaitAll()
447         }()
448         wg.Wait()
449 }
450
// ConfigureClient bundles optional hooks for customising a test client.
// Config, when non-nil, is applied to the client config before the client is
// created; Client, when non-nil, is applied to the client after creation.
type ConfigureClient struct {
	Config func(cfg *torrent.ClientConfig)
	Client func(cl *torrent.Client)
}