X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=client_test.go;h=d2a88e9e7a769abcd1871092c006e37b33573a51;hb=HEAD;hp=d0d88cdf8fc5b5b99122dcc775495c735dfd00e6;hpb=b92e8b7814b66d196347efa1e666cf51bc66b4b6;p=btrtrc.git diff --git a/client_test.go b/client_test.go index d0d88cdf..d2a88e9e 100644 --- a/client_test.go +++ b/client_test.go @@ -1,25 +1,26 @@ package torrent import ( - "context" "encoding/binary" "fmt" "io" - "io/ioutil" "net" + "net/netip" "os" "path/filepath" - "sync" + "reflect" "testing" + "testing/iotest" "time" - _ "github.com/anacrolix/envpprof" - "github.com/anacrolix/missinggo" - "github.com/anacrolix/missinggo/filecache" - "github.com/bradfitz/iter" + "github.com/anacrolix/dht/v2" + "github.com/anacrolix/log" + "github.com/anacrolix/missinggo/v2" + "github.com/anacrolix/missinggo/v2/filecache" + "github.com/frankban/quicktest" + qt "github.com/frankban/quicktest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "golang.org/x/time/rate" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/internal/testutil" @@ -28,41 +29,24 @@ import ( "github.com/anacrolix/torrent/storage" ) -func TestingConfig() *Config { - return &Config{ - ListenAddr: "localhost:0", - NoDHT: true, - DataDir: tempDir(), - DisableTrackers: true, - NoDefaultPortForwarding: true, - // Debug: true, - } -} - func TestClientDefault(t *testing.T) { - cl, err := NewClient(TestingConfig()) + cl, err := NewClient(TestingConfig(t)) require.NoError(t, err) - cl.Close() + require.Empty(t, cl.Close()) } -func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) { - cfg := TestingConfig() - pc, err := storage.NewBoltPieceCompletion(cfg.DataDir) - require.NoError(t, err) - ci := storage.NewFileWithCompletion(cfg.DataDir, pc) - defer ci.Close() - cfg.DefaultStorage = ci - cl, err := NewClient(cfg) - require.NoError(t, err) - cl.Close() - // And again, https://github.com/anacrolix/torrent/issues/158 - cl, err = NewClient(cfg) +func TestClientNilConfig(t *testing.T) { + // The default config will put crap in the working directory. + origDir, _ := os.Getwd() + defer os.Chdir(origDir) + os.Chdir(t.TempDir()) + cl, err := NewClient(nil) require.NoError(t, err) - cl.Close() + require.Empty(t, cl.Close()) } func TestAddDropTorrent(t *testing.T) { - cl, err := NewClient(TestingConfig()) + cl, err := NewClient(TestingConfig(t)) require.NoError(t, err) defer cl.Close() dir, mi := testutil.GreetingTestTorrent() @@ -97,41 +81,28 @@ func TestPieceHashSize(t *testing.T) { func TestTorrentInitialState(t *testing.T) { dir, mi := testutil.GreetingTestTorrent() defer os.RemoveAll(dir) - cl := &Client{} + var cl Client + cl.init(TestingConfig(t)) cl.initLogger() tor := cl.newTorrent( mi.HashInfoBytes(), - storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()), + storage.NewFileWithCompletion(t.TempDir(), storage.NewMapPieceCompletion()), ) tor.setChunkSize(2) - tor.cl.mu.Lock() - err := tor.setInfoBytes(mi.InfoBytes) - tor.cl.mu.Unlock() + tor.cl.lock() + err := tor.setInfoBytesLocked(mi.InfoBytes) + tor.cl.unlock() require.NoError(t, err) require.Len(t, tor.pieces, 3) tor.pendAllChunkSpecs(0) - tor.cl.mu.Lock() + tor.cl.lock() assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0)) - tor.cl.mu.Unlock() - assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize)) -} - -func TestUnmarshalPEXMsg(t *testing.T) { - var m peerExchangeMessage - if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil { - t.Fatal(err) - } - if len(m.Added) != 2 { - t.FailNow() - } - if m.Added[0].Port != 0x506 { - t.FailNow() - } + tor.cl.unlock() + assert.EqualValues(t, ChunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize)) } func TestReducedDialTimeout(t *testing.T) { - cfg := &Config{} - cfg.setDefaults() + cfg := NewDefaultClientConfig() for _, _case := range []struct { Max time.Duration HalfOpenLimit int @@ -156,79 +127,11 @@ func TestReducedDialTimeout(t *testing.T) { } } -func TestUTPRawConn(t *testing.T) { - l, err := NewUtpSocket("udp", "") - require.NoError(t, err) - defer l.Close() - go func() { - for { - _, err := l.Accept() - if err != nil { - break - } - } - }() - // Connect a UTP peer to see if the RawConn will still work. - s, err := NewUtpSocket("udp", "") - require.NoError(t, err) - defer s.Close() - utpPeer, err := s.DialContext(context.Background(), "", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr()))) - require.NoError(t, err) - defer utpPeer.Close() - peer, err := net.ListenPacket("udp", ":0") - require.NoError(t, err) - defer peer.Close() - - msgsReceived := 0 - // How many messages to send. I've set this to double the channel buffer - // size in the raw packetConn. - const N = 200 - readerStopped := make(chan struct{}) - // The reader goroutine. - go func() { - defer close(readerStopped) - b := make([]byte, 500) - for i := 0; i < N; i++ { - n, _, err := l.ReadFrom(b) - require.NoError(t, err) - msgsReceived++ - var d int - fmt.Sscan(string(b[:n]), &d) - assert.Equal(t, i, d) - } - }() - udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr()))) - require.NoError(t, err) - for i := 0; i < N; i++ { - _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr) - require.NoError(t, err) - time.Sleep(time.Millisecond) - } - select { - case <-readerStopped: - case <-time.After(time.Second): - t.Fatal("reader timed out") - } - if msgsReceived != N { - t.Fatalf("messages received: %d", msgsReceived) - } -} - -func TestTwoClientsArbitraryPorts(t *testing.T) { - for i := 0; i < 2; i++ { - cl, err := NewClient(TestingConfig()) - if err != nil { - t.Fatal(err) - } - defer cl.Close() - } -} - func TestAddDropManyTorrents(t *testing.T) { - cl, err := NewClient(TestingConfig()) + cl, err := NewClient(TestingConfig(t)) require.NoError(t, err) defer cl.Close() - for i := range iter.N(1000) { + for i := 0; i < 1000; i += 1 { var spec TorrentSpec binary.PutVarint(spec.InfoHash[:], int64(i)) tt, new, err := cl.AddTorrentSpec(&spec) @@ -238,267 +141,17 @@ func TestAddDropManyTorrents(t *testing.T) { } } -type FileCacheClientStorageFactoryParams struct { - Capacity int64 - SetCapacity bool - Wrapper func(*filecache.Cache) storage.ClientImpl -} - -func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory { - return func(dataDir string) storage.ClientImpl { - fc, err := filecache.NewCache(dataDir) - if err != nil { - panic(err) - } - if ps.SetCapacity { - fc.SetCapacity(ps.Capacity) - } - return ps.Wrapper(fc) - } -} - -type storageFactory func(string) storage.ClientImpl - -func TestClientTransferDefault(t *testing.T) { - testClientTransfer(t, testClientTransferParams{ - ExportClientStatus: true, - LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{ - Wrapper: fileCachePieceResourceStorage, - }), - }) -} - -func TestClientTransferRateLimitedUpload(t *testing.T) { - started := time.Now() - testClientTransfer(t, testClientTransferParams{ - // We are uploading 13 bytes (the length of the greeting torrent). The - // chunks are 2 bytes in length. Then the smallest burst we can run - // with is 2. Time taken is (13-burst)/rate. - SeederUploadRateLimiter: rate.NewLimiter(11, 2), - ExportClientStatus: true, - }) - require.True(t, time.Since(started) > time.Second) -} - -func TestClientTransferRateLimitedDownload(t *testing.T) { - testClientTransfer(t, testClientTransferParams{ - LeecherDownloadRateLimiter: rate.NewLimiter(512, 512), - }) -} - func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl { - return storage.NewResourcePieces(fc.AsResourceProvider()) -} - -func TestClientTransferSmallCache(t *testing.T) { - testClientTransfer(t, testClientTransferParams{ - LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{ - SetCapacity: true, - // Going below the piece length means it can't complete a piece so - // that it can be hashed. - Capacity: 5, - Wrapper: fileCachePieceResourceStorage, - }), - SetReadahead: true, - // Can't readahead too far or the cache will thrash and drop data we - // thought we had. - Readahead: 0, - ExportClientStatus: true, - }) -} - -func TestClientTransferVarious(t *testing.T) { - // Leecher storage - for _, ls := range []storageFactory{ - NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{ - Wrapper: fileCachePieceResourceStorage, - }), - storage.NewBoltDB, - } { - // Seeder storage - for _, ss := range []func(string) storage.ClientImpl{ - storage.NewFile, - storage.NewMMap, - } { - for _, responsive := range []bool{false, true} { - testClientTransfer(t, testClientTransferParams{ - Responsive: responsive, - SeederStorage: ss, - LeecherStorage: ls, - }) - for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} { - testClientTransfer(t, testClientTransferParams{ - SeederStorage: ss, - Responsive: responsive, - SetReadahead: true, - Readahead: readahead, - LeecherStorage: ls, - }) - } - } - } - } -} - -type testClientTransferParams struct { - Responsive bool - Readahead int64 - SetReadahead bool - ExportClientStatus bool - LeecherStorage func(string) storage.ClientImpl - SeederStorage func(string) storage.ClientImpl - SeederUploadRateLimiter *rate.Limiter - LeecherDownloadRateLimiter *rate.Limiter -} - -// Creates a seeder and a leecher, and ensures the data transfers when a read -// is attempted on the leecher. -func testClientTransfer(t *testing.T, ps testClientTransferParams) { - greetingTempDir, mi := testutil.GreetingTestTorrent() - defer os.RemoveAll(greetingTempDir) - // Create seeder and a Torrent. - cfg := TestingConfig() - cfg.Seed = true - cfg.UploadRateLimiter = ps.SeederUploadRateLimiter - // cfg.ListenAddr = "localhost:4000" - if ps.SeederStorage != nil { - cfg.DefaultStorage = ps.SeederStorage(greetingTempDir) - defer cfg.DefaultStorage.Close() - } else { - cfg.DataDir = greetingTempDir - } - seeder, err := NewClient(cfg) - require.NoError(t, err) - if ps.ExportClientStatus { - testutil.ExportStatusWriter(seeder, "s") - } - seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi)) - // Run a Stats right after Closing the Client. This will trigger the Stats - // panic in #214 caused by RemoteAddr on Closed uTP sockets. - defer seederTorrent.Stats() - defer seeder.Close() - seederTorrent.VerifyData() - // Create leecher and a Torrent. - leecherDataDir, err := ioutil.TempDir("", "") - require.NoError(t, err) - defer os.RemoveAll(leecherDataDir) - if ps.LeecherStorage == nil { - cfg.DataDir = leecherDataDir - } else { - cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir) - } - cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter - cfg.Seed = false - leecher, err := NewClient(cfg) - require.NoError(t, err) - defer leecher.Close() - if ps.ExportClientStatus { - testutil.ExportStatusWriter(leecher, "l") - } - leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) { - ret = TorrentSpecFromMetaInfo(mi) - ret.ChunkSize = 2 - return - }()) - require.NoError(t, err) - assert.True(t, new) - // Now do some things with leecher and seeder. - addClientPeer(leecherTorrent, seeder) - // The Torrent should not be interested in obtaining peers, so the one we - // just added should be the only one. - assert.False(t, leecherTorrent.Seeding()) - assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers) - r := leecherTorrent.NewReader() - defer r.Close() - if ps.Responsive { - r.SetResponsive() - } - if ps.SetReadahead { - r.SetReadahead(ps.Readahead) - } - assertReadAllGreeting(t, r) - assert.True(t, 13 <= seederTorrent.Stats().BytesWrittenData) - assert.True(t, 8 <= seederTorrent.Stats().ChunksWritten) - assert.True(t, 13 <= leecherTorrent.Stats().BytesReadData) - assert.True(t, 8 <= leecherTorrent.Stats().ChunksRead) - // Try reading through again for the cases where the torrent data size - // exceeds the size of the cache. - assertReadAllGreeting(t, r) -} - -func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) { - pos, err := r.Seek(0, io.SeekStart) - assert.NoError(t, err) - assert.EqualValues(t, 0, pos) - _greeting, err := ioutil.ReadAll(r) - assert.NoError(t, err) - assert.EqualValues(t, testutil.GreetingFileContents, _greeting) -} - -// Check that after completing leeching, a leecher transitions to a seeding -// correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher. -func TestSeedAfterDownloading(t *testing.T) { - greetingTempDir, mi := testutil.GreetingTestTorrent() - defer os.RemoveAll(greetingTempDir) - cfg := TestingConfig() - cfg.Seed = true - cfg.DataDir = greetingTempDir - seeder, err := NewClient(cfg) - require.NoError(t, err) - defer seeder.Close() - testutil.ExportStatusWriter(seeder, "s") - seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi)) - seederTorrent.VerifyData() - cfg.DataDir, err = ioutil.TempDir("", "") - require.NoError(t, err) - defer os.RemoveAll(cfg.DataDir) - leecher, err := NewClient(cfg) - require.NoError(t, err) - defer leecher.Close() - testutil.ExportStatusWriter(leecher, "l") - cfg.Seed = false - cfg.DataDir, err = ioutil.TempDir("", "") - require.NoError(t, err) - defer os.RemoveAll(cfg.DataDir) - leecherLeecher, _ := NewClient(cfg) - defer leecherLeecher.Close() - testutil.ExportStatusWriter(leecherLeecher, "ll") - leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) { - ret = TorrentSpecFromMetaInfo(mi) - ret.ChunkSize = 2 - return - }()) - llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) { - ret = TorrentSpecFromMetaInfo(mi) - ret.ChunkSize = 3 - return - }()) - // Simultaneously DownloadAll in Leecher, and read the contents - // consecutively in LeecherLeecher. This non-deterministically triggered a - // case where the leecher wouldn't unchoke the LeecherLeecher. - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - r := llg.NewReader() - defer r.Close() - b, err := ioutil.ReadAll(r) - require.NoError(t, err) - assert.EqualValues(t, testutil.GreetingFileContents, b) - }() - addClientPeer(leecherGreeting, seeder) - addClientPeer(leecherGreeting, leecherLeecher) - wg.Add(1) - go func() { - defer wg.Done() - leecherGreeting.DownloadAll() - leecher.WaitAll() - }() - wg.Wait() + return storage.NewResourcePiecesOpts( + fc.AsResourceProvider(), + storage.ResourcePiecesOpts{ + LeaveIncompleteChunks: true, + }, + ) } func TestMergingTrackersByAddingSpecs(t *testing.T) { - cl, err := NewClient(TestingConfig()) + cl, err := NewClient(TestingConfig(t)) require.NoError(t, err) defer cl.Close() spec := TorrentSpec{} @@ -516,7 +169,7 @@ func TestMergingTrackersByAddingSpecs(t *testing.T) { // We read from a piece which is marked completed, but is missing data. func TestCompletedPieceWrongSize(t *testing.T) { - cfg := TestingConfig() + cfg := TestingConfig(t) cfg.DefaultStorage = badStorage{} cl, err := NewClient(cfg) require.NoError(t, err) @@ -539,20 +192,18 @@ func TestCompletedPieceWrongSize(t *testing.T) { assert.True(t, new) r := tt.NewReader() defer r.Close() - b, err = ioutil.ReadAll(r) - assert.Len(t, b, 13) - assert.NoError(t, err) + quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil) } func BenchmarkAddLargeTorrent(b *testing.B) { - cfg := TestingConfig() + cfg := TestingConfig(b) cfg.DisableTCP = true cfg.DisableUTP = true - cfg.ListenAddr = "redonk" cl, err := NewClient(cfg) require.NoError(b, err) defer cl.Close() - for range iter.N(b.N) { + b.ReportAllocs() + for i := 0; i < b.N; i += 1 { t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent") if err != nil { b.Fatal(err) @@ -564,7 +215,7 @@ func BenchmarkAddLargeTorrent(b *testing.B) { func TestResponsive(t *testing.T) { seederDataDir, mi := testutil.GreetingTestTorrent() defer os.RemoveAll(seederDataDir) - cfg := TestingConfig() + cfg := TestingConfig(t) cfg.Seed = true cfg.DataDir = seederDataDir seeder, err := NewClient(cfg) @@ -572,10 +223,52 @@ func TestResponsive(t *testing.T) { defer seeder.Close() seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi)) seederTorrent.VerifyData() - leecherDataDir, err := ioutil.TempDir("", "") + leecherDataDir := t.TempDir() + cfg = TestingConfig(t) + cfg.DataDir = leecherDataDir + leecher, err := NewClient(cfg) require.Nil(t, err) - defer os.RemoveAll(leecherDataDir) - cfg = TestingConfig() + defer leecher.Close() + leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) { + ret = TorrentSpecFromMetaInfo(mi) + ret.ChunkSize = 2 + return + }()) + leecherTorrent.AddClientPeer(seeder) + reader := leecherTorrent.NewReader() + defer reader.Close() + reader.SetReadahead(0) + reader.SetResponsive() + b := make([]byte, 2) + _, err = reader.Seek(3, io.SeekStart) + require.NoError(t, err) + _, err = io.ReadFull(reader, b) + assert.Nil(t, err) + assert.EqualValues(t, "lo", string(b)) + _, err = reader.Seek(11, io.SeekStart) + require.NoError(t, err) + n, err := io.ReadFull(reader, b) + assert.Nil(t, err) + assert.EqualValues(t, 2, n) + assert.EqualValues(t, "d\n", string(b)) +} + +// TestResponsive was the first test to fail if uTP is disabled and TCP sockets dial from the +// listening port. +func TestResponsiveTcpOnly(t *testing.T) { + seederDataDir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(seederDataDir) + cfg := TestingConfig(t) + cfg.DisableUTP = true + cfg.Seed = true + cfg.DataDir = seederDataDir + seeder, err := NewClient(cfg) + require.Nil(t, err) + defer seeder.Close() + seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi)) + seederTorrent.VerifyData() + leecherDataDir := t.TempDir() + cfg = TestingConfig(t) cfg.DataDir = leecherDataDir leecher, err := NewClient(cfg) require.Nil(t, err) @@ -585,7 +278,7 @@ func TestResponsive(t *testing.T) { ret.ChunkSize = 2 return }()) - addClientPeer(leecherTorrent, seeder) + leecherTorrent.AddClientPeer(seeder) reader := leecherTorrent.NewReader() defer reader.Close() reader.SetReadahead(0) @@ -607,7 +300,7 @@ func TestResponsive(t *testing.T) { func TestTorrentDroppedDuringResponsiveRead(t *testing.T) { seederDataDir, mi := testutil.GreetingTestTorrent() defer os.RemoveAll(seederDataDir) - cfg := TestingConfig() + cfg := TestingConfig(t) cfg.Seed = true cfg.DataDir = seederDataDir seeder, err := NewClient(cfg) @@ -615,10 +308,8 @@ func TestTorrentDroppedDuringResponsiveRead(t *testing.T) { defer seeder.Close() seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi)) seederTorrent.VerifyData() - leecherDataDir, err := ioutil.TempDir("", "") - require.Nil(t, err) - defer os.RemoveAll(leecherDataDir) - cfg = TestingConfig() + leecherDataDir := t.TempDir() + cfg = TestingConfig(t) cfg.DataDir = leecherDataDir leecher, err := NewClient(cfg) require.Nil(t, err) @@ -628,7 +319,7 @@ func TestTorrentDroppedDuringResponsiveRead(t *testing.T) { ret.ChunkSize = 2 return }()) - addClientPeer(leecherTorrent, seeder) + leecherTorrent.AddClientPeer(seeder) reader := leecherTorrent.NewReader() defer reader.Close() reader.SetReadahead(0) @@ -639,30 +330,36 @@ func TestTorrentDroppedDuringResponsiveRead(t *testing.T) { _, err = io.ReadFull(reader, b) assert.Nil(t, err) assert.EqualValues(t, "lo", string(b)) - go leecherTorrent.Drop() _, err = reader.Seek(11, io.SeekStart) require.NoError(t, err) + leecherTorrent.Drop() n, err := reader.Read(b) assert.EqualError(t, err, "torrent closed") assert.EqualValues(t, 0, n) } -func TestDHTInheritBlocklist(t *testing.T) { +func TestDhtInheritBlocklist(t *testing.T) { ipl := iplist.New(nil) require.NotNil(t, ipl) - cfg := TestingConfig() + cfg := TestingConfig(t) cfg.IPBlocklist = ipl cfg.NoDHT = false cl, err := NewClient(cfg) require.NoError(t, err) defer cl.Close() - require.Equal(t, ipl, cl.DHT().IPBlocklist()) + numServers := 0 + cl.eachDhtServer(func(s DhtServer) { + t.Log(s) + assert.Equal(t, ipl, s.(AnacrolixDhtServerWrapper).Server.IPBlocklist()) + numServers++ + }) + assert.EqualValues(t, 2, numServers) } // Check that stuff is merged in subsequent AddTorrentSpec for the same // infohash. func TestAddTorrentSpecMerging(t *testing.T) { - cl, err := NewClient(TestingConfig()) + cl, err := NewClient(TestingConfig(t)) require.NoError(t, err) defer cl.Close() dir, mi := testutil.GreetingTestTorrent() @@ -682,7 +379,7 @@ func TestAddTorrentSpecMerging(t *testing.T) { func TestTorrentDroppedBeforeGotInfo(t *testing.T) { dir, mi := testutil.GreetingTestTorrent() os.RemoveAll(dir) - cl, _ := NewClient(TestingConfig()) + cl, _ := NewClient(TestingConfig(t)) defer cl.Close() tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{ InfoHash: mi.HashInfoBytes(), @@ -697,22 +394,19 @@ func TestTorrentDroppedBeforeGotInfo(t *testing.T) { } func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) { - for i := range iter.N(info.NumPieces()) { + for i := 0; i < info.NumPieces(); i += 1 { p := info.Piece(i) ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0) } } func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) { - fileCacheDir, err := ioutil.TempDir("", "") - require.NoError(t, err) - defer os.RemoveAll(fileCacheDir) + fileCacheDir := t.TempDir() fileCache, err := filecache.NewCache(fileCacheDir) require.NoError(t, err) greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent() defer os.RemoveAll(greetingDataTempDir) filePieceStore := csf(fileCache) - defer filePieceStore.Close() info, err := greetingMetainfo.UnmarshalInfo() require.NoError(t, err) ih := greetingMetainfo.HashInfoBytes() @@ -727,7 +421,7 @@ func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf require.NoError(t, greetingData.Piece(p).MarkComplete()) } } - cfg := TestingConfig() + cfg := TestingConfig(t) // TODO: Disable network option? cfg.DisableTCP = true cfg.DisableUTP = true @@ -743,9 +437,7 @@ func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf assert.Equal(t, alreadyCompleted, psrs[0].Complete) if alreadyCompleted { r := tt.NewReader() - b, err := ioutil.ReadAll(r) - assert.NoError(t, err) - assert.EqualValues(t, testutil.GreetingFileContents, b) + quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil) } } @@ -758,28 +450,35 @@ func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) { } func TestAddMetainfoWithNodes(t *testing.T) { - cfg := TestingConfig() - cfg.ListenAddr = ":0" + cfg := TestingConfig(t) + cfg.ListenHost = func(string) string { return "" } cfg.NoDHT = false - // For now, we want to just jam the nodes into the table, without - // verifying them first. Also the DHT code doesn't support mixing secure - // and insecure nodes if security is enabled (yet). - cfg.DHTConfig.NoSecurity = true + cfg.DhtStartingNodes = func(string) dht.StartingNodesGetter { return func() ([]dht.Addr, error) { return nil, nil } } + // For now, we want to just jam the nodes into the table, without verifying them first. Also the + // DHT code doesn't support mixing secure and insecure nodes if security is enabled (yet). + // cfg.DHTConfig.NoSecurity = true cl, err := NewClient(cfg) require.NoError(t, err) defer cl.Close() - assert.EqualValues(t, 0, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions) + sum := func() (ret int64) { + cl.eachDhtServer(func(s DhtServer) { + ret += s.Stats().(dht.ServerStats).OutboundQueriesAttempted + }) + return + } + assert.EqualValues(t, 0, sum()) tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent") require.NoError(t, err) // Nodes are not added or exposed in Torrent's metainfo. We just randomly // check if the announce-list is here instead. TODO: Add nodes. assert.Len(t, tt.metainfo.AnnounceList, 5) // There are 6 nodes in the torrent file. - assert.EqualValues(t, 6, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions) + for sum() != int64(6*len(cl.dhtServers)) { + time.Sleep(time.Millisecond) + } } type testDownloadCancelParams struct { - ExportClientStatus bool SetLeecherStorageCapacity bool LeecherStorageCapacity int64 Cancel bool @@ -788,20 +487,16 @@ type testDownloadCancelParams struct { func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) { greetingTempDir, mi := testutil.GreetingTestTorrent() defer os.RemoveAll(greetingTempDir) - cfg := TestingConfig() + cfg := TestingConfig(t) cfg.Seed = true cfg.DataDir = greetingTempDir seeder, err := NewClient(cfg) require.NoError(t, err) defer seeder.Close() - if ps.ExportClientStatus { - testutil.ExportStatusWriter(seeder, "s") - } + defer testutil.ExportStatusWriter(seeder, "s", t)() seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi)) seederTorrent.VerifyData() - leecherDataDir, err := ioutil.TempDir("", "") - require.NoError(t, err) - defer os.RemoveAll(leecherDataDir) + leecherDataDir := t.TempDir() fc, err := filecache.NewCache(leecherDataDir) require.NoError(t, err) if ps.SetLeecherStorageCapacity { @@ -809,11 +504,10 @@ func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) { } cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider()) cfg.DataDir = leecherDataDir - leecher, _ := NewClient(cfg) + leecher, err := NewClient(cfg) + require.NoError(t, err) defer leecher.Close() - if ps.ExportClientStatus { - testutil.ExportStatusWriter(leecher, "l") - } + defer testutil.ExportStatusWriter(leecher, "l", t)() leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) { ret = TorrentSpecFromMetaInfo(mi) ret.ChunkSize = 2 @@ -824,33 +518,27 @@ func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) { psc := leecherGreeting.SubscribePieceStateChanges() defer psc.Close() - leecherGreeting.cl.mu.Lock() + leecherGreeting.cl.lock() leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces()) if ps.Cancel { - leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces()) + leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces(), "") } - leecherGreeting.cl.mu.Unlock() - - addClientPeer(leecherGreeting, seeder) + leecherGreeting.cl.unlock() + done := make(chan struct{}) + defer close(done) + go leecherGreeting.AddClientPeer(seeder) completes := make(map[int]bool, 3) -values: - for { - // started := time.Now() - select { - case _v := <-psc.Values: - // log.Print(time.Since(started)) - v := _v.(PieceStateChange) - completes[v.Index] = v.Complete - case <-time.After(100 * time.Millisecond): - break values + expected := func() map[int]bool { + if ps.Cancel { + return map[int]bool{0: false, 1: false, 2: false} + } else { + return map[int]bool{0: true, 1: true, 2: true} } + }() + for !reflect.DeepEqual(completes, expected) { + v := <-psc.Values + completes[v.Index] = v.Complete } - if ps.Cancel { - assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes) - } else { - assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes) - } - } func TestTorrentDownloadAll(t *testing.T) { @@ -865,7 +553,9 @@ func TestTorrentDownloadAllThenCancel(t *testing.T) { // Ensure that it's an error for a peer to send an invalid have message. func TestPeerInvalidHave(t *testing.T) { - cl, err := NewClient(TestingConfig()) + cfg := TestingConfig(t) + cfg.DropMutuallyCompletePeers = false + cl, err := NewClient(cfg) require.NoError(t, err) defer cl.Close() info := metainfo.Info{ @@ -883,9 +573,14 @@ func TestPeerInvalidHave(t *testing.T) { require.NoError(t, err) assert.True(t, _new) defer tt.Drop() - cn := &connection{ - t: tt, - } + cn := &PeerConn{Peer: Peer{ + t: tt, + callbacks: &cfg.Callbacks, + }} + tt.conns[cn] = struct{}{} + cn.peerImpl = cn + cl.lock() + defer cl.unlock() assert.NoError(t, cn.peerSentHave(0)) assert.Error(t, cn.peerSentHave(1)) } @@ -893,101 +588,79 @@ func TestPeerInvalidHave(t *testing.T) { func TestPieceCompletedInStorageButNotClient(t *testing.T) { greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent() defer os.RemoveAll(greetingTempDir) - cfg := TestingConfig() + cfg := TestingConfig(t) cfg.DataDir = greetingTempDir - seeder, err := NewClient(TestingConfig()) + seeder, err := NewClient(TestingConfig(t)) require.NoError(t, err) + defer seeder.Close() seeder.AddTorrentSpec(&TorrentSpec{ InfoBytes: greetingMetainfo.InfoBytes, }) } -func TestPrepareTrackerAnnounce(t *testing.T) { - cl := &Client{} - blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp") - require.NoError(t, err) - assert.False(t, blocked) - assert.EqualValues(t, "localhost:1234", host) - assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse) -} - // Check that when the listen port is 0, all the protocols listened on have // the same port, and it isn't zero. func TestClientDynamicListenPortAllProtocols(t *testing.T) { - cl, err := NewClient(TestingConfig()) + cl, err := NewClient(TestingConfig(t)) require.NoError(t, err) defer cl.Close() - assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr())) - assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr())) + port := cl.LocalPort() + assert.NotEqual(t, 0, port) + cl.eachListener(func(s Listener) bool { + assert.Equal(t, port, missinggo.AddrPort(s.Addr())) + return true + }) } func TestClientDynamicListenTCPOnly(t *testing.T) { - cfg := TestingConfig() + cfg := TestingConfig(t) cfg.DisableUTP = true + cfg.DisableTCP = false cl, err := NewClient(cfg) require.NoError(t, err) defer cl.Close() - assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr())) - assert.Nil(t, cl.utpSock) + assert.NotEqual(t, 0, cl.LocalPort()) } func TestClientDynamicListenUTPOnly(t *testing.T) { - cfg := TestingConfig() - cfg.DisableTCP = true - cl, err := NewClient(cfg) - require.NoError(t, err) - defer cl.Close() - assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr())) - assert.Nil(t, cl.tcpListener) -} - -func TestClientDynamicListenPortNoProtocols(t *testing.T) { - cfg := TestingConfig() + cfg := TestingConfig(t) cfg.DisableTCP = true - cfg.DisableUTP = true + cfg.DisableUTP = false cl, err := NewClient(cfg) require.NoError(t, err) defer cl.Close() - assert.Nil(t, cl.ListenAddr()) -} - -func addClientPeer(t *Torrent, cl *Client) { - t.AddPeers([]Peer{ - { - IP: missinggo.AddrIP(cl.ListenAddr()), - Port: missinggo.AddrPort(cl.ListenAddr()), - }, - }) + assert.NotEqual(t, 0, cl.LocalPort()) } func totalConns(tts []*Torrent) (ret int) { for _, tt := range tts { - tt.cl.mu.Lock() + tt.cl.lock() ret += len(tt.conns) - tt.cl.mu.Unlock() + tt.cl.unlock() } return } func TestSetMaxEstablishedConn(t *testing.T) { - ss := testutil.NewStatusServer(t) - defer ss.Close() var tts []*Torrent ih := testutil.GreetingMetaInfo().HashInfoBytes() - for i := range iter.N(3) { - cl, err := NewClient(TestingConfig()) + cfg := TestingConfig(t) + cfg.DisableAcceptRateLimiting = true + cfg.DropDuplicatePeerIds = true + for i := 0; i < 3; i += 1 { + cl, err := NewClient(cfg) require.NoError(t, err) defer cl.Close() tt, _ := cl.AddTorrentInfoHash(ih) tt.SetMaxEstablishedConns(2) - ss.HandleStatusWriter(cl, fmt.Sprintf("/%d", i)) + defer testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i), t)() tts = append(tts, tt) } addPeers := func() { for _, tt := range tts { for _, _tt := range tts { // if tt != _tt { - addClientPeer(tt, _tt.cl) + tt.AddClientPeer(_tt.cl) // } } } @@ -1012,8 +685,10 @@ func TestSetMaxEstablishedConn(t *testing.T) { waitTotalConns(6) } -func makeMagnet(t *testing.T, cl *Client, dir string, name string) string { - os.MkdirAll(dir, 0770) +// Creates a file containing its own name as data. Make a metainfo from that, adds it to the given +// client, and returns a magnet link. +func makeMagnet(t *testing.T, cl *Client, dir, name string) string { + os.MkdirAll(dir, 0o770) file, err := os.Create(filepath.Join(dir, name)) require.NoError(t, err) file.Write([]byte(name)) @@ -1025,7 +700,7 @@ func makeMagnet(t *testing.T, cl *Client, dir string, name string) string { require.NoError(t, err) mi.InfoBytes, err = bencode.Marshal(info) require.NoError(t, err) - magnet := mi.Magnet(name, mi.HashInfoBytes()).String() + magnet := mi.Magnet(nil, &info).String() tr, err := cl.AddTorrent(&mi) require.NoError(t, err) require.True(t, tr.Seeding()) @@ -1035,45 +710,200 @@ func makeMagnet(t *testing.T, cl *Client, dir string, name string) string { // https://github.com/anacrolix/torrent/issues/114 func TestMultipleTorrentsWithEncryption(t *testing.T) { - cfg := TestingConfig() - cfg.DisableUTP = true + testSeederLeecherPair( + t, + func(cfg *ClientConfig) { + cfg.HeaderObfuscationPolicy.Preferred = true + cfg.HeaderObfuscationPolicy.RequirePreferred = true + }, + func(cfg *ClientConfig) { + cfg.HeaderObfuscationPolicy.RequirePreferred = false + }, + ) +} + +// Test that the leecher can download a torrent in its entirety from the seeder. Note that the +// seeder config is done first. +func testSeederLeecherPair(t *testing.T, seeder, leecher func(*ClientConfig)) { + cfg := TestingConfig(t) cfg.Seed = true cfg.DataDir = filepath.Join(cfg.DataDir, "server") - cfg.ForceEncryption = true - os.Mkdir(cfg.DataDir, 0755) + os.Mkdir(cfg.DataDir, 0o755) + seeder(cfg) server, err := NewClient(cfg) require.NoError(t, err) defer server.Close() - testutil.ExportStatusWriter(server, "s") + defer testutil.ExportStatusWriter(server, "s", t)() magnet1 := makeMagnet(t, server, cfg.DataDir, "test1") + // Extra torrents are added to test the seeder having to match incoming obfuscated headers + // against more than one torrent. See issue #114 makeMagnet(t, server, cfg.DataDir, "test2") - cfg = TestingConfig() - cfg.DisableUTP = true + for i := 0; i < 100; i++ { + makeMagnet(t, server, cfg.DataDir, fmt.Sprintf("test%d", i+2)) + } + cfg = TestingConfig(t) cfg.DataDir = filepath.Join(cfg.DataDir, "client") - cfg.ForceEncryption = true + leecher(cfg) client, err := NewClient(cfg) require.NoError(t, err) defer client.Close() - testutil.ExportStatusWriter(client, "c") + defer testutil.ExportStatusWriter(client, "c", t)() tr, err := client.AddMagnet(magnet1) require.NoError(t, err) - tr.AddPeers([]Peer{{ - IP: missinggo.AddrIP(server.ListenAddr()), - Port: missinggo.AddrPort(server.ListenAddr()), - }}) + tr.AddClientPeer(server) <-tr.GotInfo() tr.DownloadAll() client.WaitAll() } +// This appears to be the situation with the S3 BitTorrent client. +func TestObfuscatedHeaderFallbackSeederDisallowsLeecherPrefers(t *testing.T) { + // Leecher prefers obfuscation, but the seeder does not allow it. + testSeederLeecherPair( + t, + func(cfg *ClientConfig) { + cfg.HeaderObfuscationPolicy.Preferred = false + cfg.HeaderObfuscationPolicy.RequirePreferred = true + }, + func(cfg *ClientConfig) { + cfg.HeaderObfuscationPolicy.Preferred = true + cfg.HeaderObfuscationPolicy.RequirePreferred = false + }, + ) +} + +func TestObfuscatedHeaderFallbackSeederRequiresLeecherPrefersNot(t *testing.T) { + // Leecher prefers no obfuscation, but the seeder enforces it. + testSeederLeecherPair( + t, + func(cfg *ClientConfig) { + cfg.HeaderObfuscationPolicy.Preferred = true + cfg.HeaderObfuscationPolicy.RequirePreferred = true + }, + func(cfg *ClientConfig) { + cfg.HeaderObfuscationPolicy.Preferred = false + cfg.HeaderObfuscationPolicy.RequirePreferred = false + }, + ) +} + func TestClientAddressInUse(t *testing.T) { - s, _ := NewUtpSocket("udp", ":50007") + s, _ := NewUtpSocket("udp", "localhost:50007", nil, log.Default) if s != nil { defer s.Close() } - cfg := TestingConfig() - cfg.ListenAddr = ":50007" + cfg := TestingConfig(t).SetListenAddr("localhost:50007") + cfg.DisableUTP = false cl, err := NewClient(cfg) + if err == nil { + assert.Nil(t, cl.Close()) + } require.Error(t, err) require.Nil(t, cl) } + +func TestClientHasDhtServersWhenUtpDisabled(t *testing.T) { + cc := TestingConfig(t) + cc.DisableUTP = true + cc.NoDHT = false + cl, err := NewClient(cc) + require.NoError(t, err) + defer cl.Close() + assert.NotEmpty(t, cl.DhtServers()) +} + +func TestClientDisabledImplicitNetworksButDhtEnabled(t *testing.T) { + cfg := TestingConfig(t) + cfg.DisableTCP = true + cfg.DisableUTP = true + cfg.NoDHT = false + cl, err := NewClient(cfg) + require.NoError(t, err) + defer cl.Close() + assert.Empty(t, cl.listeners) + assert.NotEmpty(t, cl.DhtServers()) +} + +func TestBadPeerIpPort(t *testing.T) { + for _, tc := range []struct { + title string + ip net.IP + port int + expectedOk bool + setup func(*Client) + }{ + {"empty both", nil, 0, true, func(*Client) {}}, + {"empty/nil ip", nil, 6666, true, func(*Client) {}}, + { + "empty port", + net.ParseIP("127.0.0.1/32"), + 0, true, + func(*Client) {}, + }, + { + "in doppleganger addresses", + net.ParseIP("127.0.0.1/32"), + 2322, + true, + func(cl *Client) { + cl.dopplegangerAddrs["10.0.0.1:2322"] = struct{}{} + }, + }, + { + "in IP block list", + net.ParseIP("10.0.0.1"), + 2322, + true, + func(cl *Client) { + cl.ipBlockList = iplist.New([]iplist.Range{ + {First: net.ParseIP("10.0.0.1"), Last: net.ParseIP("10.0.0.255")}, + }) + }, + }, + { + "in bad peer IPs", + net.ParseIP("10.0.0.1"), + 2322, + true, + func(cl *Client) { + ipAddr, ok := netip.AddrFromSlice(net.ParseIP("10.0.0.1")) + require.True(t, ok) + cl.badPeerIPs = map[netip.Addr]struct{}{} + cl.badPeerIPs[ipAddr] = struct{}{} + }, + }, + { + "good", + net.ParseIP("10.0.0.1"), + 2322, + false, + func(cl *Client) {}, + }, + } { + t.Run(tc.title, func(t *testing.T) { + cfg := TestingConfig(t) + cfg.DisableTCP = true + cfg.DisableUTP = true + cfg.NoDHT = false + cl, err := NewClient(cfg) + require.NoError(t, err) + defer cl.Close() + + tc.setup(cl) + require.Equal(t, tc.expectedOk, cl.badPeerIPPort(tc.ip, tc.port)) + }) + } +} + +// https://github.com/anacrolix/torrent/issues/837 +func TestClientConfigSetHandlerNotIgnored(t *testing.T) { + cfg := TestingConfig(t) + cfg.Logger.SetHandlers(log.DiscardHandler) + c := qt.New(t) + cl, err := NewClient(cfg) + c.Assert(err, qt.IsNil) + defer cl.Close() + c.Assert(cl.logger.Handlers, qt.HasLen, 1) + h := cl.logger.Handlers[0].(log.StreamHandler) + c.Check(h.W, qt.Equals, io.Discard) +}