18 _ "github.com/anacrolix/envpprof"
19 "github.com/anacrolix/missinggo"
20 "github.com/anacrolix/missinggo/filecache"
21 "github.com/anacrolix/missinggo/pubsub"
22 "github.com/bradfitz/iter"
23 "github.com/stretchr/testify/assert"
24 "github.com/stretchr/testify/require"
25 "golang.org/x/time/rate"
27 "github.com/anacrolix/torrent/bencode"
28 "github.com/anacrolix/torrent/internal/testutil"
29 "github.com/anacrolix/torrent/iplist"
30 "github.com/anacrolix/torrent/metainfo"
31 "github.com/anacrolix/torrent/storage"
34 func TestingConfig() *Config {
36 ListenAddr: "localhost:0",
39 DisableTrackers: true,
40 NoDefaultPortForwarding: true,
45 func TestClientDefault(t *testing.T) {
46 cl, err := NewClient(TestingConfig())
47 require.NoError(t, err)
51 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
52 cfg := TestingConfig()
53 pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
54 require.NoError(t, err)
55 ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
57 cfg.DefaultStorage = ci
58 cl, err := NewClient(cfg)
59 require.NoError(t, err)
61 // And again, https://github.com/anacrolix/torrent/issues/158
62 cl, err = NewClient(cfg)
63 require.NoError(t, err)
67 func TestAddDropTorrent(t *testing.T) {
68 cl, err := NewClient(TestingConfig())
69 require.NoError(t, err)
71 dir, mi := testutil.GreetingTestTorrent()
72 defer os.RemoveAll(dir)
73 tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
74 require.NoError(t, err)
76 tt.SetMaxEstablishedConns(0)
77 tt.SetMaxEstablishedConns(1)
81 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
86 func TestAddTorrentNoUsableURLs(t *testing.T) {
91 func TestAddPeersToUnknownTorrent(t *testing.T) {
96 func TestPieceHashSize(t *testing.T) {
97 assert.Equal(t, 20, pieceHash.Size())
100 func TestTorrentInitialState(t *testing.T) {
101 dir, mi := testutil.GreetingTestTorrent()
102 defer os.RemoveAll(dir)
104 infoHash: mi.HashInfoBytes(),
105 pieceStateChanges: pubsub.NewPubSub(),
108 tor.storageOpener = storage.NewClient(storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()))
109 // Needed to lock for asynchronous piece verification.
112 err := tor.setInfoBytes(mi.InfoBytes)
114 require.NoError(t, err)
115 require.Len(t, tor.pieces, 3)
116 tor.pendAllChunkSpecs(0)
118 assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
120 assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
123 func TestUnmarshalPEXMsg(t *testing.T) {
124 var m peerExchangeMessage
125 if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
128 if len(m.Added) != 2 {
131 if m.Added[0].Port != 0x506 {
136 func TestReducedDialTimeout(t *testing.T) {
139 for _, _case := range []struct {
143 ExpectedReduced time.Duration
145 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
146 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
147 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
148 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
149 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
150 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
152 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
153 expected := _case.ExpectedReduced
154 if expected < cfg.MinDialTimeout {
155 expected = cfg.MinDialTimeout
157 if reduced != expected {
158 t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
163 func TestUTPRawConn(t *testing.T) {
164 l, err := NewUtpSocket("udp", "")
165 require.NoError(t, err)
175 // Connect a UTP peer to see if the RawConn will still work.
176 s, err := NewUtpSocket("udp", "")
177 require.NoError(t, err)
179 utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
180 require.NoError(t, err)
181 defer utpPeer.Close()
182 peer, err := net.ListenPacket("udp", ":0")
183 require.NoError(t, err)
187 // How many messages to send. I've set this to double the channel buffer
188 // size in the raw packetConn.
190 readerStopped := make(chan struct{})
191 // The reader goroutine.
193 defer close(readerStopped)
194 b := make([]byte, 500)
195 for i := 0; i < N; i++ {
196 n, _, err := l.ReadFrom(b)
197 require.NoError(t, err)
200 fmt.Sscan(string(b[:n]), &d)
201 assert.Equal(t, i, d)
204 udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
205 require.NoError(t, err)
206 for i := 0; i < N; i++ {
207 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
208 require.NoError(t, err)
209 time.Sleep(time.Millisecond)
212 case <-readerStopped:
213 case <-time.After(time.Second):
214 t.Fatal("reader timed out")
216 if msgsReceived != N {
217 t.Fatalf("messages received: %d", msgsReceived)
221 func TestTwoClientsArbitraryPorts(t *testing.T) {
222 for i := 0; i < 2; i++ {
223 cl, err := NewClient(TestingConfig())
231 func TestAddDropManyTorrents(t *testing.T) {
232 cl, err := NewClient(TestingConfig())
233 require.NoError(t, err)
235 for i := range iter.N(1000) {
237 binary.PutVarint(spec.InfoHash[:], int64(i))
238 tt, new, err := cl.AddTorrentSpec(&spec)
239 assert.NoError(t, err)
245 type FileCacheClientStorageFactoryParams struct {
248 Wrapper func(*filecache.Cache) storage.ClientImpl
251 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
252 return func(dataDir string) storage.ClientImpl {
253 fc, err := filecache.NewCache(dataDir)
258 fc.SetCapacity(ps.Capacity)
260 return ps.Wrapper(fc)
264 type storageFactory func(string) storage.ClientImpl
266 func TestClientTransferDefault(t *testing.T) {
267 testClientTransfer(t, testClientTransferParams{
268 ExportClientStatus: true,
269 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
270 Wrapper: fileCachePieceResourceStorage,
275 func TestClientTransferRateLimitedUpload(t *testing.T) {
276 started := time.Now()
277 testClientTransfer(t, testClientTransferParams{
278 // We are uploading 13 bytes (the length of the greeting torrent). The
279 // chunks are 2 bytes in length. Then the smallest burst we can run
280 // with is 2. Time taken is (13-burst)/rate.
281 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
283 require.True(t, time.Since(started) > time.Second)
286 func TestClientTransferRateLimitedDownload(t *testing.T) {
287 testClientTransfer(t, testClientTransferParams{
288 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
292 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
293 return storage.NewResourcePieces(fc.AsResourceProvider())
296 func TestClientTransferSmallCache(t *testing.T) {
297 testClientTransfer(t, testClientTransferParams{
298 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
300 // Going below the piece length means it can't complete a piece so
301 // that it can be hashed.
303 Wrapper: fileCachePieceResourceStorage,
306 // Can't readahead too far or the cache will thrash and drop data we
309 ExportClientStatus: true,
313 func TestClientTransferVarious(t *testing.T) {
315 for _, ls := range []storageFactory{
316 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
317 Wrapper: fileCachePieceResourceStorage,
322 for _, ss := range []func(string) storage.ClientImpl{
326 for _, responsive := range []bool{false, true} {
327 testClientTransfer(t, testClientTransferParams{
328 Responsive: responsive,
332 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
333 testClientTransfer(t, testClientTransferParams{
335 Responsive: responsive,
337 Readahead: readahead,
346 type testClientTransferParams struct {
350 ExportClientStatus bool
351 LeecherStorage func(string) storage.ClientImpl
352 SeederStorage func(string) storage.ClientImpl
353 SeederUploadRateLimiter *rate.Limiter
354 LeecherDownloadRateLimiter *rate.Limiter
357 // Creates a seeder and a leecher, and ensures the data transfers when a read
358 // is attempted on the leecher.
359 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
360 greetingTempDir, mi := testutil.GreetingTestTorrent()
361 defer os.RemoveAll(greetingTempDir)
362 // Create seeder and a Torrent.
363 cfg := TestingConfig()
365 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
366 // cfg.ListenAddr = "localhost:4000"
367 if ps.SeederStorage != nil {
368 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
369 defer cfg.DefaultStorage.Close()
371 cfg.DataDir = greetingTempDir
373 seeder, err := NewClient(cfg)
374 require.NoError(t, err)
375 if ps.ExportClientStatus {
376 testutil.ExportStatusWriter(seeder, "s")
378 seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
379 // Run a Stats right after Closing the Client. This will trigger the Stats
380 // panic in #214 caused by RemoteAddr on Closed uTP sockets.
381 defer seederTorrent.Stats()
383 seederTorrent.VerifyData()
384 // Create leecher and a Torrent.
385 leecherDataDir, err := ioutil.TempDir("", "")
386 require.NoError(t, err)
387 defer os.RemoveAll(leecherDataDir)
388 if ps.LeecherStorage == nil {
389 cfg.DataDir = leecherDataDir
391 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
393 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
394 // cfg.ListenAddr = "localhost:4001"
395 leecher, err := NewClient(cfg)
396 require.NoError(t, err)
397 defer leecher.Close()
398 if ps.ExportClientStatus {
399 testutil.ExportStatusWriter(leecher, "l")
401 leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
402 ret = TorrentSpecFromMetaInfo(mi)
406 require.NoError(t, err)
408 // Now do some things with leecher and seeder.
409 addClientPeer(leecherGreeting, seeder)
410 r := leecherGreeting.NewReader()
416 r.SetReadahead(ps.Readahead)
418 assertReadAllGreeting(t, r)
419 // After one read through, we can assume certain torrent statistics.
420 // These are not a strict requirement. It is however interesting to
422 // t.Logf("%#v", seederTorrent.Stats())
423 // assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
424 // assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
425 // assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
426 // assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
427 // Read through again for the cases where the torrent data size exceeds
428 // the size of the cache.
429 assertReadAllGreeting(t, r)
432 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
433 pos, err := r.Seek(0, io.SeekStart)
434 assert.NoError(t, err)
435 assert.EqualValues(t, 0, pos)
436 _greeting, err := ioutil.ReadAll(r)
437 assert.NoError(t, err)
438 assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
441 // Check that after completing leeching, a leecher transitions to a seeding
442 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
443 func TestSeedAfterDownloading(t *testing.T) {
444 greetingTempDir, mi := testutil.GreetingTestTorrent()
445 defer os.RemoveAll(greetingTempDir)
446 cfg := TestingConfig()
448 cfg.DataDir = greetingTempDir
449 seeder, err := NewClient(cfg)
450 require.NoError(t, err)
452 testutil.ExportStatusWriter(seeder, "s")
453 seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
454 seederTorrent.VerifyData()
455 cfg.DataDir, err = ioutil.TempDir("", "")
456 require.NoError(t, err)
457 defer os.RemoveAll(cfg.DataDir)
458 leecher, err := NewClient(cfg)
459 require.NoError(t, err)
460 defer leecher.Close()
461 testutil.ExportStatusWriter(leecher, "l")
463 cfg.DataDir, err = ioutil.TempDir("", "")
464 require.NoError(t, err)
465 defer os.RemoveAll(cfg.DataDir)
466 leecherLeecher, _ := NewClient(cfg)
467 defer leecherLeecher.Close()
468 testutil.ExportStatusWriter(leecherLeecher, "ll")
469 leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
470 ret = TorrentSpecFromMetaInfo(mi)
474 llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
475 ret = TorrentSpecFromMetaInfo(mi)
479 // Simultaneously DownloadAll in Leecher, and read the contents
480 // consecutively in LeecherLeecher. This non-deterministically triggered a
481 // case where the leecher wouldn't unchoke the LeecherLeecher.
482 var wg sync.WaitGroup
488 b, err := ioutil.ReadAll(r)
489 require.NoError(t, err)
490 assert.EqualValues(t, testutil.GreetingFileContents, b)
492 addClientPeer(leecherGreeting, seeder)
493 addClientPeer(leecherGreeting, leecherLeecher)
497 leecherGreeting.DownloadAll()
503 func TestMergingTrackersByAddingSpecs(t *testing.T) {
504 cl, err := NewClient(TestingConfig())
505 require.NoError(t, err)
507 spec := TorrentSpec{}
508 T, new, _ := cl.AddTorrentSpec(&spec)
512 spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
513 _, new, _ = cl.AddTorrentSpec(&spec)
515 assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
516 // Because trackers are disabled in TestingConfig.
517 assert.EqualValues(t, 0, len(T.trackerAnnouncers))
520 type badStorage struct{}
522 var _ storage.ClientImpl = badStorage{}
524 func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
528 func (bs badStorage) Close() error {
532 func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {
533 return badStoragePiece{p}
536 type badStoragePiece struct {
540 var _ storage.PieceImpl = badStoragePiece{}
542 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
546 func (p badStoragePiece) Completion() storage.Completion {
547 return storage.Completion{Complete: true, Ok: true}
550 func (p badStoragePiece) MarkComplete() error {
551 return errors.New("psyyyyyyyche")
554 func (p badStoragePiece) MarkNotComplete() error {
555 return errors.New("psyyyyyyyche")
558 func (p badStoragePiece) randomlyTruncatedDataString() string {
559 return "hello, world\n"[:rand.Intn(14)]
562 func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
563 r := strings.NewReader(p.randomlyTruncatedDataString())
564 return r.ReadAt(b, off+p.p.Offset())
567 // We read from a piece which is marked completed, but is missing data.
568 func TestCompletedPieceWrongSize(t *testing.T) {
569 cfg := TestingConfig()
570 cfg.DefaultStorage = badStorage{}
571 cl, err := NewClient(cfg)
572 require.NoError(t, err)
574 info := metainfo.Info{
576 Pieces: make([]byte, 20),
577 Files: []metainfo.FileInfo{
578 {Path: []string{"greeting"}, Length: 13},
581 b, err := bencode.Marshal(info)
582 require.NoError(t, err)
583 tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
585 InfoHash: metainfo.HashBytes(b),
587 require.NoError(t, err)
592 b, err = ioutil.ReadAll(r)
594 assert.NoError(t, err)
597 func BenchmarkAddLargeTorrent(b *testing.B) {
598 cfg := TestingConfig()
599 cfg.DisableTCP = true
600 cfg.DisableUTP = true
601 cfg.ListenAddr = "redonk"
602 cl, err := NewClient(cfg)
603 require.NoError(b, err)
605 for range iter.N(b.N) {
606 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
614 func TestResponsive(t *testing.T) {
615 seederDataDir, mi := testutil.GreetingTestTorrent()
616 defer os.RemoveAll(seederDataDir)
617 cfg := TestingConfig()
619 cfg.DataDir = seederDataDir
620 seeder, err := NewClient(cfg)
623 seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
624 seederTorrent.VerifyData()
625 leecherDataDir, err := ioutil.TempDir("", "")
627 defer os.RemoveAll(leecherDataDir)
628 cfg = TestingConfig()
629 cfg.DataDir = leecherDataDir
630 leecher, err := NewClient(cfg)
632 defer leecher.Close()
633 leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
634 ret = TorrentSpecFromMetaInfo(mi)
638 addClientPeer(leecherTorrent, seeder)
639 reader := leecherTorrent.NewReader()
641 reader.SetReadahead(0)
642 reader.SetResponsive()
644 _, err = reader.Seek(3, io.SeekStart)
645 require.NoError(t, err)
646 _, err = io.ReadFull(reader, b)
648 assert.EqualValues(t, "lo", string(b))
649 _, err = reader.Seek(11, io.SeekStart)
650 require.NoError(t, err)
651 n, err := io.ReadFull(reader, b)
653 assert.EqualValues(t, 2, n)
654 assert.EqualValues(t, "d\n", string(b))
657 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
658 seederDataDir, mi := testutil.GreetingTestTorrent()
659 defer os.RemoveAll(seederDataDir)
660 cfg := TestingConfig()
662 cfg.DataDir = seederDataDir
663 seeder, err := NewClient(cfg)
666 seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
667 seederTorrent.VerifyData()
668 leecherDataDir, err := ioutil.TempDir("", "")
670 defer os.RemoveAll(leecherDataDir)
671 cfg = TestingConfig()
672 cfg.DataDir = leecherDataDir
673 leecher, err := NewClient(cfg)
675 defer leecher.Close()
676 leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
677 ret = TorrentSpecFromMetaInfo(mi)
681 addClientPeer(leecherTorrent, seeder)
682 reader := leecherTorrent.NewReader()
684 reader.SetReadahead(0)
685 reader.SetResponsive()
687 _, err = reader.Seek(3, io.SeekStart)
688 require.NoError(t, err)
689 _, err = io.ReadFull(reader, b)
691 assert.EqualValues(t, "lo", string(b))
692 go leecherTorrent.Drop()
693 _, err = reader.Seek(11, io.SeekStart)
694 require.NoError(t, err)
695 n, err := reader.Read(b)
696 assert.EqualError(t, err, "torrent closed")
697 assert.EqualValues(t, 0, n)
700 func TestDHTInheritBlocklist(t *testing.T) {
701 ipl := iplist.New(nil)
702 require.NotNil(t, ipl)
703 cfg := TestingConfig()
704 cfg.IPBlocklist = ipl
706 cl, err := NewClient(cfg)
707 require.NoError(t, err)
709 require.Equal(t, ipl, cl.DHT().IPBlocklist())
712 // Check that stuff is merged in subsequent AddTorrentSpec for the same
714 func TestAddTorrentSpecMerging(t *testing.T) {
715 cl, err := NewClient(TestingConfig())
716 require.NoError(t, err)
718 dir, mi := testutil.GreetingTestTorrent()
719 defer os.RemoveAll(dir)
720 tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
721 InfoHash: mi.HashInfoBytes(),
723 require.NoError(t, err)
725 require.Nil(t, tt.Info())
726 _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
727 require.NoError(t, err)
728 require.False(t, new)
729 require.NotNil(t, tt.Info())
732 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
733 dir, mi := testutil.GreetingTestTorrent()
735 cl, _ := NewClient(TestingConfig())
737 tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
738 InfoHash: mi.HashInfoBytes(),
741 assert.EqualValues(t, 0, len(cl.Torrents()))
749 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
750 for i := range iter.N(info.NumPieces()) {
752 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
756 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
757 fileCacheDir, err := ioutil.TempDir("", "")
758 require.NoError(t, err)
759 defer os.RemoveAll(fileCacheDir)
760 fileCache, err := filecache.NewCache(fileCacheDir)
761 require.NoError(t, err)
762 greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
763 defer os.RemoveAll(greetingDataTempDir)
764 filePieceStore := csf(fileCache)
765 defer filePieceStore.Close()
766 info, err := greetingMetainfo.UnmarshalInfo()
767 require.NoError(t, err)
768 ih := greetingMetainfo.HashInfoBytes()
769 greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
770 require.NoError(t, err)
771 writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
772 // require.Equal(t, len(testutil.GreetingFileContents), written)
773 // require.NoError(t, err)
774 for i := 0; i < info.NumPieces(); i++ {
776 if alreadyCompleted {
777 require.NoError(t, greetingData.Piece(p).MarkComplete())
780 cfg := TestingConfig()
781 // TODO: Disable network option?
782 cfg.DisableTCP = true
783 cfg.DisableUTP = true
784 cfg.DefaultStorage = filePieceStore
785 cl, err := NewClient(cfg)
786 require.NoError(t, err)
788 tt, err := cl.AddTorrent(greetingMetainfo)
789 require.NoError(t, err)
790 psrs := tt.PieceStateRuns()
791 assert.Len(t, psrs, 1)
792 assert.EqualValues(t, 3, psrs[0].Length)
793 assert.Equal(t, alreadyCompleted, psrs[0].Complete)
794 if alreadyCompleted {
796 b, err := ioutil.ReadAll(r)
797 assert.NoError(t, err)
798 assert.EqualValues(t, testutil.GreetingFileContents, b)
802 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
803 testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
806 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
807 testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
810 func TestAddMetainfoWithNodes(t *testing.T) {
811 cfg := TestingConfig()
812 cfg.ListenAddr = ":0"
814 // For now, we want to just jam the nodes into the table, without
815 // verifying them first. Also the DHT code doesn't support mixing secure
816 // and insecure nodes if security is enabled (yet).
817 cfg.DHTConfig.NoSecurity = true
818 cl, err := NewClient(cfg)
819 require.NoError(t, err)
821 assert.EqualValues(t, 0, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
822 tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
823 require.NoError(t, err)
824 // Nodes are not added or exposed in Torrent's metainfo. We just randomly
825 // check if the announce-list is here instead. TODO: Add nodes.
826 assert.Len(t, tt.metainfo.AnnounceList, 5)
827 // There are 6 nodes in the torrent file.
828 assert.EqualValues(t, 6, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
831 type testDownloadCancelParams struct {
832 ExportClientStatus bool
833 SetLeecherStorageCapacity bool
834 LeecherStorageCapacity int64
838 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
839 greetingTempDir, mi := testutil.GreetingTestTorrent()
840 defer os.RemoveAll(greetingTempDir)
841 cfg := TestingConfig()
843 cfg.DataDir = greetingTempDir
844 seeder, err := NewClient(cfg)
845 require.NoError(t, err)
847 if ps.ExportClientStatus {
848 testutil.ExportStatusWriter(seeder, "s")
850 seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
851 seederTorrent.VerifyData()
852 leecherDataDir, err := ioutil.TempDir("", "")
853 require.NoError(t, err)
854 defer os.RemoveAll(leecherDataDir)
855 fc, err := filecache.NewCache(leecherDataDir)
856 require.NoError(t, err)
857 if ps.SetLeecherStorageCapacity {
858 fc.SetCapacity(ps.LeecherStorageCapacity)
860 cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
861 cfg.DataDir = leecherDataDir
862 leecher, _ := NewClient(cfg)
863 defer leecher.Close()
864 if ps.ExportClientStatus {
865 testutil.ExportStatusWriter(leecher, "l")
867 leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
868 ret = TorrentSpecFromMetaInfo(mi)
872 require.NoError(t, err)
874 psc := leecherGreeting.SubscribePieceStateChanges()
877 leecherGreeting.cl.mu.Lock()
878 leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
880 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
882 leecherGreeting.cl.mu.Unlock()
884 addClientPeer(leecherGreeting, seeder)
885 completes := make(map[int]bool, 3)
888 // started := time.Now()
890 case _v := <-psc.Values:
891 // log.Print(time.Since(started))
892 v := _v.(PieceStateChange)
893 completes[v.Index] = v.Complete
894 case <-time.After(100 * time.Millisecond):
899 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
901 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
906 func TestTorrentDownloadAll(t *testing.T) {
907 testDownloadCancel(t, testDownloadCancelParams{})
910 func TestTorrentDownloadAllThenCancel(t *testing.T) {
911 testDownloadCancel(t, testDownloadCancelParams{
916 // Ensure that it's an error for a peer to send an invalid have message.
917 func TestPeerInvalidHave(t *testing.T) {
918 cl, err := NewClient(TestingConfig())
919 require.NoError(t, err)
921 info := metainfo.Info{
923 Pieces: make([]byte, 20),
924 Files: []metainfo.FileInfo{{Length: 1}},
926 infoBytes, err := bencode.Marshal(info)
927 require.NoError(t, err)
928 tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
929 InfoBytes: infoBytes,
930 InfoHash: metainfo.HashBytes(infoBytes),
931 Storage: badStorage{},
933 require.NoError(t, err)
939 assert.NoError(t, cn.peerSentHave(0))
940 assert.Error(t, cn.peerSentHave(1))
943 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
944 greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
945 defer os.RemoveAll(greetingTempDir)
946 cfg := TestingConfig()
947 cfg.DataDir = greetingTempDir
948 seeder, err := NewClient(TestingConfig())
949 require.NoError(t, err)
950 seeder.AddTorrentSpec(&TorrentSpec{
951 InfoBytes: greetingMetainfo.InfoBytes,
955 func TestPrepareTrackerAnnounce(t *testing.T) {
957 blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
958 require.NoError(t, err)
959 assert.False(t, blocked)
960 assert.EqualValues(t, "localhost:1234", host)
961 assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
964 // Check that when the listen port is 0, all the protocols listened on have
965 // the same port, and it isn't zero.
966 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
967 cl, err := NewClient(TestingConfig())
968 require.NoError(t, err)
970 assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
971 assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
974 func TestClientDynamicListenTCPOnly(t *testing.T) {
975 cfg := TestingConfig()
976 cfg.DisableUTP = true
977 cl, err := NewClient(cfg)
978 require.NoError(t, err)
980 assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
981 assert.Nil(t, cl.utpSock)
984 func TestClientDynamicListenUTPOnly(t *testing.T) {
985 cfg := TestingConfig()
986 cfg.DisableTCP = true
987 cl, err := NewClient(cfg)
988 require.NoError(t, err)
990 assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
991 assert.Nil(t, cl.tcpListener)
994 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
995 cfg := TestingConfig()
996 cfg.DisableTCP = true
997 cfg.DisableUTP = true
998 cl, err := NewClient(cfg)
999 require.NoError(t, err)
1001 assert.Nil(t, cl.ListenAddr())
1004 func addClientPeer(t *Torrent, cl *Client) {
1007 IP: missinggo.AddrIP(cl.ListenAddr()),
1008 Port: missinggo.AddrPort(cl.ListenAddr()),
1013 func totalConns(tts []*Torrent) (ret int) {
1014 for _, tt := range tts {
1016 ret += len(tt.conns)
1022 func TestSetMaxEstablishedConn(t *testing.T) {
1024 ih := testutil.GreetingMetaInfo().HashInfoBytes()
1025 for i := range iter.N(3) {
1026 cl, err := NewClient(TestingConfig())
1027 require.NoError(t, err)
1029 tt, _ := cl.AddTorrentInfoHash(ih)
1030 tt.SetMaxEstablishedConns(2)
1031 testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
1032 tts = append(tts, tt)
1034 addPeers := func() {
1035 for i, tt := range tts {
1036 for _, _tt := range tts[:i] {
1037 addClientPeer(tt, _tt.cl)
1041 waitTotalConns := func(num int) {
1042 for totalConns(tts) != num {
1043 time.Sleep(time.Millisecond)
1048 tts[0].SetMaxEstablishedConns(1)
1050 tts[0].SetMaxEstablishedConns(0)
1052 tts[0].SetMaxEstablishedConns(1)
1055 tts[0].SetMaxEstablishedConns(2)
1060 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1061 os.MkdirAll(dir, 0770)
1062 file, err := os.Create(filepath.Join(dir, name))
1063 require.NoError(t, err)
1064 file.Write([]byte(name))
1066 mi := metainfo.MetaInfo{}
1068 info := metainfo.Info{PieceLength: 256 * 1024}
1069 err = info.BuildFromFilePath(filepath.Join(dir, name))
1070 require.NoError(t, err)
1071 mi.InfoBytes, err = bencode.Marshal(info)
1072 require.NoError(t, err)
1073 magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1074 tr, err := cl.AddTorrent(&mi)
1075 require.NoError(t, err)
1076 require.True(t, tr.Seeding())
1081 // https://github.com/anacrolix/torrent/issues/114
1082 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1083 cfg := TestingConfig()
1084 cfg.DisableUTP = true
1086 cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1088 cfg.ForceEncryption = true
1089 os.Mkdir(cfg.DataDir, 0755)
1090 server, err := NewClient(cfg)
1091 require.NoError(t, err)
1092 defer server.Close()
1093 testutil.ExportStatusWriter(server, "s")
1094 magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1095 makeMagnet(t, server, cfg.DataDir, "test2")
1096 cfg = TestingConfig()
1097 cfg.DisableUTP = true
1098 cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1100 cfg.ForceEncryption = true
1101 client, err := NewClient(cfg)
1102 require.NoError(t, err)
1103 defer client.Close()
1104 testutil.ExportStatusWriter(client, "c")
1105 tr, err := client.AddMagnet(magnet1)
1106 require.NoError(t, err)
1107 tr.AddPeers([]Peer{{
1108 IP: missinggo.AddrIP(server.ListenAddr()),
1109 Port: missinggo.AddrPort(server.ListenAddr()),
1116 func TestClientAddressInUse(t *testing.T) {
1117 s, _ := NewUtpSocket("udp", ":50007")
1121 cfg := TestingConfig()
1122 cfg.ListenAddr = ":50007"
1123 cl, err := NewClient(cfg)
1124 require.Error(t, err)