16 "github.com/anacrolix/dht"
17 _ "github.com/anacrolix/envpprof"
18 "github.com/anacrolix/missinggo"
19 "github.com/anacrolix/missinggo/filecache"
20 "github.com/bradfitz/iter"
21 "github.com/stretchr/testify/assert"
22 "github.com/stretchr/testify/require"
23 "golang.org/x/time/rate"
25 "github.com/anacrolix/torrent/bencode"
26 "github.com/anacrolix/torrent/internal/testutil"
27 "github.com/anacrolix/torrent/iplist"
28 "github.com/anacrolix/torrent/metainfo"
29 "github.com/anacrolix/torrent/storage"
32 func TestingConfig() *Config {
34 ListenAddr: "localhost:0",
37 DisableTrackers: true,
38 NoDefaultPortForwarding: true,
43 func TestClientDefault(t *testing.T) {
44 cl, err := NewClient(TestingConfig())
45 require.NoError(t, err)
49 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
50 cfg := TestingConfig()
51 pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
52 require.NoError(t, err)
53 ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
55 cfg.DefaultStorage = ci
56 cl, err := NewClient(cfg)
57 require.NoError(t, err)
59 // And again, https://github.com/anacrolix/torrent/issues/158
60 cl, err = NewClient(cfg)
61 require.NoError(t, err)
65 func TestAddDropTorrent(t *testing.T) {
66 cl, err := NewClient(TestingConfig())
67 require.NoError(t, err)
69 dir, mi := testutil.GreetingTestTorrent()
70 defer os.RemoveAll(dir)
71 tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
72 require.NoError(t, err)
74 tt.SetMaxEstablishedConns(0)
75 tt.SetMaxEstablishedConns(1)
79 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
84 func TestAddTorrentNoUsableURLs(t *testing.T) {
89 func TestAddPeersToUnknownTorrent(t *testing.T) {
94 func TestPieceHashSize(t *testing.T) {
95 assert.Equal(t, 20, pieceHash.Size())
98 func TestTorrentInitialState(t *testing.T) {
99 dir, mi := testutil.GreetingTestTorrent()
100 defer os.RemoveAll(dir)
103 tor := cl.newTorrent(
105 storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
109 err := tor.setInfoBytes(mi.InfoBytes)
111 require.NoError(t, err)
112 require.Len(t, tor.pieces, 3)
113 tor.pendAllChunkSpecs(0)
115 assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
117 assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
120 func TestUnmarshalPEXMsg(t *testing.T) {
121 var m peerExchangeMessage
122 if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
125 if len(m.Added) != 2 {
128 if m.Added[0].Port != 0x506 {
133 func TestReducedDialTimeout(t *testing.T) {
136 for _, _case := range []struct {
140 ExpectedReduced time.Duration
142 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
143 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
144 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
145 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
146 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
147 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
149 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
150 expected := _case.ExpectedReduced
151 if expected < cfg.MinDialTimeout {
152 expected = cfg.MinDialTimeout
154 if reduced != expected {
155 t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
160 func TestUTPRawConn(t *testing.T) {
161 l, err := NewUtpSocket("udp", "")
162 require.NoError(t, err)
172 // Connect a UTP peer to see if the RawConn will still work.
173 s, err := NewUtpSocket("udp", "")
174 require.NoError(t, err)
176 utpPeer, err := s.DialContext(context.Background(), "", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
177 require.NoError(t, err)
178 defer utpPeer.Close()
179 peer, err := net.ListenPacket("udp", ":0")
180 require.NoError(t, err)
184 // How many messages to send. I've set this to double the channel buffer
185 // size in the raw packetConn.
187 readerStopped := make(chan struct{})
188 // The reader goroutine.
190 defer close(readerStopped)
191 b := make([]byte, 500)
192 for i := 0; i < N; i++ {
193 n, _, err := l.ReadFrom(b)
194 require.NoError(t, err)
197 fmt.Sscan(string(b[:n]), &d)
198 assert.Equal(t, i, d)
201 udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
202 require.NoError(t, err)
203 for i := 0; i < N; i++ {
204 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
205 require.NoError(t, err)
206 time.Sleep(time.Millisecond)
209 case <-readerStopped:
210 case <-time.After(time.Second):
211 t.Fatal("reader timed out")
213 if msgsReceived != N {
214 t.Fatalf("messages received: %d", msgsReceived)
218 func TestAddDropManyTorrents(t *testing.T) {
219 cl, err := NewClient(TestingConfig())
220 require.NoError(t, err)
222 for i := range iter.N(1000) {
224 binary.PutVarint(spec.InfoHash[:], int64(i))
225 tt, new, err := cl.AddTorrentSpec(&spec)
226 assert.NoError(t, err)
232 type FileCacheClientStorageFactoryParams struct {
235 Wrapper func(*filecache.Cache) storage.ClientImpl
238 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
239 return func(dataDir string) storage.ClientImpl {
240 fc, err := filecache.NewCache(dataDir)
245 fc.SetCapacity(ps.Capacity)
247 return ps.Wrapper(fc)
// storageFactory builds a storage.ClientImpl from a single string argument.
// Within this file the argument is a data directory path (see
// NewFileCacheClientStorageFactory and the LeecherStorage/SeederStorage
// call sites in testClientTransfer).
251 type storageFactory func(string) storage.ClientImpl
253 func TestClientTransferDefault(t *testing.T) {
254 testClientTransfer(t, testClientTransferParams{
255 ExportClientStatus: true,
256 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
257 Wrapper: fileCachePieceResourceStorage,
262 func TestClientTransferRateLimitedUpload(t *testing.T) {
263 started := time.Now()
264 testClientTransfer(t, testClientTransferParams{
265 // We are uploading 13 bytes (the length of the greeting torrent). The
266 // chunks are 2 bytes in length. Then the smallest burst we can run
267 // with is 2. Time taken is (13-burst)/rate.
268 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
269 ExportClientStatus: true,
271 require.True(t, time.Since(started) > time.Second)
274 func TestClientTransferRateLimitedDownload(t *testing.T) {
275 testClientTransfer(t, testClientTransferParams{
276 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
280 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
281 return storage.NewResourcePieces(fc.AsResourceProvider())
284 func TestClientTransferSmallCache(t *testing.T) {
285 testClientTransfer(t, testClientTransferParams{
286 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
288 // Going below the piece length means it can't complete a piece so
289 // that it can be hashed.
291 Wrapper: fileCachePieceResourceStorage,
294 // Can't readahead too far or the cache will thrash and drop data we
297 ExportClientStatus: true,
301 func TestClientTransferVarious(t *testing.T) {
303 for _, ls := range []storageFactory{
304 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
305 Wrapper: fileCachePieceResourceStorage,
310 for _, ss := range []func(string) storage.ClientImpl{
314 for _, responsive := range []bool{false, true} {
315 testClientTransfer(t, testClientTransferParams{
316 Responsive: responsive,
320 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
321 testClientTransfer(t, testClientTransferParams{
323 Responsive: responsive,
325 Readahead: readahead,
334 type testClientTransferParams struct {
338 ExportClientStatus bool
339 LeecherStorage func(string) storage.ClientImpl
340 SeederStorage func(string) storage.ClientImpl
341 SeederUploadRateLimiter *rate.Limiter
342 LeecherDownloadRateLimiter *rate.Limiter
345 // Creates a seeder and a leecher, and ensures the data transfers when a read
346 // is attempted on the leecher.
347 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
348 greetingTempDir, mi := testutil.GreetingTestTorrent()
349 defer os.RemoveAll(greetingTempDir)
350 // Create seeder and a Torrent.
351 cfg := TestingConfig()
353 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
354 // cfg.ListenAddr = "localhost:4000"
355 if ps.SeederStorage != nil {
356 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
357 defer cfg.DefaultStorage.Close()
359 cfg.DataDir = greetingTempDir
361 seeder, err := NewClient(cfg)
362 require.NoError(t, err)
363 if ps.ExportClientStatus {
364 testutil.ExportStatusWriter(seeder, "s")
366 seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
367 // Run a Stats right after Closing the Client. This will trigger the Stats
368 // panic in #214 caused by RemoteAddr on Closed uTP sockets.
369 defer seederTorrent.Stats()
371 seederTorrent.VerifyData()
372 // Create leecher and a Torrent.
373 leecherDataDir, err := ioutil.TempDir("", "")
374 require.NoError(t, err)
375 defer os.RemoveAll(leecherDataDir)
376 if ps.LeecherStorage == nil {
377 cfg.DataDir = leecherDataDir
379 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
381 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
383 leecher, err := NewClient(cfg)
384 require.NoError(t, err)
385 defer leecher.Close()
386 if ps.ExportClientStatus {
387 testutil.ExportStatusWriter(leecher, "l")
389 leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
390 ret = TorrentSpecFromMetaInfo(mi)
394 require.NoError(t, err)
396 // Now do some things with leecher and seeder.
397 leecherTorrent.AddClientPeer(seeder)
398 // The Torrent should not be interested in obtaining peers, so the one we
399 // just added should be the only one.
400 assert.False(t, leecherTorrent.Seeding())
401 assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
402 r := leecherTorrent.NewReader()
408 r.SetReadahead(ps.Readahead)
410 assertReadAllGreeting(t, r)
411 assert.True(t, 13 <= seederTorrent.Stats().BytesWrittenData)
412 assert.True(t, 8 <= seederTorrent.Stats().ChunksWritten)
413 assert.True(t, 13 <= leecherTorrent.Stats().BytesReadData)
414 assert.True(t, 8 <= leecherTorrent.Stats().ChunksRead)
415 // Try reading through again for the cases where the torrent data size
416 // exceeds the size of the cache.
417 assertReadAllGreeting(t, r)
420 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
421 pos, err := r.Seek(0, io.SeekStart)
422 assert.NoError(t, err)
423 assert.EqualValues(t, 0, pos)
424 _greeting, err := ioutil.ReadAll(r)
425 assert.NoError(t, err)
426 assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
429 // Check that after completing leeching, a leecher transitions to seeding
430 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
431 func TestSeedAfterDownloading(t *testing.T) {
432 greetingTempDir, mi := testutil.GreetingTestTorrent()
433 defer os.RemoveAll(greetingTempDir)
434 cfg := TestingConfig()
436 cfg.DataDir = greetingTempDir
437 seeder, err := NewClient(cfg)
438 require.NoError(t, err)
440 testutil.ExportStatusWriter(seeder, "s")
441 seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
442 seederTorrent.VerifyData()
443 cfg.DataDir, err = ioutil.TempDir("", "")
444 require.NoError(t, err)
445 defer os.RemoveAll(cfg.DataDir)
446 leecher, err := NewClient(cfg)
447 require.NoError(t, err)
448 defer leecher.Close()
449 testutil.ExportStatusWriter(leecher, "l")
451 cfg.DataDir, err = ioutil.TempDir("", "")
452 require.NoError(t, err)
453 defer os.RemoveAll(cfg.DataDir)
454 leecherLeecher, _ := NewClient(cfg)
455 require.NoError(t, err)
456 defer leecherLeecher.Close()
457 testutil.ExportStatusWriter(leecherLeecher, "ll")
458 leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
459 ret = TorrentSpecFromMetaInfo(mi)
463 llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
464 ret = TorrentSpecFromMetaInfo(mi)
468 // Simultaneously DownloadAll in Leecher, and read the contents
469 // consecutively in LeecherLeecher. This non-deterministically triggered a
470 // case where the leecher wouldn't unchoke the LeecherLeecher.
471 var wg sync.WaitGroup
477 b, err := ioutil.ReadAll(r)
478 require.NoError(t, err)
479 assert.EqualValues(t, testutil.GreetingFileContents, b)
481 leecherGreeting.AddClientPeer(seeder)
482 leecherGreeting.AddClientPeer(leecherLeecher)
486 leecherGreeting.DownloadAll()
492 func TestMergingTrackersByAddingSpecs(t *testing.T) {
493 cl, err := NewClient(TestingConfig())
494 require.NoError(t, err)
496 spec := TorrentSpec{}
497 T, new, _ := cl.AddTorrentSpec(&spec)
501 spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
502 _, new, _ = cl.AddTorrentSpec(&spec)
504 assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
505 // Because trackers are disabled in TestingConfig.
506 assert.EqualValues(t, 0, len(T.trackerAnnouncers))
509 // We read from a piece which is marked completed, but is missing data.
510 func TestCompletedPieceWrongSize(t *testing.T) {
511 cfg := TestingConfig()
512 cfg.DefaultStorage = badStorage{}
513 cl, err := NewClient(cfg)
514 require.NoError(t, err)
516 info := metainfo.Info{
518 Pieces: make([]byte, 20),
519 Files: []metainfo.FileInfo{
520 {Path: []string{"greeting"}, Length: 13},
523 b, err := bencode.Marshal(info)
524 require.NoError(t, err)
525 tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
527 InfoHash: metainfo.HashBytes(b),
529 require.NoError(t, err)
534 b, err = ioutil.ReadAll(r)
536 assert.NoError(t, err)
539 func BenchmarkAddLargeTorrent(b *testing.B) {
540 cfg := TestingConfig()
541 cfg.DisableTCP = true
542 cfg.DisableUTP = true
543 cfg.ListenAddr = "redonk"
544 cl, err := NewClient(cfg)
545 require.NoError(b, err)
547 for range iter.N(b.N) {
548 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
556 func TestResponsive(t *testing.T) {
557 seederDataDir, mi := testutil.GreetingTestTorrent()
558 defer os.RemoveAll(seederDataDir)
559 cfg := TestingConfig()
561 cfg.DataDir = seederDataDir
562 seeder, err := NewClient(cfg)
565 seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
566 seederTorrent.VerifyData()
567 leecherDataDir, err := ioutil.TempDir("", "")
569 defer os.RemoveAll(leecherDataDir)
570 cfg = TestingConfig()
571 cfg.DataDir = leecherDataDir
572 leecher, err := NewClient(cfg)
574 defer leecher.Close()
575 leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
576 ret = TorrentSpecFromMetaInfo(mi)
580 leecherTorrent.AddClientPeer(seeder)
581 reader := leecherTorrent.NewReader()
583 reader.SetReadahead(0)
584 reader.SetResponsive()
586 _, err = reader.Seek(3, io.SeekStart)
587 require.NoError(t, err)
588 _, err = io.ReadFull(reader, b)
590 assert.EqualValues(t, "lo", string(b))
591 _, err = reader.Seek(11, io.SeekStart)
592 require.NoError(t, err)
593 n, err := io.ReadFull(reader, b)
595 assert.EqualValues(t, 2, n)
596 assert.EqualValues(t, "d\n", string(b))
599 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
600 seederDataDir, mi := testutil.GreetingTestTorrent()
601 defer os.RemoveAll(seederDataDir)
602 cfg := TestingConfig()
604 cfg.DataDir = seederDataDir
605 seeder, err := NewClient(cfg)
608 seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
609 seederTorrent.VerifyData()
610 leecherDataDir, err := ioutil.TempDir("", "")
612 defer os.RemoveAll(leecherDataDir)
613 cfg = TestingConfig()
614 cfg.DataDir = leecherDataDir
615 leecher, err := NewClient(cfg)
617 defer leecher.Close()
618 leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
619 ret = TorrentSpecFromMetaInfo(mi)
623 leecherTorrent.AddClientPeer(seeder)
624 reader := leecherTorrent.NewReader()
626 reader.SetReadahead(0)
627 reader.SetResponsive()
629 _, err = reader.Seek(3, io.SeekStart)
630 require.NoError(t, err)
631 _, err = io.ReadFull(reader, b)
633 assert.EqualValues(t, "lo", string(b))
634 go leecherTorrent.Drop()
635 _, err = reader.Seek(11, io.SeekStart)
636 require.NoError(t, err)
637 n, err := reader.Read(b)
638 assert.EqualError(t, err, "torrent closed")
639 assert.EqualValues(t, 0, n)
642 func TestDHTInheritBlocklist(t *testing.T) {
643 ipl := iplist.New(nil)
644 require.NotNil(t, ipl)
645 cfg := TestingConfig()
646 cfg.IPBlocklist = ipl
648 cl, err := NewClient(cfg)
649 require.NoError(t, err)
652 cl.eachDhtServer(func(s *dht.Server) {
653 assert.Equal(t, ipl, s.IPBlocklist())
656 assert.EqualValues(t, 2, numServers)
659 // Check that stuff is merged in subsequent AddTorrentSpec for the same
661 func TestAddTorrentSpecMerging(t *testing.T) {
662 cl, err := NewClient(TestingConfig())
663 require.NoError(t, err)
665 dir, mi := testutil.GreetingTestTorrent()
666 defer os.RemoveAll(dir)
667 tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
668 InfoHash: mi.HashInfoBytes(),
670 require.NoError(t, err)
672 require.Nil(t, tt.Info())
673 _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
674 require.NoError(t, err)
675 require.False(t, new)
676 require.NotNil(t, tt.Info())
679 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
680 dir, mi := testutil.GreetingTestTorrent()
682 cl, _ := NewClient(TestingConfig())
684 tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
685 InfoHash: mi.HashInfoBytes(),
688 assert.EqualValues(t, 0, len(cl.Torrents()))
696 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
697 for i := range iter.N(info.NumPieces()) {
699 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
703 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
704 fileCacheDir, err := ioutil.TempDir("", "")
705 require.NoError(t, err)
706 defer os.RemoveAll(fileCacheDir)
707 fileCache, err := filecache.NewCache(fileCacheDir)
708 require.NoError(t, err)
709 greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
710 defer os.RemoveAll(greetingDataTempDir)
711 filePieceStore := csf(fileCache)
712 defer filePieceStore.Close()
713 info, err := greetingMetainfo.UnmarshalInfo()
714 require.NoError(t, err)
715 ih := greetingMetainfo.HashInfoBytes()
716 greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
717 require.NoError(t, err)
718 writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
719 // require.Equal(t, len(testutil.GreetingFileContents), written)
720 // require.NoError(t, err)
721 for i := 0; i < info.NumPieces(); i++ {
723 if alreadyCompleted {
724 require.NoError(t, greetingData.Piece(p).MarkComplete())
727 cfg := TestingConfig()
728 // TODO: Disable network option?
729 cfg.DisableTCP = true
730 cfg.DisableUTP = true
731 cfg.DefaultStorage = filePieceStore
732 cl, err := NewClient(cfg)
733 require.NoError(t, err)
735 tt, err := cl.AddTorrent(greetingMetainfo)
736 require.NoError(t, err)
737 psrs := tt.PieceStateRuns()
738 assert.Len(t, psrs, 1)
739 assert.EqualValues(t, 3, psrs[0].Length)
740 assert.Equal(t, alreadyCompleted, psrs[0].Complete)
741 if alreadyCompleted {
743 b, err := ioutil.ReadAll(r)
744 assert.NoError(t, err)
745 assert.EqualValues(t, testutil.GreetingFileContents, b)
749 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
750 testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
753 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
754 testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
757 func TestAddMetainfoWithNodes(t *testing.T) {
758 cfg := TestingConfig()
759 cfg.ListenAddr = ":0"
761 cfg.DhtStartingNodes = func() ([]dht.Addr, error) { return nil, nil }
762 // For now, we want to just jam the nodes into the table, without
763 // verifying them first. Also the DHT code doesn't support mixing secure
764 // and insecure nodes if security is enabled (yet).
765 // cfg.DHTConfig.NoSecurity = true
766 cl, err := NewClient(cfg)
767 require.NoError(t, err)
769 sum := func() (ret int) {
770 cl.eachDhtServer(func(s *dht.Server) {
772 ret += s.Stats().OutstandingTransactions
776 assert.EqualValues(t, 0, sum())
777 tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
778 require.NoError(t, err)
779 // Nodes are not added or exposed in Torrent's metainfo. We just randomly
780 // check if the announce-list is here instead. TODO: Add nodes.
781 assert.Len(t, tt.metainfo.AnnounceList, 5)
782 // There are 6 nodes in the torrent file.
783 assert.EqualValues(t, 6*len(cl.dhtServers), sum())
786 type testDownloadCancelParams struct {
787 ExportClientStatus bool
788 SetLeecherStorageCapacity bool
789 LeecherStorageCapacity int64
793 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
794 greetingTempDir, mi := testutil.GreetingTestTorrent()
795 defer os.RemoveAll(greetingTempDir)
796 cfg := TestingConfig()
798 cfg.DataDir = greetingTempDir
799 seeder, err := NewClient(cfg)
800 require.NoError(t, err)
802 if ps.ExportClientStatus {
803 testutil.ExportStatusWriter(seeder, "s")
805 seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
806 seederTorrent.VerifyData()
807 leecherDataDir, err := ioutil.TempDir("", "")
808 require.NoError(t, err)
809 defer os.RemoveAll(leecherDataDir)
810 fc, err := filecache.NewCache(leecherDataDir)
811 require.NoError(t, err)
812 if ps.SetLeecherStorageCapacity {
813 fc.SetCapacity(ps.LeecherStorageCapacity)
815 cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
816 cfg.DataDir = leecherDataDir
817 leecher, _ := NewClient(cfg)
818 defer leecher.Close()
819 if ps.ExportClientStatus {
820 testutil.ExportStatusWriter(leecher, "l")
822 leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
823 ret = TorrentSpecFromMetaInfo(mi)
827 require.NoError(t, err)
829 psc := leecherGreeting.SubscribePieceStateChanges()
832 leecherGreeting.cl.mu.Lock()
833 leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
835 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
837 leecherGreeting.cl.mu.Unlock()
839 leecherGreeting.AddClientPeer(seeder)
840 completes := make(map[int]bool, 3)
843 // started := time.Now()
845 case _v := <-psc.Values:
846 // log.Print(time.Since(started))
847 v := _v.(PieceStateChange)
848 completes[v.Index] = v.Complete
849 case <-time.After(100 * time.Millisecond):
854 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
856 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
861 func TestTorrentDownloadAll(t *testing.T) {
862 testDownloadCancel(t, testDownloadCancelParams{})
865 func TestTorrentDownloadAllThenCancel(t *testing.T) {
866 testDownloadCancel(t, testDownloadCancelParams{
871 // Ensure that it's an error for a peer to send an invalid have message.
872 func TestPeerInvalidHave(t *testing.T) {
873 cl, err := NewClient(TestingConfig())
874 require.NoError(t, err)
876 info := metainfo.Info{
878 Pieces: make([]byte, 20),
879 Files: []metainfo.FileInfo{{Length: 1}},
881 infoBytes, err := bencode.Marshal(info)
882 require.NoError(t, err)
883 tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
884 InfoBytes: infoBytes,
885 InfoHash: metainfo.HashBytes(infoBytes),
886 Storage: badStorage{},
888 require.NoError(t, err)
894 assert.NoError(t, cn.peerSentHave(0))
895 assert.Error(t, cn.peerSentHave(1))
898 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
899 greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
900 defer os.RemoveAll(greetingTempDir)
901 cfg := TestingConfig()
902 cfg.DataDir = greetingTempDir
903 seeder, err := NewClient(TestingConfig())
904 require.NoError(t, err)
905 seeder.AddTorrentSpec(&TorrentSpec{
906 InfoBytes: greetingMetainfo.InfoBytes,
910 // Check that when the listen port is 0, all the protocols listened on have
911 // the same port, and it isn't zero.
912 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
913 cl, err := NewClient(TestingConfig())
914 require.NoError(t, err)
916 port := cl.LocalPort()
917 assert.NotEqual(t, 0, port)
918 cl.eachListener(func(s socket) bool {
919 assert.Equal(t, port, missinggo.AddrPort(s.Addr()))
924 func TestClientDynamicListenTCPOnly(t *testing.T) {
925 cfg := TestingConfig()
926 cfg.DisableUTP = true
927 cl, err := NewClient(cfg)
928 require.NoError(t, err)
930 assert.NotEqual(t, 0, cl.LocalPort())
931 cl.eachListener(func(s socket) bool {
932 assert.True(t, isTcpNetwork(s.Addr().Network()))
937 func TestClientDynamicListenUTPOnly(t *testing.T) {
938 cfg := TestingConfig()
939 cfg.DisableTCP = true
940 cl, err := NewClient(cfg)
941 require.NoError(t, err)
943 assert.NotEqual(t, 0, cl.LocalPort())
944 cl.eachListener(func(s socket) bool {
945 assert.True(t, isUtpNetwork(s.Addr().Network()))
950 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
951 cfg := TestingConfig()
952 cfg.DisableTCP = true
953 cfg.DisableUTP = true
954 cl, err := NewClient(cfg)
955 require.NoError(t, err)
957 assert.Equal(t, 0, cl.LocalPort())
960 func totalConns(tts []*Torrent) (ret int) {
961 for _, tt := range tts {
969 func TestSetMaxEstablishedConn(t *testing.T) {
970 ss := testutil.NewStatusServer(t)
973 ih := testutil.GreetingMetaInfo().HashInfoBytes()
974 for i := range iter.N(3) {
975 cl, err := NewClient(TestingConfig())
976 require.NoError(t, err)
978 tt, _ := cl.AddTorrentInfoHash(ih)
979 tt.SetMaxEstablishedConns(2)
980 ss.HandleStatusWriter(cl, fmt.Sprintf("/%d", i))
981 tts = append(tts, tt)
984 for _, tt := range tts {
985 for _, _tt := range tts {
987 tt.AddClientPeer(_tt.cl)
992 waitTotalConns := func(num int) {
993 for totalConns(tts) != num {
995 time.Sleep(time.Millisecond)
1000 tts[0].SetMaxEstablishedConns(1)
1002 tts[0].SetMaxEstablishedConns(0)
1004 tts[0].SetMaxEstablishedConns(1)
1007 tts[0].SetMaxEstablishedConns(2)
1012 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1013 os.MkdirAll(dir, 0770)
1014 file, err := os.Create(filepath.Join(dir, name))
1015 require.NoError(t, err)
1016 file.Write([]byte(name))
1018 mi := metainfo.MetaInfo{}
1020 info := metainfo.Info{PieceLength: 256 * 1024}
1021 err = info.BuildFromFilePath(filepath.Join(dir, name))
1022 require.NoError(t, err)
1023 mi.InfoBytes, err = bencode.Marshal(info)
1024 require.NoError(t, err)
1025 magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1026 tr, err := cl.AddTorrent(&mi)
1027 require.NoError(t, err)
1028 require.True(t, tr.Seeding())
1033 // https://github.com/anacrolix/torrent/issues/114
1034 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1035 cfg := TestingConfig()
1036 cfg.DisableUTP = true
1038 cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1039 cfg.ForceEncryption = true
1040 os.Mkdir(cfg.DataDir, 0755)
1041 server, err := NewClient(cfg)
1042 require.NoError(t, err)
1043 defer server.Close()
1044 testutil.ExportStatusWriter(server, "s")
1045 magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1046 makeMagnet(t, server, cfg.DataDir, "test2")
1047 cfg = TestingConfig()
1048 cfg.DisableUTP = true
1049 cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1050 cfg.ForceEncryption = true
1051 client, err := NewClient(cfg)
1052 require.NoError(t, err)
1053 defer client.Close()
1054 testutil.ExportStatusWriter(client, "c")
1055 tr, err := client.AddMagnet(magnet1)
1056 require.NoError(t, err)
1057 tr.AddClientPeer(server)
1063 func TestClientAddressInUse(t *testing.T) {
1064 s, _ := NewUtpSocket("udp", ":50007")
1068 cfg := TestingConfig()
1069 cfg.ListenAddr = ":50007"
1070 cl, err := NewClient(cfg)
1071 require.Error(t, err)