18 _ "github.com/anacrolix/envpprof"
19 "github.com/anacrolix/missinggo"
20 "github.com/anacrolix/missinggo/filecache"
21 "github.com/anacrolix/missinggo/pubsub"
22 "github.com/anacrolix/utp"
23 "github.com/bradfitz/iter"
24 "github.com/stretchr/testify/assert"
25 "github.com/stretchr/testify/require"
27 "github.com/anacrolix/torrent/bencode"
28 "github.com/anacrolix/torrent/dht"
29 "github.com/anacrolix/torrent/internal/testutil"
30 "github.com/anacrolix/torrent/iplist"
31 "github.com/anacrolix/torrent/metainfo"
32 "github.com/anacrolix/torrent/storage"
36 log.SetFlags(log.LstdFlags | log.Llongfile)
39 var TestingConfig = Config{
40 ListenAddr: "localhost:0",
42 DisableTrackers: true,
44 DHTConfig: dht.ServerConfig{
45 NoDefaultBootstrap: true,
49 func TestClientDefault(t *testing.T) {
50 cl, err := NewClient(&TestingConfig)
51 require.NoError(t, err)
55 func TestAddDropTorrent(t *testing.T) {
56 cl, err := NewClient(&TestingConfig)
57 require.NoError(t, err)
59 dir, mi := testutil.GreetingTestTorrent()
60 defer os.RemoveAll(dir)
61 tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
62 require.NoError(t, err)
64 tt.SetMaxEstablishedConns(0)
65 tt.SetMaxEstablishedConns(1)
69 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
73 func TestAddTorrentNoUsableURLs(t *testing.T) {
77 func TestAddPeersToUnknownTorrent(t *testing.T) {
81 func TestPieceHashSize(t *testing.T) {
82 if pieceHash.Size() != 20 {
87 func TestTorrentInitialState(t *testing.T) {
88 dir, mi := testutil.GreetingTestTorrent()
89 defer os.RemoveAll(dir)
91 infoHash: mi.Info.Hash(),
92 pieceStateChanges: pubsub.NewPubSub(),
95 tor.storageOpener = storage.NewFile("/dev/null")
96 // Needed to lock for asynchronous piece verification.
98 err := tor.setInfoBytes(mi.Info.Bytes)
99 require.NoError(t, err)
100 require.Len(t, tor.pieces, 3)
101 tor.pendAllChunkSpecs(0)
103 assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
105 assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
108 func TestUnmarshalPEXMsg(t *testing.T) {
109 var m peerExchangeMessage
110 if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
113 if len(m.Added) != 2 {
116 if m.Added[0].Port != 0x506 {
121 func TestReducedDialTimeout(t *testing.T) {
122 for _, _case := range []struct {
126 ExpectedReduced time.Duration
128 {nominalDialTimeout, 40, 0, nominalDialTimeout},
129 {nominalDialTimeout, 40, 1, nominalDialTimeout},
130 {nominalDialTimeout, 40, 39, nominalDialTimeout},
131 {nominalDialTimeout, 40, 40, nominalDialTimeout / 2},
132 {nominalDialTimeout, 40, 80, nominalDialTimeout / 3},
133 {nominalDialTimeout, 40, 4000, nominalDialTimeout / 101},
135 reduced := reducedDialTimeout(_case.Max, _case.HalfOpenLimit, _case.PendingPeers)
136 expected := _case.ExpectedReduced
137 if expected < minDialTimeout {
138 expected = minDialTimeout
140 if reduced != expected {
141 t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
146 func TestUTPRawConn(t *testing.T) {
147 l, err := utp.NewSocket("udp", "")
160 // Connect a UTP peer to see if the RawConn will still work.
161 s, _ := utp.NewSocket("udp", "")
163 utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
165 t.Fatalf("error dialing utp listener: %s", err)
167 defer utpPeer.Close()
168 peer, err := net.ListenPacket("udp", ":0")
175 // How many messages to send. I've set this to double the channel buffer
176 // size in the raw packetConn.
178 readerStopped := make(chan struct{})
179 // The reader goroutine.
181 defer close(readerStopped)
182 b := make([]byte, 500)
183 for i := 0; i < N; i++ {
184 n, _, err := l.ReadFrom(b)
186 t.Fatalf("error reading from raw conn: %s", err)
190 fmt.Sscan(string(b[:n]), &d)
192 log.Printf("got wrong number: expected %d, got %d", i, d)
196 udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
200 for i := 0; i < N; i++ {
201 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
205 time.Sleep(time.Microsecond)
208 case <-readerStopped:
209 case <-time.After(time.Second):
210 t.Fatal("reader timed out")
212 if msgsReceived != N {
213 t.Fatalf("messages received: %d", msgsReceived)
217 func TestTwoClientsArbitraryPorts(t *testing.T) {
218 for i := 0; i < 2; i++ {
219 cl, err := NewClient(&TestingConfig)
227 func TestAddDropManyTorrents(t *testing.T) {
228 cl, err := NewClient(&TestingConfig)
229 require.NoError(t, err)
231 for i := range iter.N(1000) {
233 binary.PutVarint(spec.InfoHash[:], int64(i))
234 tt, new, err := cl.AddTorrentSpec(&spec)
235 assert.NoError(t, err)
241 func TestClientTransferDefault(t *testing.T) {
242 testClientTransfer(t, testClientTransferParams{
243 ExportClientStatus: true,
244 LeecherFileCachePieceStorageFactory: fileCachePieceResourceStorage,
248 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.Client {
249 return storage.NewResourcePieces(fc.AsResourceProvider())
252 func fileCachePieceFileStorage(fc *filecache.Cache) storage.Client {
253 return storage.NewFileStorePieces(fc.AsFileStore())
256 func TestClientTransferSmallCache(t *testing.T) {
257 testClientTransfer(t, testClientTransferParams{
258 SetLeecherStorageCapacity: true,
259 // Going below the piece length means it can't complete a piece so
260 // that it can be hashed.
261 LeecherStorageCapacity: 5,
263 // Can't readahead too far or the cache will thrash and drop data we
266 ExportClientStatus: true,
267 LeecherFileCachePieceStorageFactory: fileCachePieceResourceStorage,
271 func TestClientTransferVarious(t *testing.T) {
272 for _, lsf := range []func(*filecache.Cache) storage.Client{
273 fileCachePieceFileStorage,
274 fileCachePieceResourceStorage,
276 for _, ss := range []func(string) storage.Client{
280 for _, responsive := range []bool{false, true} {
281 testClientTransfer(t, testClientTransferParams{
282 Responsive: responsive,
284 LeecherFileCachePieceStorageFactory: lsf,
286 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
287 testClientTransfer(t, testClientTransferParams{
289 Responsive: responsive,
291 Readahead: readahead,
292 LeecherFileCachePieceStorageFactory: lsf,
300 type testClientTransferParams struct {
304 ExportClientStatus bool
305 SetLeecherStorageCapacity bool
306 LeecherStorageCapacity int64
307 LeecherFileCachePieceStorageFactory func(*filecache.Cache) storage.Client
308 SeederStorage func(string) storage.Client
311 // Creates a seeder and a leecher, and ensures the data transfers when a read
312 // is attempted on the leecher.
313 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
314 greetingTempDir, mi := testutil.GreetingTestTorrent()
315 defer os.RemoveAll(greetingTempDir)
318 if ps.SeederStorage != nil {
319 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
321 cfg.DataDir = greetingTempDir
323 seeder, err := NewClient(&cfg)
324 require.NoError(t, err)
326 if ps.ExportClientStatus {
327 testutil.ExportStatusWriter(seeder, "s")
329 seederTorrent, new, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
330 require.NoError(t, err)
332 leecherDataDir, err := ioutil.TempDir("", "")
333 require.NoError(t, err)
334 defer os.RemoveAll(leecherDataDir)
335 fc, err := filecache.NewCache(leecherDataDir)
336 require.NoError(t, err)
337 if ps.SetLeecherStorageCapacity {
338 fc.SetCapacity(ps.LeecherStorageCapacity)
340 cfg.DefaultStorage = ps.LeecherFileCachePieceStorageFactory(fc)
341 leecher, err := NewClient(&cfg)
342 require.NoError(t, err)
343 defer leecher.Close()
344 if ps.ExportClientStatus {
345 testutil.ExportStatusWriter(leecher, "l")
347 leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
348 ret = TorrentSpecFromMetaInfo(mi)
350 ret.Storage = storage.NewFile(leecherDataDir)
353 require.NoError(t, err)
355 leecherGreeting.AddPeers([]Peer{
357 IP: missinggo.AddrIP(seeder.ListenAddr()),
358 Port: missinggo.AddrPort(seeder.ListenAddr()),
361 r := leecherGreeting.NewReader()
367 r.SetReadahead(ps.Readahead)
369 assertReadAllGreeting(t, r)
370 // After one read through, we can assume certain torrent statistics.
371 assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesSent)
372 assert.EqualValues(t, 8, seederTorrent.Stats().ChunksSent)
373 // This is not a strict requirement. It is however interesting to follow.
374 assert.EqualValues(t, 261, seederTorrent.Stats().BytesSent)
375 // Read through again for the cases where the torrent data size exceeds the
376 // size of the cache.
377 assertReadAllGreeting(t, r)
380 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
381 pos, err := r.Seek(0, os.SEEK_SET)
382 assert.NoError(t, err)
383 assert.EqualValues(t, 0, pos)
384 _greeting, err := ioutil.ReadAll(r)
385 assert.NoError(t, err)
386 assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
389 // Check that after completing leeching, a leecher transitions to seeding
390 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
391 func TestSeedAfterDownloading(t *testing.T) {
392 greetingTempDir, mi := testutil.GreetingTestTorrent()
393 defer os.RemoveAll(greetingTempDir)
396 cfg.DataDir = greetingTempDir
397 seeder, err := NewClient(&cfg)
398 require.NoError(t, err)
400 testutil.ExportStatusWriter(seeder, "s")
401 seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
402 cfg.DataDir, err = ioutil.TempDir("", "")
403 require.NoError(t, err)
404 defer os.RemoveAll(cfg.DataDir)
405 leecher, err := NewClient(&cfg)
406 require.NoError(t, err)
407 defer leecher.Close()
408 testutil.ExportStatusWriter(leecher, "l")
410 // cfg.TorrentDataOpener = nil
411 cfg.DataDir, err = ioutil.TempDir("", "")
412 require.NoError(t, err)
413 defer os.RemoveAll(cfg.DataDir)
414 leecherLeecher, _ := NewClient(&cfg)
415 defer leecherLeecher.Close()
416 testutil.ExportStatusWriter(leecherLeecher, "ll")
417 leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
418 ret = TorrentSpecFromMetaInfo(mi)
422 llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
423 ret = TorrentSpecFromMetaInfo(mi)
427 // Simultaneously DownloadAll in Leecher, and read the contents
428 // consecutively in LeecherLeecher. This non-deterministically triggered a
429 // case where the leecher wouldn't unchoke the LeecherLeecher.
430 var wg sync.WaitGroup
436 b, err := ioutil.ReadAll(r)
437 require.NoError(t, err)
438 assert.EqualValues(t, testutil.GreetingFileContents, b)
440 leecherGreeting.AddPeers([]Peer{
442 IP: missinggo.AddrIP(seeder.ListenAddr()),
443 Port: missinggo.AddrPort(seeder.ListenAddr()),
446 IP: missinggo.AddrIP(leecherLeecher.ListenAddr()),
447 Port: missinggo.AddrPort(leecherLeecher.ListenAddr()),
453 leecherGreeting.DownloadAll()
459 func TestMergingTrackersByAddingSpecs(t *testing.T) {
460 cl, err := NewClient(&TestingConfig)
461 require.NoError(t, err)
463 spec := TorrentSpec{}
464 T, new, _ := cl.AddTorrentSpec(&spec)
468 spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
469 _, new, _ = cl.AddTorrentSpec(&spec)
471 assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
472 // Because trackers are disabled in TestingConfig.
473 assert.EqualValues(t, 0, len(T.trackerAnnouncers))
476 type badStorage struct{}
478 func (bs badStorage) OpenTorrent(*metainfo.InfoEx) (storage.Torrent, error) {
482 func (bs badStorage) Close() error {
486 func (bs badStorage) Piece(p metainfo.Piece) storage.Piece {
487 return badStoragePiece{p}
490 type badStoragePiece struct {
494 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
498 func (p badStoragePiece) GetIsComplete() bool {
502 func (p badStoragePiece) MarkComplete() error {
503 return errors.New("psyyyyyyyche")
506 func (p badStoragePiece) randomlyTruncatedDataString() string {
507 return "hello, world\n"[:rand.Intn(14)]
510 func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
511 r := strings.NewReader(p.randomlyTruncatedDataString())
512 return r.ReadAt(b, off+p.p.Offset())
515 // We read from a piece which is marked completed, but is missing data.
516 func TestCompletedPieceWrongSize(t *testing.T) {
518 cfg.DefaultStorage = badStorage{}
519 cl, err := NewClient(&cfg)
520 require.NoError(t, err)
522 ie := metainfo.InfoEx{
525 Pieces: make([]byte, 20),
526 Files: []metainfo.FileInfo{
527 metainfo.FileInfo{Path: []string{"greeting"}, Length: 13},
532 tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
536 require.NoError(t, err)
541 b, err := ioutil.ReadAll(r)
543 assert.NoError(t, err)
546 func BenchmarkAddLargeTorrent(b *testing.B) {
548 cfg.DisableTCP = true
549 cfg.DisableUTP = true
550 cfg.ListenAddr = "redonk"
551 cl, _ := NewClient(&cfg)
553 for range iter.N(b.N) {
554 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
562 func TestResponsive(t *testing.T) {
563 seederDataDir, mi := testutil.GreetingTestTorrent()
564 defer os.RemoveAll(seederDataDir)
567 cfg.DataDir = seederDataDir
568 seeder, err := NewClient(&cfg)
571 seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
572 leecherDataDir, err := ioutil.TempDir("", "")
574 defer os.RemoveAll(leecherDataDir)
576 cfg.DataDir = leecherDataDir
577 leecher, err := NewClient(&cfg)
579 defer leecher.Close()
580 leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
581 ret = TorrentSpecFromMetaInfo(mi)
585 leecherTorrent.AddPeers([]Peer{
587 IP: missinggo.AddrIP(seeder.ListenAddr()),
588 Port: missinggo.AddrPort(seeder.ListenAddr()),
591 reader := leecherTorrent.NewReader()
593 reader.SetReadahead(0)
594 reader.SetResponsive()
596 _, err = reader.Seek(3, os.SEEK_SET)
597 require.NoError(t, err)
598 _, err = io.ReadFull(reader, b)
600 assert.EqualValues(t, "lo", string(b))
601 _, err = reader.Seek(11, os.SEEK_SET)
602 require.NoError(t, err)
603 n, err := io.ReadFull(reader, b)
605 assert.EqualValues(t, 2, n)
606 assert.EqualValues(t, "d\n", string(b))
609 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
610 seederDataDir, mi := testutil.GreetingTestTorrent()
611 defer os.RemoveAll(seederDataDir)
614 cfg.DataDir = seederDataDir
615 seeder, err := NewClient(&cfg)
618 seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
619 leecherDataDir, err := ioutil.TempDir("", "")
621 defer os.RemoveAll(leecherDataDir)
623 cfg.DataDir = leecherDataDir
624 leecher, err := NewClient(&cfg)
626 defer leecher.Close()
627 leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
628 ret = TorrentSpecFromMetaInfo(mi)
632 leecherTorrent.AddPeers([]Peer{
634 IP: missinggo.AddrIP(seeder.ListenAddr()),
635 Port: missinggo.AddrPort(seeder.ListenAddr()),
638 reader := leecherTorrent.NewReader()
640 reader.SetReadahead(0)
641 reader.SetResponsive()
643 _, err = reader.Seek(3, os.SEEK_SET)
644 require.NoError(t, err)
645 _, err = io.ReadFull(reader, b)
647 assert.EqualValues(t, "lo", string(b))
648 go leecherTorrent.Drop()
649 _, err = reader.Seek(11, os.SEEK_SET)
650 require.NoError(t, err)
651 n, err := reader.Read(b)
652 assert.EqualError(t, err, "torrent closed")
653 assert.EqualValues(t, 0, n)
656 func TestDHTInheritBlocklist(t *testing.T) {
657 ipl := iplist.New(nil)
658 require.NotNil(t, ipl)
660 cfg.IPBlocklist = ipl
662 cl, err := NewClient(&cfg)
663 require.NoError(t, err)
665 require.Equal(t, ipl, cl.DHT().IPBlocklist())
668 // Check that stuff is merged in subsequent AddTorrentSpec for the same
670 func TestAddTorrentSpecMerging(t *testing.T) {
671 cl, err := NewClient(&TestingConfig)
672 require.NoError(t, err)
674 dir, mi := testutil.GreetingTestTorrent()
675 defer os.RemoveAll(dir)
676 tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
677 InfoHash: mi.Info.Hash(),
679 require.NoError(t, err)
681 require.Nil(t, tt.Info())
682 _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
683 require.NoError(t, err)
684 require.False(t, new)
685 require.NotNil(t, tt.Info())
688 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
689 dir, mi := testutil.GreetingTestTorrent()
691 cl, _ := NewClient(&TestingConfig)
693 tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
694 InfoHash: mi.Info.Hash(),
697 assert.EqualValues(t, 0, len(cl.Torrents()))
705 func writeTorrentData(ts storage.Torrent, info *metainfo.InfoEx, b []byte) {
706 for i := range iter.N(info.NumPieces()) {
707 n, _ := ts.Piece(info.Piece(i)).WriteAt(b, 0)
712 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.Client) {
713 fileCacheDir, err := ioutil.TempDir("", "")
714 require.NoError(t, err)
715 defer os.RemoveAll(fileCacheDir)
716 fileCache, err := filecache.NewCache(fileCacheDir)
717 require.NoError(t, err)
718 greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
719 defer os.RemoveAll(greetingDataTempDir)
720 filePieceStore := csf(fileCache)
721 greetingData, err := filePieceStore.OpenTorrent(&greetingMetainfo.Info)
722 require.NoError(t, err)
723 writeTorrentData(greetingData, &greetingMetainfo.Info, []byte(testutil.GreetingFileContents))
724 // require.Equal(t, len(testutil.GreetingFileContents), written)
725 // require.NoError(t, err)
726 for i := 0; i < greetingMetainfo.Info.NumPieces(); i++ {
727 p := greetingMetainfo.Info.Piece(i)
728 if alreadyCompleted {
729 err := greetingData.Piece(p).MarkComplete()
730 assert.NoError(t, err)
734 // TODO: Disable network option?
735 cfg.DisableTCP = true
736 cfg.DisableUTP = true
737 cfg.DefaultStorage = filePieceStore
738 cl, err := NewClient(&cfg)
739 require.NoError(t, err)
741 tt, err := cl.AddTorrent(greetingMetainfo)
742 require.NoError(t, err)
743 psrs := tt.PieceStateRuns()
744 assert.Len(t, psrs, 1)
745 assert.EqualValues(t, 3, psrs[0].Length)
746 assert.Equal(t, alreadyCompleted, psrs[0].Complete)
747 if alreadyCompleted {
749 b, err := ioutil.ReadAll(r)
750 assert.NoError(t, err)
751 assert.EqualValues(t, testutil.GreetingFileContents, b)
755 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
756 testAddTorrentPriorPieceCompletion(t, true, fileCachePieceFileStorage)
757 testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
760 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
761 testAddTorrentPriorPieceCompletion(t, false, fileCachePieceFileStorage)
762 testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
765 func TestAddMetainfoWithNodes(t *testing.T) {
768 // For now, we want to just jam the nodes into the table, without
769 // verifying them first. Also the DHT code doesn't support mixing secure
770 // and insecure nodes if security is enabled (yet).
771 cfg.DHTConfig.NoSecurity = true
772 cl, err := NewClient(&cfg)
773 require.NoError(t, err)
775 assert.EqualValues(t, cl.DHT().NumNodes(), 0)
776 tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
777 require.NoError(t, err)
778 assert.Len(t, tt.metainfo.AnnounceList, 5)
779 assert.EqualValues(t, 6, cl.DHT().NumNodes())
782 type testDownloadCancelParams struct {
783 ExportClientStatus bool
784 SetLeecherStorageCapacity bool
785 LeecherStorageCapacity int64
789 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
790 greetingTempDir, mi := testutil.GreetingTestTorrent()
791 defer os.RemoveAll(greetingTempDir)
794 cfg.DataDir = greetingTempDir
795 seeder, err := NewClient(&cfg)
796 require.NoError(t, err)
798 if ps.ExportClientStatus {
799 testutil.ExportStatusWriter(seeder, "s")
801 seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
802 leecherDataDir, err := ioutil.TempDir("", "")
803 require.NoError(t, err)
804 defer os.RemoveAll(leecherDataDir)
805 fc, err := filecache.NewCache(leecherDataDir)
806 require.NoError(t, err)
807 if ps.SetLeecherStorageCapacity {
808 fc.SetCapacity(ps.LeecherStorageCapacity)
810 cfg.DefaultStorage = storage.NewFileStorePieces(fc.AsFileStore())
811 cfg.DataDir = leecherDataDir
812 leecher, _ := NewClient(&cfg)
813 defer leecher.Close()
814 if ps.ExportClientStatus {
815 testutil.ExportStatusWriter(leecher, "l")
817 leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
818 ret = TorrentSpecFromMetaInfo(mi)
822 require.NoError(t, err)
824 psc := leecherGreeting.SubscribePieceStateChanges()
826 leecherGreeting.DownloadAll()
828 leecherGreeting.CancelPieces(0, leecherGreeting.NumPieces())
830 leecherGreeting.AddPeers([]Peer{
832 IP: missinggo.AddrIP(seeder.ListenAddr()),
833 Port: missinggo.AddrPort(seeder.ListenAddr()),
836 completes := make(map[int]bool, 3)
839 // started := time.Now()
841 case _v := <-psc.Values:
842 // log.Print(time.Since(started))
843 v := _v.(PieceStateChange)
844 completes[v.Index] = v.Complete
845 case <-time.After(100 * time.Millisecond):
850 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
852 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
857 func TestTorrentDownloadAll(t *testing.T) {
858 testDownloadCancel(t, testDownloadCancelParams{})
861 func TestTorrentDownloadAllThenCancel(t *testing.T) {
862 testDownloadCancel(t, testDownloadCancelParams{
867 // Ensure that it's an error for a peer to send an invalid have message.
868 func TestPeerInvalidHave(t *testing.T) {
869 cl, err := NewClient(&TestingConfig)
870 require.NoError(t, err)
872 ie := metainfo.InfoEx{
875 Pieces: make([]byte, 20),
876 Files: []metainfo.FileInfo{{Length: 1}},
880 tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
884 require.NoError(t, err)
890 assert.NoError(t, cn.peerSentHave(0))
891 assert.Error(t, cn.peerSentHave(1))
894 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
895 greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
896 defer os.RemoveAll(greetingTempDir)
898 cfg.DataDir = greetingTempDir
899 seeder, err := NewClient(&TestingConfig)
900 require.NoError(t, err)
901 seeder.AddTorrentSpec(&TorrentSpec{
902 Info: &greetingMetainfo.Info,
906 func TestPrepareTrackerAnnounce(t *testing.T) {
908 blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
909 require.NoError(t, err)
910 assert.False(t, blocked)
911 assert.EqualValues(t, "localhost:1234", host)
912 assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
915 // Check that when the listen port is 0, all the protocols listened on have
916 // the same port, and it isn't zero.
917 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
918 cl, err := NewClient(&TestingConfig)
919 require.NoError(t, err)
921 assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
922 assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
925 func TestClientDynamicListenTCPOnly(t *testing.T) {
927 cfg.DisableUTP = true
928 cl, err := NewClient(&cfg)
929 require.NoError(t, err)
931 assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
932 assert.Nil(t, cl.utpSock)
935 func TestClientDynamicListenUTPOnly(t *testing.T) {
937 cfg.DisableTCP = true
938 cl, err := NewClient(&cfg)
939 require.NoError(t, err)
941 assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
942 assert.Nil(t, cl.tcpListener)
945 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
947 cfg.DisableTCP = true
948 cfg.DisableUTP = true
949 cl, err := NewClient(&cfg)
950 require.NoError(t, err)
953 assert.Nil(t, cl.ListenAddr())
956 func addClientPeer(t *Torrent, cl *Client) {
959 IP: missinggo.AddrIP(cl.ListenAddr()),
960 Port: missinggo.AddrPort(cl.ListenAddr()),
965 func printConnPeerCounts(t *Torrent) {
967 log.Println(len(t.conns), len(t.peers))
971 func totalConns(tts []*Torrent) (ret int) {
972 for _, tt := range tts {
980 func TestSetMaxEstablishedConn(t *testing.T) {
982 ih := testutil.GreetingMetaInfo().Info.Hash()
984 cfg.DisableUTP = true
985 for i := range iter.N(3) {
986 cl, err := NewClient(&cfg)
987 require.NoError(t, err)
989 tt, _ := cl.AddTorrentInfoHash(ih)
990 tt.SetMaxEstablishedConns(2)
991 testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
992 tts = append(tts, tt)
995 for i, tt := range tts {
996 for _, _tt := range tts[:i] {
997 addClientPeer(tt, _tt.cl)
1001 waitTotalConns := func(num int) {
1002 for totalConns(tts) != num {
1003 time.Sleep(time.Millisecond)
1008 tts[0].SetMaxEstablishedConns(1)
1010 tts[0].SetMaxEstablishedConns(0)
1012 tts[0].SetMaxEstablishedConns(1)
1015 tts[0].SetMaxEstablishedConns(2)