18 _ "github.com/anacrolix/envpprof"
19 "github.com/anacrolix/missinggo"
20 "github.com/anacrolix/missinggo/filecache"
21 "github.com/anacrolix/missinggo/pubsub"
22 "github.com/bradfitz/iter"
23 "github.com/stretchr/testify/assert"
24 "github.com/stretchr/testify/require"
25 "golang.org/x/time/rate"
27 "github.com/anacrolix/torrent/bencode"
28 "github.com/anacrolix/torrent/internal/testutil"
29 "github.com/anacrolix/torrent/iplist"
30 "github.com/anacrolix/torrent/metainfo"
31 "github.com/anacrolix/torrent/storage"
34 func TestingConfig() *Config {
36 ListenAddr: "localhost:0",
39 DisableTrackers: true,
44 func TestClientDefault(t *testing.T) {
45 cl, err := NewClient(TestingConfig())
46 require.NoError(t, err)
50 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
51 cfg := TestingConfig()
52 pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
53 require.NoError(t, err)
54 ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
56 cfg.DefaultStorage = ci
57 cl, err := NewClient(cfg)
58 require.NoError(t, err)
60 // And again, https://github.com/anacrolix/torrent/issues/158
61 cl, err = NewClient(cfg)
62 require.NoError(t, err)
66 func TestAddDropTorrent(t *testing.T) {
67 cl, err := NewClient(TestingConfig())
68 require.NoError(t, err)
70 dir, mi := testutil.GreetingTestTorrent()
71 defer os.RemoveAll(dir)
72 tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
73 require.NoError(t, err)
75 tt.SetMaxEstablishedConns(0)
76 tt.SetMaxEstablishedConns(1)
80 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
85 func TestAddTorrentNoUsableURLs(t *testing.T) {
90 func TestAddPeersToUnknownTorrent(t *testing.T) {
95 func TestPieceHashSize(t *testing.T) {
96 assert.Equal(t, 20, pieceHash.Size())
99 func TestTorrentInitialState(t *testing.T) {
100 dir, mi := testutil.GreetingTestTorrent()
101 defer os.RemoveAll(dir)
103 infoHash: mi.HashInfoBytes(),
104 pieceStateChanges: pubsub.NewPubSub(),
107 tor.storageOpener = storage.NewClient(storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()))
108 // Needed to lock for asynchronous piece verification.
111 err := tor.setInfoBytes(mi.InfoBytes)
113 require.NoError(t, err)
114 require.Len(t, tor.pieces, 3)
115 tor.pendAllChunkSpecs(0)
117 assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
119 assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
122 func TestUnmarshalPEXMsg(t *testing.T) {
123 var m peerExchangeMessage
124 if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
127 if len(m.Added) != 2 {
130 if m.Added[0].Port != 0x506 {
135 func TestReducedDialTimeout(t *testing.T) {
138 for _, _case := range []struct {
142 ExpectedReduced time.Duration
144 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
145 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
146 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
147 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
148 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
149 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
151 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
152 expected := _case.ExpectedReduced
153 if expected < cfg.MinDialTimeout {
154 expected = cfg.MinDialTimeout
156 if reduced != expected {
157 t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
162 func TestUTPRawConn(t *testing.T) {
163 l, err := NewUtpSocket("udp", "")
164 require.NoError(t, err)
174 // Connect a UTP peer to see if the RawConn will still work.
175 s, err := NewUtpSocket("udp", "")
176 require.NoError(t, err)
178 utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
179 require.NoError(t, err)
180 defer utpPeer.Close()
181 peer, err := net.ListenPacket("udp", ":0")
182 require.NoError(t, err)
186 // How many messages to send. I've set this to double the channel buffer
187 // size in the raw packetConn.
189 readerStopped := make(chan struct{})
190 // The reader goroutine.
192 defer close(readerStopped)
193 b := make([]byte, 500)
194 for i := 0; i < N; i++ {
195 n, _, err := l.ReadFrom(b)
196 require.NoError(t, err)
199 fmt.Sscan(string(b[:n]), &d)
200 assert.Equal(t, i, d)
203 udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
204 require.NoError(t, err)
205 for i := 0; i < N; i++ {
206 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
207 require.NoError(t, err)
208 time.Sleep(time.Millisecond)
211 case <-readerStopped:
212 case <-time.After(time.Second):
213 t.Fatal("reader timed out")
215 if msgsReceived != N {
216 t.Fatalf("messages received: %d", msgsReceived)
220 func TestTwoClientsArbitraryPorts(t *testing.T) {
221 for i := 0; i < 2; i++ {
222 cl, err := NewClient(TestingConfig())
230 func TestAddDropManyTorrents(t *testing.T) {
231 cl, err := NewClient(TestingConfig())
232 require.NoError(t, err)
234 for i := range iter.N(1000) {
236 binary.PutVarint(spec.InfoHash[:], int64(i))
237 tt, new, err := cl.AddTorrentSpec(&spec)
238 assert.NoError(t, err)
244 type FileCacheClientStorageFactoryParams struct {
247 Wrapper func(*filecache.Cache) storage.ClientImpl
250 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
251 return func(dataDir string) storage.ClientImpl {
252 fc, err := filecache.NewCache(dataDir)
257 fc.SetCapacity(ps.Capacity)
259 return ps.Wrapper(fc)
263 type storageFactory func(string) storage.ClientImpl
265 func TestClientTransferDefault(t *testing.T) {
266 testClientTransfer(t, testClientTransferParams{
267 ExportClientStatus: true,
268 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
269 Wrapper: fileCachePieceResourceStorage,
274 func TestClientTransferRateLimitedUpload(t *testing.T) {
275 started := time.Now()
276 testClientTransfer(t, testClientTransferParams{
277 // We are uploading 13 bytes (the length of the greeting torrent). The
278 // chunks are 2 bytes in length. Then the smallest burst we can run
279 // with is 2. Time taken is (13-burst)/rate.
280 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
282 require.True(t, time.Since(started) > time.Second)
285 func TestClientTransferRateLimitedDownload(t *testing.T) {
286 testClientTransfer(t, testClientTransferParams{
287 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
291 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
292 return storage.NewResourcePieces(fc.AsResourceProvider())
295 func TestClientTransferSmallCache(t *testing.T) {
296 testClientTransfer(t, testClientTransferParams{
297 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
299 // Going below the piece length means it can't complete a piece so
300 // that it can be hashed.
302 Wrapper: fileCachePieceResourceStorage,
305 // Can't readahead too far or the cache will thrash and drop data we
308 ExportClientStatus: true,
312 func TestClientTransferVarious(t *testing.T) {
314 for _, ls := range []storageFactory{
315 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
316 Wrapper: fileCachePieceResourceStorage,
321 for _, ss := range []func(string) storage.ClientImpl{
325 for _, responsive := range []bool{false, true} {
326 testClientTransfer(t, testClientTransferParams{
327 Responsive: responsive,
331 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
332 testClientTransfer(t, testClientTransferParams{
334 Responsive: responsive,
336 Readahead: readahead,
345 type testClientTransferParams struct {
349 ExportClientStatus bool
350 LeecherStorage func(string) storage.ClientImpl
351 SeederStorage func(string) storage.ClientImpl
352 SeederUploadRateLimiter *rate.Limiter
353 LeecherDownloadRateLimiter *rate.Limiter
356 // Creates a seeder and a leecher, and ensures the data transfers when a read
357 // is attempted on the leecher.
358 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
359 greetingTempDir, mi := testutil.GreetingTestTorrent()
360 defer os.RemoveAll(greetingTempDir)
361 // Create seeder and a Torrent.
362 cfg := TestingConfig()
364 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
365 // cfg.ListenAddr = "localhost:4000"
366 if ps.SeederStorage != nil {
367 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
368 defer cfg.DefaultStorage.Close()
370 cfg.DataDir = greetingTempDir
372 seeder, err := NewClient(cfg)
373 require.NoError(t, err)
374 if ps.ExportClientStatus {
375 testutil.ExportStatusWriter(seeder, "s")
377 seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
378 // Run a Stats right after Closing the Client. This will trigger the Stats
379 // panic in #214 caused by RemoteAddr on Closed uTP sockets.
380 defer seederTorrent.Stats()
382 seederTorrent.VerifyData()
383 // Create leecher and a Torrent.
384 leecherDataDir, err := ioutil.TempDir("", "")
385 require.NoError(t, err)
386 defer os.RemoveAll(leecherDataDir)
387 if ps.LeecherStorage == nil {
388 cfg.DataDir = leecherDataDir
390 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
392 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
393 // cfg.ListenAddr = "localhost:4001"
394 leecher, err := NewClient(cfg)
395 require.NoError(t, err)
396 defer leecher.Close()
397 if ps.ExportClientStatus {
398 testutil.ExportStatusWriter(leecher, "l")
400 leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
401 ret = TorrentSpecFromMetaInfo(mi)
405 require.NoError(t, err)
407 // Now do some things with leecher and seeder.
408 addClientPeer(leecherGreeting, seeder)
409 r := leecherGreeting.NewReader()
415 r.SetReadahead(ps.Readahead)
417 assertReadAllGreeting(t, r)
418 // After one read through, we can assume certain torrent statistics.
419 // These are not a strict requirement. It is however interesting to
421 // t.Logf("%#v", seederTorrent.Stats())
422 // assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
423 // assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
424 // assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
425 // assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
426 // Read through again for the cases where the torrent data size exceeds
427 // the size of the cache.
428 assertReadAllGreeting(t, r)
431 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
432 pos, err := r.Seek(0, io.SeekStart)
433 assert.NoError(t, err)
434 assert.EqualValues(t, 0, pos)
435 _greeting, err := ioutil.ReadAll(r)
436 assert.NoError(t, err)
437 assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
440 // Check that after completing leeching, a leecher transitions to seeding
441 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
442 func TestSeedAfterDownloading(t *testing.T) {
443 greetingTempDir, mi := testutil.GreetingTestTorrent()
444 defer os.RemoveAll(greetingTempDir)
445 cfg := TestingConfig()
447 cfg.DataDir = greetingTempDir
448 seeder, err := NewClient(cfg)
449 require.NoError(t, err)
451 testutil.ExportStatusWriter(seeder, "s")
452 seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
453 seederTorrent.VerifyData()
454 cfg.DataDir, err = ioutil.TempDir("", "")
455 require.NoError(t, err)
456 defer os.RemoveAll(cfg.DataDir)
457 leecher, err := NewClient(cfg)
458 require.NoError(t, err)
459 defer leecher.Close()
460 testutil.ExportStatusWriter(leecher, "l")
462 cfg.DataDir, err = ioutil.TempDir("", "")
463 require.NoError(t, err)
464 defer os.RemoveAll(cfg.DataDir)
465 leecherLeecher, _ := NewClient(cfg)
466 defer leecherLeecher.Close()
467 testutil.ExportStatusWriter(leecherLeecher, "ll")
468 leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
469 ret = TorrentSpecFromMetaInfo(mi)
473 llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
474 ret = TorrentSpecFromMetaInfo(mi)
478 // Simultaneously DownloadAll in Leecher, and read the contents
479 // consecutively in LeecherLeecher. This non-deterministically triggered a
480 // case where the leecher wouldn't unchoke the LeecherLeecher.
481 var wg sync.WaitGroup
487 b, err := ioutil.ReadAll(r)
488 require.NoError(t, err)
489 assert.EqualValues(t, testutil.GreetingFileContents, b)
491 addClientPeer(leecherGreeting, seeder)
492 addClientPeer(leecherGreeting, leecherLeecher)
496 leecherGreeting.DownloadAll()
502 func TestMergingTrackersByAddingSpecs(t *testing.T) {
503 cl, err := NewClient(TestingConfig())
504 require.NoError(t, err)
506 spec := TorrentSpec{}
507 T, new, _ := cl.AddTorrentSpec(&spec)
511 spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
512 _, new, _ = cl.AddTorrentSpec(&spec)
514 assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
515 // Because trackers are disabled in TestingConfig.
516 assert.EqualValues(t, 0, len(T.trackerAnnouncers))
519 type badStorage struct{}
521 var _ storage.ClientImpl = badStorage{}
523 func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
527 func (bs badStorage) Close() error {
531 func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {
532 return badStoragePiece{p}
535 type badStoragePiece struct {
539 var _ storage.PieceImpl = badStoragePiece{}
541 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
545 func (p badStoragePiece) Completion() storage.Completion {
546 return storage.Completion{Complete: true, Ok: true}
549 func (p badStoragePiece) MarkComplete() error {
550 return errors.New("psyyyyyyyche")
553 func (p badStoragePiece) MarkNotComplete() error {
554 return errors.New("psyyyyyyyche")
557 func (p badStoragePiece) randomlyTruncatedDataString() string {
558 return "hello, world\n"[:rand.Intn(14)]
561 func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
562 r := strings.NewReader(p.randomlyTruncatedDataString())
563 return r.ReadAt(b, off+p.p.Offset())
566 // We read from a piece which is marked completed, but is missing data.
567 func TestCompletedPieceWrongSize(t *testing.T) {
568 cfg := TestingConfig()
569 cfg.DefaultStorage = badStorage{}
570 cl, err := NewClient(cfg)
571 require.NoError(t, err)
573 info := metainfo.Info{
575 Pieces: make([]byte, 20),
576 Files: []metainfo.FileInfo{
577 {Path: []string{"greeting"}, Length: 13},
580 b, err := bencode.Marshal(info)
581 require.NoError(t, err)
582 tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
584 InfoHash: metainfo.HashBytes(b),
586 require.NoError(t, err)
591 b, err = ioutil.ReadAll(r)
593 assert.NoError(t, err)
596 func BenchmarkAddLargeTorrent(b *testing.B) {
597 cfg := TestingConfig()
598 cfg.DisableTCP = true
599 cfg.DisableUTP = true
600 cfg.ListenAddr = "redonk"
601 cl, err := NewClient(cfg)
602 require.NoError(b, err)
604 for range iter.N(b.N) {
605 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
613 func TestResponsive(t *testing.T) {
614 seederDataDir, mi := testutil.GreetingTestTorrent()
615 defer os.RemoveAll(seederDataDir)
616 cfg := TestingConfig()
618 cfg.DataDir = seederDataDir
619 seeder, err := NewClient(cfg)
622 seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
623 seederTorrent.VerifyData()
624 leecherDataDir, err := ioutil.TempDir("", "")
626 defer os.RemoveAll(leecherDataDir)
627 cfg = TestingConfig()
628 cfg.DataDir = leecherDataDir
629 leecher, err := NewClient(cfg)
631 defer leecher.Close()
632 leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
633 ret = TorrentSpecFromMetaInfo(mi)
637 addClientPeer(leecherTorrent, seeder)
638 reader := leecherTorrent.NewReader()
640 reader.SetReadahead(0)
641 reader.SetResponsive()
643 _, err = reader.Seek(3, io.SeekStart)
644 require.NoError(t, err)
645 _, err = io.ReadFull(reader, b)
647 assert.EqualValues(t, "lo", string(b))
648 _, err = reader.Seek(11, io.SeekStart)
649 require.NoError(t, err)
650 n, err := io.ReadFull(reader, b)
652 assert.EqualValues(t, 2, n)
653 assert.EqualValues(t, "d\n", string(b))
656 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
657 seederDataDir, mi := testutil.GreetingTestTorrent()
658 defer os.RemoveAll(seederDataDir)
659 cfg := TestingConfig()
661 cfg.DataDir = seederDataDir
662 seeder, err := NewClient(cfg)
665 seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
666 seederTorrent.VerifyData()
667 leecherDataDir, err := ioutil.TempDir("", "")
669 defer os.RemoveAll(leecherDataDir)
670 cfg = TestingConfig()
671 cfg.DataDir = leecherDataDir
672 leecher, err := NewClient(cfg)
674 defer leecher.Close()
675 leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
676 ret = TorrentSpecFromMetaInfo(mi)
680 addClientPeer(leecherTorrent, seeder)
681 reader := leecherTorrent.NewReader()
683 reader.SetReadahead(0)
684 reader.SetResponsive()
686 _, err = reader.Seek(3, io.SeekStart)
687 require.NoError(t, err)
688 _, err = io.ReadFull(reader, b)
690 assert.EqualValues(t, "lo", string(b))
691 go leecherTorrent.Drop()
692 _, err = reader.Seek(11, io.SeekStart)
693 require.NoError(t, err)
694 n, err := reader.Read(b)
695 assert.EqualError(t, err, "torrent closed")
696 assert.EqualValues(t, 0, n)
699 func TestDHTInheritBlocklist(t *testing.T) {
700 ipl := iplist.New(nil)
701 require.NotNil(t, ipl)
702 cfg := TestingConfig()
703 cfg.IPBlocklist = ipl
705 cl, err := NewClient(cfg)
706 require.NoError(t, err)
708 require.Equal(t, ipl, cl.DHT().IPBlocklist())
711 // Check that stuff is merged in subsequent AddTorrentSpec for the same
713 func TestAddTorrentSpecMerging(t *testing.T) {
714 cl, err := NewClient(TestingConfig())
715 require.NoError(t, err)
717 dir, mi := testutil.GreetingTestTorrent()
718 defer os.RemoveAll(dir)
719 tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
720 InfoHash: mi.HashInfoBytes(),
722 require.NoError(t, err)
724 require.Nil(t, tt.Info())
725 _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
726 require.NoError(t, err)
727 require.False(t, new)
728 require.NotNil(t, tt.Info())
731 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
732 dir, mi := testutil.GreetingTestTorrent()
734 cl, _ := NewClient(TestingConfig())
736 tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
737 InfoHash: mi.HashInfoBytes(),
740 assert.EqualValues(t, 0, len(cl.Torrents()))
748 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
749 for i := range iter.N(info.NumPieces()) {
751 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
755 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
756 fileCacheDir, err := ioutil.TempDir("", "")
757 require.NoError(t, err)
758 defer os.RemoveAll(fileCacheDir)
759 fileCache, err := filecache.NewCache(fileCacheDir)
760 require.NoError(t, err)
761 greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
762 defer os.RemoveAll(greetingDataTempDir)
763 filePieceStore := csf(fileCache)
764 defer filePieceStore.Close()
765 info, err := greetingMetainfo.UnmarshalInfo()
766 require.NoError(t, err)
767 ih := greetingMetainfo.HashInfoBytes()
768 greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
769 require.NoError(t, err)
770 writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
771 // require.Equal(t, len(testutil.GreetingFileContents), written)
772 // require.NoError(t, err)
773 for i := 0; i < info.NumPieces(); i++ {
775 if alreadyCompleted {
776 require.NoError(t, greetingData.Piece(p).MarkComplete())
779 cfg := TestingConfig()
780 // TODO: Disable network option?
781 cfg.DisableTCP = true
782 cfg.DisableUTP = true
783 cfg.DefaultStorage = filePieceStore
784 cl, err := NewClient(cfg)
785 require.NoError(t, err)
787 tt, err := cl.AddTorrent(greetingMetainfo)
788 require.NoError(t, err)
789 psrs := tt.PieceStateRuns()
790 assert.Len(t, psrs, 1)
791 assert.EqualValues(t, 3, psrs[0].Length)
792 assert.Equal(t, alreadyCompleted, psrs[0].Complete)
793 if alreadyCompleted {
795 b, err := ioutil.ReadAll(r)
796 assert.NoError(t, err)
797 assert.EqualValues(t, testutil.GreetingFileContents, b)
801 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
802 testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
805 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
806 testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
809 func TestAddMetainfoWithNodes(t *testing.T) {
810 cfg := TestingConfig()
811 cfg.ListenAddr = ":0"
813 // For now, we want to just jam the nodes into the table, without
814 // verifying them first. Also the DHT code doesn't support mixing secure
815 // and insecure nodes if security is enabled (yet).
816 cfg.DHTConfig.NoSecurity = true
817 cl, err := NewClient(cfg)
818 require.NoError(t, err)
820 assert.EqualValues(t, 0, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
821 tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
822 require.NoError(t, err)
823 // Nodes are not added or exposed in Torrent's metainfo. We just randomly
824 // check if the announce-list is here instead. TODO: Add nodes.
825 assert.Len(t, tt.metainfo.AnnounceList, 5)
826 // There are 6 nodes in the torrent file.
827 assert.EqualValues(t, 6, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
830 type testDownloadCancelParams struct {
831 ExportClientStatus bool
832 SetLeecherStorageCapacity bool
833 LeecherStorageCapacity int64
837 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
838 greetingTempDir, mi := testutil.GreetingTestTorrent()
839 defer os.RemoveAll(greetingTempDir)
840 cfg := TestingConfig()
842 cfg.DataDir = greetingTempDir
843 seeder, err := NewClient(cfg)
844 require.NoError(t, err)
846 if ps.ExportClientStatus {
847 testutil.ExportStatusWriter(seeder, "s")
849 seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
850 seederTorrent.VerifyData()
851 leecherDataDir, err := ioutil.TempDir("", "")
852 require.NoError(t, err)
853 defer os.RemoveAll(leecherDataDir)
854 fc, err := filecache.NewCache(leecherDataDir)
855 require.NoError(t, err)
856 if ps.SetLeecherStorageCapacity {
857 fc.SetCapacity(ps.LeecherStorageCapacity)
859 cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
860 cfg.DataDir = leecherDataDir
861 leecher, _ := NewClient(cfg)
862 defer leecher.Close()
863 if ps.ExportClientStatus {
864 testutil.ExportStatusWriter(leecher, "l")
866 leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
867 ret = TorrentSpecFromMetaInfo(mi)
871 require.NoError(t, err)
873 psc := leecherGreeting.SubscribePieceStateChanges()
875 leecherGreeting.DownloadAll()
877 leecherGreeting.CancelPieces(0, leecherGreeting.NumPieces())
879 addClientPeer(leecherGreeting, seeder)
880 completes := make(map[int]bool, 3)
883 // started := time.Now()
885 case _v := <-psc.Values:
886 // log.Print(time.Since(started))
887 v := _v.(PieceStateChange)
888 completes[v.Index] = v.Complete
889 case <-time.After(100 * time.Millisecond):
894 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
896 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
901 func TestTorrentDownloadAll(t *testing.T) {
902 testDownloadCancel(t, testDownloadCancelParams{})
905 func TestTorrentDownloadAllThenCancel(t *testing.T) {
906 testDownloadCancel(t, testDownloadCancelParams{
911 // Ensure that it's an error for a peer to send an invalid have message.
912 func TestPeerInvalidHave(t *testing.T) {
913 cl, err := NewClient(TestingConfig())
914 require.NoError(t, err)
916 info := metainfo.Info{
918 Pieces: make([]byte, 20),
919 Files: []metainfo.FileInfo{{Length: 1}},
921 infoBytes, err := bencode.Marshal(info)
922 require.NoError(t, err)
923 tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
924 InfoBytes: infoBytes,
925 InfoHash: metainfo.HashBytes(infoBytes),
926 Storage: badStorage{},
928 require.NoError(t, err)
934 assert.NoError(t, cn.peerSentHave(0))
935 assert.Error(t, cn.peerSentHave(1))
938 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
939 greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
940 defer os.RemoveAll(greetingTempDir)
941 cfg := TestingConfig()
942 cfg.DataDir = greetingTempDir
943 seeder, err := NewClient(TestingConfig())
944 require.NoError(t, err)
945 seeder.AddTorrentSpec(&TorrentSpec{
946 InfoBytes: greetingMetainfo.InfoBytes,
950 func TestPrepareTrackerAnnounce(t *testing.T) {
952 blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
953 require.NoError(t, err)
954 assert.False(t, blocked)
955 assert.EqualValues(t, "localhost:1234", host)
956 assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
959 // Check that when the listen port is 0, all the protocols listened on have
960 // the same port, and it isn't zero.
961 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
962 cl, err := NewClient(TestingConfig())
963 require.NoError(t, err)
965 assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
966 assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
969 func TestClientDynamicListenTCPOnly(t *testing.T) {
970 cfg := TestingConfig()
971 cfg.DisableUTP = true
972 cl, err := NewClient(cfg)
973 require.NoError(t, err)
975 assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
976 assert.Nil(t, cl.utpSock)
979 func TestClientDynamicListenUTPOnly(t *testing.T) {
980 cfg := TestingConfig()
981 cfg.DisableTCP = true
982 cl, err := NewClient(cfg)
983 require.NoError(t, err)
985 assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
986 assert.Nil(t, cl.tcpListener)
989 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
990 cfg := TestingConfig()
991 cfg.DisableTCP = true
992 cfg.DisableUTP = true
993 cl, err := NewClient(cfg)
994 require.NoError(t, err)
996 assert.Nil(t, cl.ListenAddr())
999 func addClientPeer(t *Torrent, cl *Client) {
1002 IP: missinggo.AddrIP(cl.ListenAddr()),
1003 Port: missinggo.AddrPort(cl.ListenAddr()),
1008 func totalConns(tts []*Torrent) (ret int) {
1009 for _, tt := range tts {
1011 ret += len(tt.conns)
1017 func TestSetMaxEstablishedConn(t *testing.T) {
1019 ih := testutil.GreetingMetaInfo().HashInfoBytes()
1020 for i := range iter.N(3) {
1021 cl, err := NewClient(TestingConfig())
1022 require.NoError(t, err)
1024 tt, _ := cl.AddTorrentInfoHash(ih)
1025 tt.SetMaxEstablishedConns(2)
1026 testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
1027 tts = append(tts, tt)
1029 addPeers := func() {
1030 for i, tt := range tts {
1031 for _, _tt := range tts[:i] {
1032 addClientPeer(tt, _tt.cl)
1036 waitTotalConns := func(num int) {
1037 for totalConns(tts) != num {
1038 time.Sleep(time.Millisecond)
1043 tts[0].SetMaxEstablishedConns(1)
1045 tts[0].SetMaxEstablishedConns(0)
1047 tts[0].SetMaxEstablishedConns(1)
1050 tts[0].SetMaxEstablishedConns(2)
1055 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1056 os.MkdirAll(dir, 0770)
1057 file, err := os.Create(filepath.Join(dir, name))
1058 require.NoError(t, err)
1059 file.Write([]byte(name))
1061 mi := metainfo.MetaInfo{}
1063 info := metainfo.Info{PieceLength: 256 * 1024}
1064 err = info.BuildFromFilePath(filepath.Join(dir, name))
1065 require.NoError(t, err)
1066 mi.InfoBytes, err = bencode.Marshal(info)
1067 require.NoError(t, err)
1068 magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1069 tr, err := cl.AddTorrent(&mi)
1070 require.NoError(t, err)
1071 require.True(t, tr.Seeding())
1076 // https://github.com/anacrolix/torrent/issues/114
1077 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1078 cfg := TestingConfig()
1079 cfg.DisableUTP = true
1081 cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1083 cfg.ForceEncryption = true
1084 os.Mkdir(cfg.DataDir, 0755)
1085 server, err := NewClient(cfg)
1086 require.NoError(t, err)
1087 defer server.Close()
1088 testutil.ExportStatusWriter(server, "s")
1089 magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1090 makeMagnet(t, server, cfg.DataDir, "test2")
1091 cfg = TestingConfig()
1092 cfg.DisableUTP = true
1093 cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1095 cfg.ForceEncryption = true
1096 client, err := NewClient(cfg)
1097 require.NoError(t, err)
1098 defer client.Close()
1099 testutil.ExportStatusWriter(client, "c")
1100 tr, err := client.AddMagnet(magnet1)
1101 require.NoError(t, err)
1102 tr.AddPeers([]Peer{{
1103 IP: missinggo.AddrIP(server.ListenAddr()),
1104 Port: missinggo.AddrPort(server.ListenAddr()),
1111 func TestClientAddressInUse(t *testing.T) {
1112 s, _ := NewUtpSocket("udp", ":50007")
1116 cfg := TestingConfig()
1117 cfg.ListenAddr = ":50007"
1118 cl, err := NewClient(cfg)
1119 require.Error(t, err)