X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=test%2Ftransfer_test.go;h=683d589b897059c1b76b24491c66bedecdcc7773;hb=ac086bb3bd3b8e31c117362551eb0fe295a0f78d;hp=b3187c0bec30cef658bba492560eda39d17552f1;hpb=95d808d3c5f318b8c09c233b021fb8a8f542c415;p=btrtrc.git diff --git a/test/transfer_test.go b/test/transfer_test.go index b3187c0b..683d589b 100644 --- a/test/transfer_test.go +++ b/test/transfer_test.go @@ -1,201 +1,32 @@ package test import ( - "fmt" "io" "io/ioutil" "os" - "path/filepath" - "runtime" "sync" "testing" "testing/iotest" "time" - "github.com/anacrolix/missinggo/v2/bitmap" + "github.com/anacrolix/log" "github.com/anacrolix/missinggo/v2/filecache" + qt "github.com/frankban/quicktest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" + "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/internal/testutil" "github.com/anacrolix/torrent/storage" - sqliteStorage "github.com/anacrolix/torrent/storage/sqlite" - "github.com/frankban/quicktest" - "golang.org/x/time/rate" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) -type testClientTransferParams struct { - Responsive bool - Readahead int64 - SetReadahead bool - LeecherStorage func(string) storage.ClientImplCloser - SeederStorage func(string) storage.ClientImplCloser - SeederUploadRateLimiter *rate.Limiter - LeecherDownloadRateLimiter *rate.Limiter - ConfigureSeeder ConfigureClient - ConfigureLeecher ConfigureClient - GOMAXPROCS int - - LeecherStartsWithoutMetadata bool -} - -func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) { - pos, err := r.Seek(0, io.SeekStart) - assert.NoError(t, err) - assert.EqualValues(t, 0, pos) - quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil) -} - -// Creates a seeder and a leecher, and ensures the data transfers when a read -// is attempted on the leecher. -func testClientTransfer(t *testing.T, ps testClientTransferParams) { - - prevGOMAXPROCS := runtime.GOMAXPROCS(ps.GOMAXPROCS) - newGOMAXPROCS := prevGOMAXPROCS - if ps.GOMAXPROCS > 0 { - newGOMAXPROCS = ps.GOMAXPROCS - } - defer func() { - quicktest.Check(t, runtime.GOMAXPROCS(prevGOMAXPROCS), quicktest.ContentEquals, newGOMAXPROCS) - }() - - greetingTempDir, mi := testutil.GreetingTestTorrent() - defer os.RemoveAll(greetingTempDir) - // Create seeder and a Torrent. - cfg := torrent.TestingConfig(t) - cfg.Seed = true - // Some test instances don't like this being on, even when there's no cache involved. - cfg.DropMutuallyCompletePeers = false - if ps.SeederUploadRateLimiter != nil { - cfg.UploadRateLimiter = ps.SeederUploadRateLimiter - } - // cfg.ListenAddr = "localhost:4000" - if ps.SeederStorage != nil { - storage := ps.SeederStorage(greetingTempDir) - defer storage.Close() - cfg.DefaultStorage = storage - } else { - cfg.DataDir = greetingTempDir - } - if ps.ConfigureSeeder.Config != nil { - ps.ConfigureSeeder.Config(cfg) - } - seeder, err := torrent.NewClient(cfg) - require.NoError(t, err) - if ps.ConfigureSeeder.Client != nil { - ps.ConfigureSeeder.Client(seeder) - } - defer testutil.ExportStatusWriter(seeder, "s", t)() - seederTorrent, _, _ := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi)) - // Run a Stats right after Closing the Client. This will trigger the Stats - // panic in #214 caused by RemoteAddr on Closed uTP sockets. - defer seederTorrent.Stats() - defer seeder.Close() - seederTorrent.VerifyData() - // Create leecher and a Torrent. - leecherDataDir, err := ioutil.TempDir("", "") - require.NoError(t, err) - defer os.RemoveAll(leecherDataDir) - cfg = torrent.TestingConfig(t) - // See the seeder client config comment. - cfg.DropMutuallyCompletePeers = false - if ps.LeecherStorage == nil { - cfg.DataDir = leecherDataDir - } else { - storage := ps.LeecherStorage(leecherDataDir) - defer storage.Close() - cfg.DefaultStorage = storage - } - if ps.LeecherDownloadRateLimiter != nil { - cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter - } - cfg.Seed = false - //cfg.Debug = true - if ps.ConfigureLeecher.Config != nil { - ps.ConfigureLeecher.Config(cfg) - } - leecher, err := torrent.NewClient(cfg) - require.NoError(t, err) - defer leecher.Close() - if ps.ConfigureLeecher.Client != nil { - ps.ConfigureLeecher.Client(leecher) - } - defer testutil.ExportStatusWriter(leecher, "l", t)() - leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) { - ret = torrent.TorrentSpecFromMetaInfo(mi) - ret.ChunkSize = 2 - if ps.LeecherStartsWithoutMetadata { - ret.InfoBytes = nil - } - return - }()) - require.NoError(t, err) - assert.True(t, new) - - //// This was used when observing coalescing of piece state changes. - //logPieceStateChanges(leecherTorrent) - - // Now do some things with leecher and seeder. - added := leecherTorrent.AddClientPeer(seeder) - assert.False(t, leecherTorrent.Seeding()) - // The leecher will use peers immediately if it doesn't have the metadata. Otherwise, they - // should be sitting idle until we demand data. - if !ps.LeecherStartsWithoutMetadata { - assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers) - } - if ps.LeecherStartsWithoutMetadata { - <-leecherTorrent.GotInfo() - } - r := leecherTorrent.NewReader() - defer r.Close() - go leecherTorrent.SetInfoBytes(mi.InfoBytes) - if ps.Responsive { - r.SetResponsive() - } - if ps.SetReadahead { - r.SetReadahead(ps.Readahead) - } - assertReadAllGreeting(t, r) - assert.NotEmpty(t, seederTorrent.PeerConns()) - leecherPeerConns := leecherTorrent.PeerConns() - if cfg.DropMutuallyCompletePeers { - // I don't think we can assume it will be empty already, due to timing. - //assert.Empty(t, leecherPeerConns) - } else { - assert.NotEmpty(t, leecherPeerConns) - } - foundSeeder := false - for _, pc := range leecherPeerConns { - completed := pc.PeerPieces().Len() - t.Logf("peer conn %v has %v completed pieces", pc, completed) - if completed == bitmap.BitRange(leecherTorrent.Info().NumPieces()) { - foundSeeder = true - } - } - if !foundSeeder { - t.Errorf("didn't find seeder amongst leecher peer conns") - } - - seederStats := seederTorrent.Stats() - assert.True(t, 13 <= seederStats.BytesWrittenData.Int64()) - assert.True(t, 8 <= seederStats.ChunksWritten.Int64()) - - leecherStats := leecherTorrent.Stats() - assert.True(t, 13 <= leecherStats.BytesReadData.Int64()) - assert.True(t, 8 <= leecherStats.ChunksRead.Int64()) - - // Try reading through again for the cases where the torrent data size - // exceeds the size of the cache. - assertReadAllGreeting(t, r) -} - type fileCacheClientStorageFactoryParams struct { Capacity int64 SetCapacity bool } -func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory { +func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) StorageFactory { return func(dataDir string) storage.ClientImplCloser { fc, err := filecache.NewCache(dataDir) if err != nil { @@ -220,8 +51,6 @@ func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) st } } -type storageFactory func(string) storage.ClientImplCloser - func TestClientTransferDefault(t *testing.T) { testClientTransfer(t, testClientTransferParams{ LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}), @@ -249,6 +78,13 @@ func TestClientTransferRateLimitedUpload(t *testing.T) { func TestClientTransferRateLimitedDownload(t *testing.T) { testClientTransfer(t, testClientTransferParams{ LeecherDownloadRateLimiter: rate.NewLimiter(512, 512), + ConfigureSeeder: ConfigureClient{ + Config: func(cfg *torrent.ClientConfig) { + // If we send too many keep alives, we consume all the leechers available download + // rate. The default isn't exposed, but a minute is pretty reasonable. + cfg.KeepAliveTimeout = time.Minute + }, + }, }) } @@ -260,7 +96,8 @@ func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int // that it can be hashed. Capacity: 5, }), - SetReadahead: setReadahead, + LeecherStorageCapacity: 5, + SetReadahead: setReadahead, // Can't readahead too far or the cache will thrash and drop data we // thought we had. Readahead: readahead, @@ -269,8 +106,8 @@ func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int ConfigureLeecher: ConfigureClient{ Config: func(cfg *torrent.ClientConfig) { cfg.DropDuplicatePeerIds = true - //cfg.DisableIPv6 = true - //cfg.DisableUTP = true + // cfg.DisableIPv6 = true + // cfg.DisableUTP = true }, }, }) @@ -288,109 +125,23 @@ func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) { testClientTransferSmallCache(t, false, -1) } -func sqliteClientStorageFactory(optsMaker func(dataDir string) sqliteStorage.NewPiecesStorageOpts) storageFactory { - return func(dataDir string) storage.ClientImplCloser { - opts := optsMaker(dataDir) - //log.Printf("opening sqlite db: %#v", opts) - ret, err := sqliteStorage.NewPiecesStorage(opts) - if err != nil { - panic(err) - } - return ret - } -} - -type leecherStorageTestCase struct { - name string - f storageFactory - gomaxprocs int -} - -func sqliteLeecherStorageTestCase(numConns int) leecherStorageTestCase { - return leecherStorageTestCase{ - fmt.Sprintf("SqliteFile,NumConns=%v", numConns), - sqliteClientStorageFactory(func(dataDir string) (opts sqliteStorage.NewPiecesStorageOpts) { - opts.Path = filepath.Join(dataDir, "sqlite.db") - opts.NumConns = numConns - return - }), - numConns, - } -} - -func TestClientTransferVarious(t *testing.T) { - // Leecher storage - for _, ls := range []leecherStorageTestCase{ - {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}), 0}, - {"Boltdb", storage.NewBoltDB, 0}, - {"SqliteDirect", func(s string) storage.ClientImplCloser { - path := filepath.Join(s, "sqlite3.db") - var opts sqliteStorage.NewDirectStorageOpts - opts.Path = path - cl, err := sqliteStorage.NewDirectStorage(opts) - if err != nil { - panic(err) - } - return cl - }, 0}, - sqliteLeecherStorageTestCase(1), - sqliteLeecherStorageTestCase(2), - // This should use a number of connections equal to the number of CPUs - sqliteLeecherStorageTestCase(0), - {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) (opts sqliteStorage.NewPiecesStorageOpts) { - opts.Memory = true - return - }), 0}, - } { - t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) { - // Seeder storage - for _, ss := range []struct { - name string - f storageFactory - }{ - {"File", storage.NewFile}, - {"Mmap", storage.NewMMap}, - } { - t.Run(fmt.Sprintf("%sSeederStorage", ss.name), func(t *testing.T) { - for _, responsive := range []bool{false, true} { - t.Run(fmt.Sprintf("Responsive=%v", responsive), func(t *testing.T) { - t.Run("NoReadahead", func(t *testing.T) { - testClientTransfer(t, testClientTransferParams{ - Responsive: responsive, - SeederStorage: ss.f, - LeecherStorage: ls.f, - GOMAXPROCS: ls.gomaxprocs, - }) - }) - for _, readahead := range []int64{-1, 0, 1, 2, 9, 20} { - t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) { - testClientTransfer(t, testClientTransferParams{ - SeederStorage: ss.f, - Responsive: responsive, - SetReadahead: true, - Readahead: readahead, - LeecherStorage: ls.f, - GOMAXPROCS: ls.gomaxprocs, - }) - }) - } - }) - } - }) - } - }) - } +func TestFilecacheClientTransferVarious(t *testing.T) { + TestLeecherStorage(t, LeecherStorageTestCase{ + "Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}), 0, + }) } // Check that after completing leeching, a leecher transitions to a seeding // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher. -func TestSeedAfterDownloading(t *testing.T) { +func testSeedAfterDownloading(t *testing.T, disableUtp bool) { greetingTempDir, mi := testutil.GreetingTestTorrent() defer os.RemoveAll(greetingTempDir) cfg := torrent.TestingConfig(t) cfg.Seed = true + cfg.MaxAllocPeerRequestDataPerConn = 4 cfg.DataDir = greetingTempDir + cfg.DisableUTP = disableUtp seeder, err := torrent.NewClient(cfg) require.NoError(t, err) defer seeder.Close() @@ -402,19 +153,27 @@ func TestSeedAfterDownloading(t *testing.T) { cfg = torrent.TestingConfig(t) cfg.Seed = true - cfg.DataDir, err = ioutil.TempDir("", "") - require.NoError(t, err) - defer os.RemoveAll(cfg.DataDir) + cfg.DataDir = t.TempDir() + cfg.DisableUTP = disableUtp + // Make sure the leecher-leecher doesn't connect directly to the seeder. This is because I + // wanted to see if having the higher chunk-sized leecher-leecher would cause the leecher to + // error decoding. However it shouldn't because a client should only be receiving pieces sized + // to the chunk size it expects. + cfg.DisablePEX = true + //cfg.Debug = true + cfg.Logger = log.Default.WithContextText("leecher") leecher, err := torrent.NewClient(cfg) require.NoError(t, err) defer leecher.Close() defer testutil.ExportStatusWriter(leecher, "l", t)() cfg = torrent.TestingConfig(t) + cfg.DisableUTP = disableUtp cfg.Seed = false - cfg.DataDir, err = ioutil.TempDir("", "") - require.NoError(t, err) - defer os.RemoveAll(cfg.DataDir) + cfg.DataDir = t.TempDir() + cfg.MaxAllocPeerRequestDataPerConn = 4 + cfg.Logger = log.Default.WithContextText("leecher-leecher") + cfg.Debug = true leecherLeecher, _ := torrent.NewClient(cfg) require.NoError(t, err) defer leecherLeecher.Close() @@ -437,15 +196,17 @@ func TestSeedAfterDownloading(t *testing.T) { // consecutively in LeecherLeecher. This non-deterministically triggered a // case where the leecher wouldn't unchoke the LeecherLeecher. var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() + { + // Prioritize a region, and ensure it's been hashed, so we want connections. r := llg.NewReader() - defer r.Close() - quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil) - }() - done := make(chan struct{}) - defer close(done) + llg.VerifyData() + wg.Add(1) + go func() { + defer wg.Done() + defer r.Close() + qt.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), qt.IsNil) + }() + } go leecherGreeting.AddClientPeer(seeder) go leecherGreeting.AddClientPeer(leecherLeecher) wg.Add(1) @@ -457,7 +218,10 @@ func TestSeedAfterDownloading(t *testing.T) { wg.Wait() } -type ConfigureClient struct { - Config func(*torrent.ClientConfig) - Client func(*torrent.Client) +func TestSeedAfterDownloadingDisableUtp(t *testing.T) { + testSeedAfterDownloading(t, true) +} + +func TestSeedAfterDownloadingAllowUtp(t *testing.T) { + testSeedAfterDownloading(t, false) }