16 "github.com/anacrolix/dht"
17 _ "github.com/anacrolix/envpprof"
18 "github.com/anacrolix/missinggo"
19 "github.com/anacrolix/missinggo/filecache"
20 "github.com/bradfitz/iter"
21 "github.com/stretchr/testify/assert"
22 "github.com/stretchr/testify/require"
23 "golang.org/x/time/rate"
25 "github.com/anacrolix/torrent/bencode"
26 "github.com/anacrolix/torrent/internal/testutil"
27 "github.com/anacrolix/torrent/iplist"
28 "github.com/anacrolix/torrent/metainfo"
29 "github.com/anacrolix/torrent/storage"
// TestingConfig returns the *Config used as the baseline by the tests in this
// file. Of the fields visible in this excerpt, it sets ListenHost to
// LoopbackListenHost, disables trackers, and disables default port forwarding.
32 func TestingConfig() *Config {
34 ListenHost: LoopbackListenHost,
37 DisableTrackers: true,
38 NoDefaultPortForwarding: true,
// TestClientDefault checks that NewClient succeeds when given the unmodified
// TestingConfig.
43 func TestClientDefault(t *testing.T) {
44 cl, err := NewClient(TestingConfig())
45 require.NoError(t, err)
// TestBoltPieceCompletionClosedWhenClientClosed builds a file storage backed by
// a Bolt piece completion in cfg.DataDir, installs it as the default storage,
// and then constructs a Client from the same config twice. Both constructions
// must succeed (see the issue linked below).
49 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
50 cfg := TestingConfig()
51 pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
52 require.NoError(t, err)
53 ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
55 cfg.DefaultStorage = ci
56 cl, err := NewClient(cfg)
57 require.NoError(t, err)
59 // And again, https://github.com/anacrolix/torrent/issues/158
60 cl, err = NewClient(cfg)
61 require.NoError(t, err)
// TestAddDropTorrent adds the greeting test torrent to a fresh client through
// AddTorrentSpec and then calls SetMaxEstablishedConns with 0 followed by 1.
65 func TestAddDropTorrent(t *testing.T) {
66 cl, err := NewClient(TestingConfig())
67 require.NoError(t, err)
69 dir, mi := testutil.GreetingTestTorrent()
// The greeting torrent's temporary directory is removed when the test returns.
70 defer os.RemoveAll(dir)
71 tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
72 require.NoError(t, err)
74 tt.SetMaxEstablishedConns(0)
75 tt.SetMaxEstablishedConns(1)
// TestAddTorrentNoSupportedTrackerSchemes: the body is not visible in this
// excerpt. NOTE(review): judging by the name only, this presumably covers
// adding a torrent whose tracker URLs use unsupported schemes; confirm.
79 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
// TestAddTorrentNoUsableURLs: the body is not visible in this excerpt.
// NOTE(review): judging by the name only, this presumably covers adding a
// torrent with no usable URLs; confirm.
84 func TestAddTorrentNoUsableURLs(t *testing.T) {
// TestAddPeersToUnknownTorrent: the body is not visible in this excerpt.
// NOTE(review): judging by the name only, this presumably covers adding peers
// for a torrent the client does not know about; confirm.
89 func TestAddPeersToUnknownTorrent(t *testing.T) {
// TestPieceHashSize asserts that pieceHash.Size() reports 20.
94 func TestPieceHashSize(t *testing.T) {
95 assert.Equal(t, 20, pieceHash.Size())
// TestTorrentInitialState creates a torrent with cl.newTorrent on file storage
// with an in-memory (map) piece completion, loads the greeting torrent's info
// bytes, and checks the resulting piece and chunk bookkeeping.
98 func TestTorrentInitialState(t *testing.T) {
99 dir, mi := testutil.GreetingTestTorrent()
100 defer os.RemoveAll(dir)
103 tor := cl.newTorrent(
105 storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
109 err := tor.setInfoBytes(mi.InfoBytes)
111 require.NoError(t, err)
// The greeting torrent is expected to split into 3 pieces.
112 require.Len(t, tor.pieces, 3)
113 tor.pendAllChunkSpecs(0)
// After pending every chunk of piece 0, 3 chunks should be outstanding.
115 assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
// Chunk index 2 of piece 0 is expected to be chunkSpec{4, 1}.
117 assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
// TestUnmarshalPEXMsg decodes a bencoded dictionary whose "added" value is a
// 12-byte string into a peerExchangeMessage. It expects two entries in
// m.Added, the first with port 0x506.
120 func TestUnmarshalPEXMsg(t *testing.T) {
121 var m peerExchangeMessage
122 if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
125 if len(m.Added) != 2 {
128 if m.Added[0].Port != 0x506 {
// TestReducedDialTimeout runs reducedDialTimeout over a table of
// (max timeout, half-open limit, pending peers) cases. Each case's expected
// value is clamped up to cfg.MinDialTimeout before it is compared with the
// result.
133 func TestReducedDialTimeout(t *testing.T) {
136 for _, _case := range []struct {
140 ExpectedReduced time.Duration
142 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
143 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
144 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
145 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
146 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
147 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
149 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
150 expected := _case.ExpectedReduced
// The table value may fall below the configured minimum; clamp it here.
151 if expected < cfg.MinDialTimeout {
152 expected = cfg.MinDialTimeout
154 if reduced != expected {
// Report the clamped value that was actually compared, not the raw table
// entry, so the failure message matches the check above.
155 t.Fatalf("expected %s, got %s", expected, reduced)
// TestUTPRawConn checks that plain UDP datagrams sent to a uTP socket's port
// can still be read through the socket's ReadFrom while a uTP peer is
// connected to it. It sends N numbered messages from an ordinary UDP
// PacketConn and expects the reader to receive them in order.
160 func TestUTPRawConn(t *testing.T) {
161 l, err := NewUtpSocket("udp", "")
162 require.NoError(t, err)
172 // Connect a UTP peer to see if the RawConn will still work.
173 s, err := NewUtpSocket("udp", "")
174 require.NoError(t, err)
176 utpPeer, err := s.DialContext(context.Background(), "", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
177 require.NoError(t, err)
178 defer utpPeer.Close()
// A plain UDP socket on an OS-assigned port, used to send the raw datagrams.
179 peer, err := net.ListenPacket("udp", ":0")
180 require.NoError(t, err)
184 // How many messages to send. I've set this to double the channel buffer
185 // size in the raw packetConn.
187 readerStopped := make(chan struct{})
188 // The reader goroutine.
// NOTE(review): if this section runs in a separate goroutine, as the comment
// above suggests, require.NoError would call t.FailNow off the test goroutine;
// confirm against the full source.
190 defer close(readerStopped)
191 b := make([]byte, 500)
192 for i := 0; i < N; i++ {
193 n, _, err := l.ReadFrom(b)
194 require.NoError(t, err)
// Each payload is the decimal text of its sequence number.
197 fmt.Sscan(string(b[:n]), &d)
198 assert.Equal(t, i, d)
201 udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
202 require.NoError(t, err)
203 for i := 0; i < N; i++ {
204 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
205 require.NoError(t, err)
206 time.Sleep(time.Millisecond)
// Wait up to one second for the reader to finish before failing.
209 case <-readerStopped:
210 case <-time.After(time.Second):
211 t.Fatal("reader timed out")
213 if msgsReceived != N {
214 t.Fatalf("messages received: %d", msgsReceived)
// TestAddDropManyTorrents adds 1000 torrent specs to one client. Each spec's
// InfoHash is made distinct by writing the loop index into it as a varint.
218 func TestAddDropManyTorrents(t *testing.T) {
219 cl, err := NewClient(TestingConfig())
220 require.NoError(t, err)
222 for i := range iter.N(1000) {
224 binary.PutVarint(spec.InfoHash[:], int64(i))
225 tt, new, err := cl.AddTorrentSpec(&spec)
226 assert.NoError(t, err)
// FileCacheClientStorageFactoryParams configures
// NewFileCacheClientStorageFactory. Only the Wrapper field is visible in this
// excerpt; the factory also reads a Capacity field.
232 type FileCacheClientStorageFactoryParams struct {
// Wrapper converts the file cache into the storage.ClientImpl handed to the client.
235 Wrapper func(*filecache.Cache) storage.ClientImpl
// NewFileCacheClientStorageFactory returns a storageFactory that opens a
// filecache.Cache rooted at the given data directory, applies ps.Capacity to
// it (any guard around that call is not visible in this excerpt), and returns
// the result of ps.Wrapper on the cache.
238 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
239 return func(dataDir string) storage.ClientImpl {
240 fc, err := filecache.NewCache(dataDir)
245 fc.SetCapacity(ps.Capacity)
247 return ps.Wrapper(fc)
// storageFactory builds a storage.ClientImpl from a string argument; the
// factories in this file pass a data directory path.
251 type storageFactory func(string) storage.ClientImpl
// TestClientTransferDefault runs testClientTransfer with client status
// exported and the leecher storing data in a file cache wrapped by
// fileCachePieceResourceStorage.
253 func TestClientTransferDefault(t *testing.T) {
254 testClientTransfer(t, testClientTransferParams{
255 ExportClientStatus: true,
256 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
257 Wrapper: fileCachePieceResourceStorage,
// TestClientTransferRateLimitedUpload runs testClientTransfer with the
// seeder's upload limited to rate 11 with burst 2, and requires the whole
// transfer to take longer than one second.
262 func TestClientTransferRateLimitedUpload(t *testing.T) {
263 started := time.Now()
264 testClientTransfer(t, testClientTransferParams{
265 // We are uploading 13 bytes (the length of the greeting torrent). The
266 // chunks are 2 bytes in length. Then the smallest burst we can run
267 // with is 2. Time taken is (13-burst)/rate.
268 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
269 ExportClientStatus: true,
// With rate 11 and burst 2, (13-2)/11 gives the one-second lower bound.
271 require.True(t, time.Since(started) > time.Second)
// TestClientTransferRateLimitedDownload runs testClientTransfer with the
// leecher's download limiter set to rate 512 with burst 512.
274 func TestClientTransferRateLimitedDownload(t *testing.T) {
275 testClientTransfer(t, testClientTransferParams{
276 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
// fileCachePieceResourceStorage adapts a filecache.Cache into a
// storage.ClientImpl by handing the cache's resource provider to
// storage.NewResourcePieces.
280 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
281 return storage.NewResourcePieces(fc.AsResourceProvider())
// TestClientTransferSmallCache runs testClientTransfer with the leecher on a
// file-cache storage built by NewFileCacheClientStorageFactory. The capacity
// and readahead values it uses are not visible in this excerpt.
284 func TestClientTransferSmallCache(t *testing.T) {
285 testClientTransfer(t, testClientTransferParams{
286 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
288 // Going below the piece length means it can't complete a piece so
289 // that it can be hashed.
291 Wrapper: fileCachePieceResourceStorage,
294 // Can't readahead too far or the cache will thrash and drop data we
297 ExportClientStatus: true,
// TestClientTransferVarious runs testClientTransfer over combinations of
// leecher storage factories, seeder storage factories, the Responsive flag,
// and a range of Readahead values.
301 func TestClientTransferVarious(t *testing.T) {
// Leecher storage variants.
303 for _, ls := range []storageFactory{
304 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
305 Wrapper: fileCachePieceResourceStorage,
// Seeder storage variants.
310 for _, ss := range []func(string) storage.ClientImpl{
314 for _, responsive := range []bool{false, true} {
315 testClientTransfer(t, testClientTransferParams{
316 Responsive: responsive,
// Readahead values from -1 up to 20.
320 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
321 testClientTransfer(t, testClientTransferParams{
323 Responsive: responsive,
325 Readahead: readahead,
// testClientTransferParams holds the options for testClientTransfer. Callers
// in this file also set Responsive and Readahead, whose declarations are not
// visible in this excerpt.
334 type testClientTransferParams struct {
// ExportClientStatus, when true, registers both clients with testutil.ExportStatusWriter.
338 ExportClientStatus bool
// LeecherStorage and SeederStorage, when non-nil, build each side's default storage.
339 LeecherStorage func(string) storage.ClientImpl
340 SeederStorage func(string) storage.ClientImpl
// Optional rate limiters, assigned to the config before each client is created.
341 SeederUploadRateLimiter *rate.Limiter
342 LeecherDownloadRateLimiter *rate.Limiter
345 // Creates a seeder and a leecher, and ensures the data transfers when a read
346 // is attempted on the leecher.
//
// ps selects storage implementations, rate limiters, status export and reader
// readahead. The seeder is created over the greeting torrent's data, the
// leecher over a fresh temporary directory, and the leecher is given the
// seeder as its only peer before reading the greeting through a torrent reader.
347 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
348 greetingTempDir, mi := testutil.GreetingTestTorrent()
349 defer os.RemoveAll(greetingTempDir)
350 // Create seeder and a Torrent.
351 cfg := TestingConfig()
353 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
354 // cfg.ListenAddr = "localhost:4000"
355 if ps.SeederStorage != nil {
356 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
357 defer cfg.DefaultStorage.Close()
// Presumably the else branch (not visible here): fall back to DataDir storage.
359 cfg.DataDir = greetingTempDir
361 seeder, err := NewClient(cfg)
362 require.NoError(t, err)
363 if ps.ExportClientStatus {
364 testutil.ExportStatusWriter(seeder, "s")
366 seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
367 // Run a Stats right after Closing the Client. This will trigger the Stats
368 // panic in #214 caused by RemoteAddr on Closed uTP sockets.
369 defer seederTorrent.Stats()
371 seederTorrent.VerifyData()
372 // Create leecher and a Torrent.
373 leecherDataDir, err := ioutil.TempDir("", "")
374 require.NoError(t, err)
375 defer os.RemoveAll(leecherDataDir)
// The same cfg value is reused for the leecher with storage and limiter swapped.
376 if ps.LeecherStorage == nil {
377 cfg.DataDir = leecherDataDir
379 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
381 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
383 leecher, err := NewClient(cfg)
384 require.NoError(t, err)
385 defer leecher.Close()
386 if ps.ExportClientStatus {
387 testutil.ExportStatusWriter(leecher, "l")
389 leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
390 ret = TorrentSpecFromMetaInfo(mi)
394 require.NoError(t, err)
396 // Now do some things with leecher and seeder.
397 leecherTorrent.AddClientPeer(seeder)
398 // The Torrent should not be interested in obtaining peers, so the one we
399 // just added should be the only one.
400 assert.False(t, leecherTorrent.Seeding())
401 assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
402 r := leecherTorrent.NewReader()
408 r.SetReadahead(ps.Readahead)
410 assertReadAllGreeting(t, r)
// At least the 13 greeting bytes, in at least 8 chunks, must have moved in
// each direction.
411 assert.True(t, 13 <= seederTorrent.Stats().BytesWrittenData)
412 assert.True(t, 8 <= seederTorrent.Stats().ChunksWritten)
413 assert.True(t, 13 <= leecherTorrent.Stats().BytesReadData)
414 assert.True(t, 8 <= leecherTorrent.Stats().ChunksRead)
415 // Try reading through again for the cases where the torrent data size
416 // exceeds the size of the cache.
417 assertReadAllGreeting(t, r)
// assertReadAllGreeting seeks r back to the start, reads it to EOF, and
// asserts that the bytes equal testutil.GreetingFileContents. It uses assert
// rather than require, so the calling test continues after a failure.
420 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
421 pos, err := r.Seek(0, io.SeekStart)
422 assert.NoError(t, err)
423 assert.EqualValues(t, 0, pos)
424 _greeting, err := ioutil.ReadAll(r)
425 assert.NoError(t, err)
426 assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
429 // Check that after completing leeching, a leecher transitions to seeding
430 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
//
// Three clients are built from one config with different data directories.
// The leecher is given the seeder and the leecher-leecher as peers, then
// downloads everything while the leecher-leecher reads the greeting contents.
431 func TestSeedAfterDownloading(t *testing.T) {
432 greetingTempDir, mi := testutil.GreetingTestTorrent()
433 defer os.RemoveAll(greetingTempDir)
434 cfg := TestingConfig()
436 cfg.DataDir = greetingTempDir
437 seeder, err := NewClient(cfg)
438 require.NoError(t, err)
440 testutil.ExportStatusWriter(seeder, "s")
441 seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
442 seederTorrent.VerifyData()
443 cfg.DataDir, err = ioutil.TempDir("", "")
444 require.NoError(t, err)
445 defer os.RemoveAll(cfg.DataDir)
446 leecher, err := NewClient(cfg)
447 require.NoError(t, err)
448 defer leecher.Close()
449 testutil.ExportStatusWriter(leecher, "l")
451 cfg.DataDir, err = ioutil.TempDir("", "")
452 require.NoError(t, err)
453 defer os.RemoveAll(cfg.DataDir)
// Capture the constructor's error so the check below tests this call rather
// than the stale err left over from ioutil.TempDir.
454 leecherLeecher, err := NewClient(cfg)
455 require.NoError(t, err)
456 defer leecherLeecher.Close()
457 testutil.ExportStatusWriter(leecherLeecher, "ll")
458 leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
459 ret = TorrentSpecFromMetaInfo(mi)
463 llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
464 ret = TorrentSpecFromMetaInfo(mi)
468 // Simultaneously DownloadAll in Leecher, and read the contents
469 // consecutively in LeecherLeecher. This non-deterministically triggered a
470 // case where the leecher wouldn't unchoke the LeecherLeecher.
471 var wg sync.WaitGroup
477 b, err := ioutil.ReadAll(r)
478 require.NoError(t, err)
479 assert.EqualValues(t, testutil.GreetingFileContents, b)
481 leecherGreeting.AddClientPeer(seeder)
482 leecherGreeting.AddClientPeer(leecherLeecher)
486 leecherGreeting.DownloadAll()
// TestMergingTrackersByAddingSpecs adds an empty TorrentSpec, then adds the
// same spec again with two tracker tiers set. It expects the existing
// torrent's AnnounceList to contain both tiers and no tracker announcers to
// have been started.
492 func TestMergingTrackersByAddingSpecs(t *testing.T) {
493 cl, err := NewClient(TestingConfig())
494 require.NoError(t, err)
496 spec := TorrentSpec{}
497 T, new, _ := cl.AddTorrentSpec(&spec)
501 spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
502 _, new, _ = cl.AddTorrentSpec(&spec)
504 assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
505 // Because trackers are disabled in TestingConfig.
506 assert.EqualValues(t, 0, len(T.trackerAnnouncers))
509 // We read from a piece which is marked completed, but is missing data.
//
// The client uses badStorage{} as its default storage. The torrent is built
// from a hand-made Info with one 20-byte piece hash and a single 13-byte file
// named "greeting".
510 func TestCompletedPieceWrongSize(t *testing.T) {
511 cfg := TestingConfig()
512 cfg.DefaultStorage = badStorage{}
513 cl, err := NewClient(cfg)
514 require.NoError(t, err)
516 info := metainfo.Info{
518 Pieces: make([]byte, 20),
519 Files: []metainfo.FileInfo{
520 {Path: []string{"greeting"}, Length: 13},
523 b, err := bencode.Marshal(info)
524 require.NoError(t, err)
525 tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
// The info hash is computed from the marshalled info bytes.
527 InfoHash: metainfo.HashBytes(b),
529 require.NoError(t, err)
534 b, err = ioutil.ReadAll(r)
536 assert.NoError(t, err)
// BenchmarkAddLargeTorrent measures adding testdata/bootstrap.dat.torrent to a
// client b.N times. TCP and uTP are both disabled in the config, and
// ListenHost is replaced with a function returning "redonk".
539 func BenchmarkAddLargeTorrent(b *testing.B) {
540 cfg := TestingConfig()
541 cfg.DisableTCP = true
542 cfg.DisableUTP = true
543 cfg.ListenHost = func(string) string { return "redonk" }
544 cl, err := NewClient(cfg)
545 require.NoError(b, err)
547 for range iter.N(b.N) {
548 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
// TestResponsive transfers the greeting torrent from a seeder to a leecher
// through a reader with readahead 0 and responsive mode enabled. It seeks to
// offsets 3 and 11 and checks the two bytes read at each position.
556 func TestResponsive(t *testing.T) {
557 seederDataDir, mi := testutil.GreetingTestTorrent()
558 defer os.RemoveAll(seederDataDir)
559 cfg := TestingConfig()
561 cfg.DataDir = seederDataDir
562 seeder, err := NewClient(cfg)
565 seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
566 seederTorrent.VerifyData()
567 leecherDataDir, err := ioutil.TempDir("", "")
569 defer os.RemoveAll(leecherDataDir)
// The leecher gets a fresh config rather than reusing the seeder's.
570 cfg = TestingConfig()
571 cfg.DataDir = leecherDataDir
572 leecher, err := NewClient(cfg)
574 defer leecher.Close()
575 leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
576 ret = TorrentSpecFromMetaInfo(mi)
580 leecherTorrent.AddClientPeer(seeder)
581 reader := leecherTorrent.NewReader()
583 reader.SetReadahead(0)
584 reader.SetResponsive()
586 _, err = reader.Seek(3, io.SeekStart)
587 require.NoError(t, err)
588 _, err = io.ReadFull(reader, b)
590 assert.EqualValues(t, "lo", string(b))
591 _, err = reader.Seek(11, io.SeekStart)
592 require.NoError(t, err)
593 n, err := io.ReadFull(reader, b)
595 assert.EqualValues(t, 2, n)
596 assert.EqualValues(t, "d\n", string(b))
// TestTorrentDroppedDuringResponsiveRead uses the same seeder/leecher setup as
// a responsive read, but drops the leecher's torrent in a goroutine after the
// first successful read. The following Read must return 0 bytes and the error
// "torrent closed".
599 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
600 seederDataDir, mi := testutil.GreetingTestTorrent()
601 defer os.RemoveAll(seederDataDir)
602 cfg := TestingConfig()
604 cfg.DataDir = seederDataDir
605 seeder, err := NewClient(cfg)
608 seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
609 seederTorrent.VerifyData()
610 leecherDataDir, err := ioutil.TempDir("", "")
612 defer os.RemoveAll(leecherDataDir)
613 cfg = TestingConfig()
614 cfg.DataDir = leecherDataDir
615 leecher, err := NewClient(cfg)
617 defer leecher.Close()
618 leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
619 ret = TorrentSpecFromMetaInfo(mi)
623 leecherTorrent.AddClientPeer(seeder)
624 reader := leecherTorrent.NewReader()
626 reader.SetReadahead(0)
627 reader.SetResponsive()
629 _, err = reader.Seek(3, io.SeekStart)
630 require.NoError(t, err)
631 _, err = io.ReadFull(reader, b)
633 assert.EqualValues(t, "lo", string(b))
// Drop runs concurrently with the seek and read that follow.
634 go leecherTorrent.Drop()
635 _, err = reader.Seek(11, io.SeekStart)
636 require.NoError(t, err)
637 n, err := reader.Read(b)
638 assert.EqualError(t, err, "torrent closed")
639 assert.EqualValues(t, 0, n)
// TestDHTInheritBlocklist sets cfg.IPBlocklist to a new iplist and checks that
// every DHT server of the resulting client reports that same list from
// IPBlocklist(). It expects exactly 2 DHT servers to be visited.
642 func TestDHTInheritBlocklist(t *testing.T) {
643 ipl := iplist.New(nil)
644 require.NotNil(t, ipl)
645 cfg := TestingConfig()
646 cfg.IPBlocklist = ipl
648 cl, err := NewClient(cfg)
649 require.NoError(t, err)
652 cl.eachDhtServer(func(s *dht.Server) {
653 assert.Equal(t, ipl, s.IPBlocklist())
656 assert.EqualValues(t, 2, numServers)
// TestAddTorrentSpecMerging first adds a spec carrying only the greeting
// torrent's info hash, so the torrent has no Info. It then adds the full spec
// from the metainfo and requires that this is not reported as new and that
// the original torrent handle now has Info.
659 // Check that stuff is merged in subsequent AddTorrentSpec for the same
661 func TestAddTorrentSpecMerging(t *testing.T) {
662 cl, err := NewClient(TestingConfig())
663 require.NoError(t, err)
665 dir, mi := testutil.GreetingTestTorrent()
666 defer os.RemoveAll(dir)
667 tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
668 InfoHash: mi.HashInfoBytes(),
670 require.NoError(t, err)
672 require.Nil(t, tt.Info())
673 _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
674 require.NoError(t, err)
675 require.False(t, new)
676 require.NotNil(t, tt.Info())
// TestTorrentDroppedBeforeGotInfo adds a torrent by info hash only and later
// asserts that the client holds no torrents. The step that removes the
// torrent is not visible in this excerpt.
679 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
680 dir, mi := testutil.GreetingTestTorrent()
// NOTE(review): the errors from NewClient and AddTorrentSpec are discarded
// here, unlike most tests in this file; consider checking them.
682 cl, _ := NewClient(TestingConfig())
684 tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
685 InfoHash: mi.HashInfoBytes(),
688 assert.EqualValues(t, 0, len(cl.Torrents()))
// writeTorrentData writes b into the torrent storage ts piece by piece. Each
// piece receives the slice of b covering that piece's offset and length,
// written at offset 0 within the piece. The result of WriteAt is not checked.
696 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
697 for i := range iter.N(info.NumPieces()) {
699 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
// testAddTorrentPriorPieceCompletion writes the greeting data into a
// file-cache-backed storage before any client exists, optionally marking every
// piece complete. It then creates a client on that storage and checks that the
// added torrent's piece state matches.
//
// alreadyCompleted selects whether pieces are marked complete up front; csf
// wraps the file cache into the storage.ClientImpl under test.
703 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
704 fileCacheDir, err := ioutil.TempDir("", "")
705 require.NoError(t, err)
706 defer os.RemoveAll(fileCacheDir)
707 fileCache, err := filecache.NewCache(fileCacheDir)
708 require.NoError(t, err)
709 greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
710 defer os.RemoveAll(greetingDataTempDir)
711 filePieceStore := csf(fileCache)
712 defer filePieceStore.Close()
713 info, err := greetingMetainfo.UnmarshalInfo()
714 require.NoError(t, err)
715 ih := greetingMetainfo.HashInfoBytes()
716 greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
717 require.NoError(t, err)
718 writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
719 // require.Equal(t, len(testutil.GreetingFileContents), written)
720 // require.NoError(t, err)
721 for i := 0; i < info.NumPieces(); i++ {
723 if alreadyCompleted {
724 require.NoError(t, greetingData.Piece(p).MarkComplete())
727 cfg := TestingConfig()
728 // TODO: Disable network option?
729 cfg.DisableTCP = true
730 cfg.DisableUTP = true
731 cfg.DefaultStorage = filePieceStore
732 cl, err := NewClient(cfg)
733 require.NoError(t, err)
735 tt, err := cl.AddTorrent(greetingMetainfo)
736 require.NoError(t, err)
// All 3 pieces should form one run whose Complete flag equals alreadyCompleted.
737 psrs := tt.PieceStateRuns()
738 assert.Len(t, psrs, 1)
739 assert.EqualValues(t, 3, psrs[0].Length)
740 assert.Equal(t, alreadyCompleted, psrs[0].Complete)
// The data is read back only when the pieces were pre-completed.
741 if alreadyCompleted {
743 b, err := ioutil.ReadAll(r)
744 assert.NoError(t, err)
745 assert.EqualValues(t, testutil.GreetingFileContents, b)
// TestAddTorrentPiecesAlreadyCompleted runs testAddTorrentPriorPieceCompletion
// with pieces marked complete beforehand, on file-cache piece resource storage.
749 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
750 testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
// TestAddTorrentPiecesNotAlreadyCompleted runs
// testAddTorrentPriorPieceCompletion without marking pieces complete
// beforehand, on file-cache piece resource storage.
753 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
754 testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
// TestAddMetainfoWithNodes adds a torrent file that carries DHT nodes. It
// checks that no outbound DHT queries were attempted before the add, and that
// afterwards the total equals 6 per DHT server. DHT starting nodes are stubbed
// to return none.
757 func TestAddMetainfoWithNodes(t *testing.T) {
758 cfg := TestingConfig()
759 cfg.ListenHost = func(string) string { return "" }
761 cfg.DhtStartingNodes = func() ([]dht.Addr, error) { return nil, nil }
762 // For now, we want to just jam the nodes into the table, without
763 // verifying them first. Also the DHT code doesn't support mixing secure
764 // and insecure nodes if security is enabled (yet).
765 // cfg.DHTConfig.NoSecurity = true
766 cl, err := NewClient(cfg)
767 require.NoError(t, err)
// sum totals OutboundQueriesAttempted across all of the client's DHT servers.
769 sum := func() (ret int64) {
770 cl.eachDhtServer(func(s *dht.Server) {
771 ret += s.Stats().OutboundQueriesAttempted
775 assert.EqualValues(t, 0, sum())
776 tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
777 require.NoError(t, err)
778 // Nodes are not added or exposed in Torrent's metainfo. We just randomly
779 // check if the announce-list is here instead. TODO: Add nodes.
780 assert.Len(t, tt.metainfo.AnnounceList, 5)
781 // There are 6 nodes in the torrent file.
782 assert.EqualValues(t, 6*len(cl.dhtServers), sum())
// testDownloadCancelParams holds the options for testDownloadCancel. Further
// fields may exist that are not visible in this excerpt.
785 type testDownloadCancelParams struct {
// ExportClientStatus, when true, registers both clients with testutil.ExportStatusWriter.
786 ExportClientStatus bool
// LeecherStorageCapacity is applied to the leecher's file cache only when
// SetLeecherStorageCapacity is true.
787 SetLeecherStorageCapacity bool
788 LeecherStorageCapacity int64
// testDownloadCancel sets up a seeder holding the greeting torrent and a
// leecher on file-cache storage. Under the client lock it requests (and,
// depending on lines not visible here, cancels) all pieces, then connects the
// two and collects piece state changes until none arrives for 100ms. Finally
// it asserts on the per-piece completion map.
//
// ps controls status export and the optional leecher storage capacity.
792 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
793 greetingTempDir, mi := testutil.GreetingTestTorrent()
794 defer os.RemoveAll(greetingTempDir)
795 cfg := TestingConfig()
797 cfg.DataDir = greetingTempDir
798 seeder, err := NewClient(cfg)
799 require.NoError(t, err)
801 if ps.ExportClientStatus {
802 testutil.ExportStatusWriter(seeder, "s")
804 seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
805 seederTorrent.VerifyData()
806 leecherDataDir, err := ioutil.TempDir("", "")
807 require.NoError(t, err)
808 defer os.RemoveAll(leecherDataDir)
809 fc, err := filecache.NewCache(leecherDataDir)
810 require.NoError(t, err)
811 if ps.SetLeecherStorageCapacity {
812 fc.SetCapacity(ps.LeecherStorageCapacity)
814 cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
815 cfg.DataDir = leecherDataDir
816 leecher, err := NewClient(cfg)
// Fail here with the real error instead of dereferencing a nil client in the
// deferred Close below.
require.NoError(t, err)
817 defer leecher.Close()
818 if ps.ExportClientStatus {
819 testutil.ExportStatusWriter(leecher, "l")
821 leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
822 ret = TorrentSpecFromMetaInfo(mi)
826 require.NoError(t, err)
// Subscribe before requesting pieces so that no state change is missed.
828 psc := leecherGreeting.SubscribePieceStateChanges()
831 leecherGreeting.cl.mu.Lock()
832 leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
834 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
836 leecherGreeting.cl.mu.Unlock()
838 leecherGreeting.AddClientPeer(seeder)
839 completes := make(map[int]bool, 3)
842 // started := time.Now()
844 case _v := <-psc.Values:
845 // log.Print(time.Since(started))
846 v := _v.(PieceStateChange)
847 completes[v.Index] = v.Complete
// 100ms without a state change ends the collection.
848 case <-time.After(100 * time.Millisecond):
853 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
855 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
// TestTorrentDownloadAll runs testDownloadCancel with zero-valued parameters.
860 func TestTorrentDownloadAll(t *testing.T) {
861 testDownloadCancel(t, testDownloadCancelParams{})
// TestTorrentDownloadAllThenCancel runs testDownloadCancel with non-default
// parameters; the fields it sets are not visible in this excerpt.
864 func TestTorrentDownloadAllThenCancel(t *testing.T) {
865 testDownloadCancel(t, testDownloadCancelParams{
870 // Ensure that it's an error for a peer to send an invalid have message.
//
// The torrent is built from a hand-made Info with a single 20-byte piece hash
// and one 1-byte file, stored on badStorage{}. A have message for index 0 must
// be accepted and one for index 1 must be rejected.
871 func TestPeerInvalidHave(t *testing.T) {
872 cl, err := NewClient(TestingConfig())
873 require.NoError(t, err)
875 info := metainfo.Info{
877 Pieces: make([]byte, 20),
878 Files: []metainfo.FileInfo{{Length: 1}},
880 infoBytes, err := bencode.Marshal(info)
881 require.NoError(t, err)
882 tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
883 InfoBytes: infoBytes,
884 InfoHash: metainfo.HashBytes(infoBytes),
885 Storage: badStorage{},
887 require.NoError(t, err)
893 assert.NoError(t, cn.peerSentHave(0))
894 assert.Error(t, cn.peerSentHave(1))
// TestPieceCompletedInStorageButNotClient creates a client and adds a
// TorrentSpec carrying the greeting torrent's InfoBytes. The visible lines
// make no assertion on the result of AddTorrentSpec.
897 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
898 greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
899 defer os.RemoveAll(greetingTempDir)
900 cfg := TestingConfig()
901 cfg.DataDir = greetingTempDir
// NOTE(review): cfg is configured above, but a fresh TestingConfig() is passed
// here, so the DataDir assignment has no effect on this client. Presumably
// NewClient(cfg) was intended; confirm before changing.
902 seeder, err := NewClient(TestingConfig())
903 require.NoError(t, err)
904 seeder.AddTorrentSpec(&TorrentSpec{
905 InfoBytes: greetingMetainfo.InfoBytes,
909 // Check that when the listen port is 0, all the protocols listened on have
910 // the same port, and it isn't zero.
911 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
912 cl, err := NewClient(TestingConfig())
913 require.NoError(t, err)
915 port := cl.LocalPort()
916 assert.NotEqual(t, 0, port)
// Every listener must report the same port as the client's LocalPort.
917 cl.eachListener(func(s socket) bool {
918 assert.Equal(t, port, missinggo.AddrPort(s.Addr()))
// TestClientDynamicListenTCPOnly disables uTP and checks that the client still
// gets a non-zero local port and that every listener is on a TCP network.
923 func TestClientDynamicListenTCPOnly(t *testing.T) {
924 cfg := TestingConfig()
925 cfg.DisableUTP = true
926 cl, err := NewClient(cfg)
927 require.NoError(t, err)
929 assert.NotEqual(t, 0, cl.LocalPort())
930 cl.eachListener(func(s socket) bool {
931 assert.True(t, isTcpNetwork(s.Addr().Network()))
// TestClientDynamicListenUTPOnly disables TCP and checks that the client still
// gets a non-zero local port and that every listener is on a uTP network.
936 func TestClientDynamicListenUTPOnly(t *testing.T) {
937 cfg := TestingConfig()
938 cfg.DisableTCP = true
939 cl, err := NewClient(cfg)
940 require.NoError(t, err)
942 assert.NotEqual(t, 0, cl.LocalPort())
943 cl.eachListener(func(s socket) bool {
944 assert.True(t, isUtpNetwork(s.Addr().Network()))
// TestClientDynamicListenPortNoProtocols disables both TCP and uTP and checks
// that the client can still be created and reports local port 0.
949 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
950 cfg := TestingConfig()
951 cfg.DisableTCP = true
952 cfg.DisableUTP = true
953 cl, err := NewClient(cfg)
954 require.NoError(t, err)
956 assert.Equal(t, 0, cl.LocalPort())
// totalConns iterates over tts and accumulates an int result in ret. The loop
// body is not visible in this excerpt. NOTE(review): presumably it sums each
// torrent's connection count; confirm against the full source.
959 func totalConns(tts []*Torrent) (ret int) {
960 for _, tt := range tts {
// TestSetMaxEstablishedConn creates three clients that each add the greeting
// torrent by info hash with a limit of 2 established connections, and
// introduces the clients to each other as peers. It then changes the first
// torrent's limit through 1, 0, 1 and 2. The expected connection totals
// between those steps are not visible in this excerpt.
968 func TestSetMaxEstablishedConn(t *testing.T) {
969 ss := testutil.NewStatusServer(t)
972 ih := testutil.GreetingMetaInfo().HashInfoBytes()
973 for i := range iter.N(3) {
974 cl, err := NewClient(TestingConfig())
975 require.NoError(t, err)
977 tt, _ := cl.AddTorrentInfoHash(ih)
978 tt.SetMaxEstablishedConns(2)
979 ss.HandleStatusWriter(cl, fmt.Sprintf("/%d", i))
980 tts = append(tts, tt)
// Add each torrent's client as a peer of the others.
983 for _, tt := range tts {
984 for _, _tt := range tts {
986 tt.AddClientPeer(_tt.cl)
// waitTotalConns polls at 1ms intervals until totalConns(tts) equals num.
991 waitTotalConns := func(num int) {
992 for totalConns(tts) != num {
994 time.Sleep(time.Millisecond)
999 tts[0].SetMaxEstablishedConns(1)
1001 tts[0].SetMaxEstablishedConns(0)
1003 tts[0].SetMaxEstablishedConns(1)
1006 tts[0].SetMaxEstablishedConns(2)
// makeMagnet creates the file dir/name whose contents are the bytes of name,
// builds a metainfo for it with a 256 KiB piece length, and adds it to cl,
// which must report the torrent as seeding. The magnet string is built from
// name and the info hash; the return statement is not visible in this excerpt.
1011 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
// NOTE(review): the results of MkdirAll and file.Write are not checked.
1012 os.MkdirAll(dir, 0770)
1013 file, err := os.Create(filepath.Join(dir, name))
1014 require.NoError(t, err)
1015 file.Write([]byte(name))
1017 mi := metainfo.MetaInfo{}
1019 info := metainfo.Info{PieceLength: 256 * 1024}
1020 err = info.BuildFromFilePath(filepath.Join(dir, name))
1021 require.NoError(t, err)
1022 mi.InfoBytes, err = bencode.Marshal(info)
1023 require.NoError(t, err)
1024 magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1025 tr, err := cl.AddTorrent(&mi)
1026 require.NoError(t, err)
1027 require.True(t, tr.Seeding())
1032 // https://github.com/anacrolix/torrent/issues/114
//
// A server client seeds two torrents ("test1" and "test2") with uTP disabled
// and ForceEncryption set. A second client with the same settings adds the
// magnet link for the first torrent and is given the server as a peer.
1033 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1034 cfg := TestingConfig()
1035 cfg.DisableUTP = true
1037 cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1038 cfg.ForceEncryption = true
1039 os.Mkdir(cfg.DataDir, 0755)
1040 server, err := NewClient(cfg)
1041 require.NoError(t, err)
1042 defer server.Close()
1043 testutil.ExportStatusWriter(server, "s")
1044 magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
// The second torrent's magnet is not needed; it only has to exist on the server.
1045 makeMagnet(t, server, cfg.DataDir, "test2")
1046 cfg = TestingConfig()
1047 cfg.DisableUTP = true
1048 cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1049 cfg.ForceEncryption = true
1050 client, err := NewClient(cfg)
1051 require.NoError(t, err)
1052 defer client.Close()
1053 testutil.ExportStatusWriter(client, "c")
1054 tr, err := client.AddMagnet(magnet1)
1055 require.NoError(t, err)
1056 tr.AddClientPeer(server)
// TestClientAddressInUse opens a uTP socket on ":50007" and then requires that
// NewClient fails when configured to listen on the same address.
1062 func TestClientAddressInUse(t *testing.T) {
// The error from NewUtpSocket is discarded; how s is handled afterwards is not
// visible in this excerpt.
1063 s, _ := NewUtpSocket("udp", ":50007")
1067 cfg := TestingConfig().SetListenAddr(":50007")
1068 cl, err := NewClient(cfg)
1069 require.Error(t, err)