19 _ "github.com/anacrolix/envpprof"
20 "github.com/anacrolix/missinggo"
21 . "github.com/anacrolix/missinggo"
22 "github.com/anacrolix/missinggo/filecache"
23 "github.com/anacrolix/utp"
24 "github.com/bradfitz/iter"
25 "github.com/stretchr/testify/assert"
26 "github.com/stretchr/testify/require"
28 "github.com/anacrolix/torrent/bencode"
29 "github.com/anacrolix/torrent/data/pieceStore"
30 "github.com/anacrolix/torrent/data/pieceStore/dataBackend/fileCache"
31 "github.com/anacrolix/torrent/dht"
32 "github.com/anacrolix/torrent/internal/testutil"
33 "github.com/anacrolix/torrent/iplist"
34 "github.com/anacrolix/torrent/metainfo"
38 log.SetFlags(log.LstdFlags | log.Llongfile)
// TestingConfig is the package-level client Config that tests in this file
// pass to NewClient. The fields visible here listen on "localhost:0", turn
// off trackers, the default blocklist and the metainfo cache, place data
// under <tempdir>/anacrolix, and disable default DHT bootstrapping.
// NOTE(review): several tests pass &TestingConfig directly, so they all
// share this one value — confirm NewClient never mutates its argument.
41 var TestingConfig = Config{
42 ListenAddr: "localhost:0",
44 DisableTrackers: true,
45 NoDefaultBlocklist: true,
46 DisableMetainfoCache: true,
47 DataDir: filepath.Join(os.TempDir(), "anacrolix"),
48 DHTConfig: dht.ServerConfig{
49 NoDefaultBootstrap: true,
// TestClientDefault constructs a client from the shared TestingConfig.
// NOTE(review): what is done with cl and err afterwards is not visible here.
53 func TestClientDefault(t *testing.T) {
54 cl, err := NewClient(&TestingConfig)
// TestAddDropTorrent creates a client from TestingConfig, builds the greeting
// test torrent with testutil.GreetingTestTorrent (removing its directory on
// exit), and adds it to the client through AddTorrentSpec.
61 func TestAddDropTorrent(t *testing.T) {
62 cl, err := NewClient(&TestingConfig)
67 dir, mi := testutil.GreetingTestTorrent()
68 defer os.RemoveAll(dir)
// NOTE(review): the local named `new` shadows the Go builtin for the rest of
// this scope.
69 tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
79 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
83 func TestAddTorrentNoUsableURLs(t *testing.T) {
87 func TestAddPeersToUnknownTorrent(t *testing.T) {
// TestPieceHashSize checks that pieceHash.Size() reports 20.
91 func TestPieceHashSize(t *testing.T) {
92 if pieceHash.Size() != 20 {
// TestTorrentInitialState builds a torrent directly with newTorrent (infohash
// copied from the greeting metainfo), applies the metainfo via setMetadata,
// and checks the resulting piece/chunk bookkeeping: 3 pieces, 3 pending
// chunks in piece 0 after pendAllChunkSpecs(0), and chunk index 2 mapping to
// chunkSpec{4, 1}.
97 func TestTorrentInitialState(t *testing.T) {
98 dir, mi := testutil.GreetingTestTorrent()
99 defer os.RemoveAll(dir)
100 tor := newTorrent(func() (ih InfoHash) {
101 missinggo.CopyExact(ih[:], mi.Info.Hash)
105 err := tor.setMetadata(&mi.Info.Info, mi.Info.Bytes)
109 if len(tor.Pieces) != 3 {
110 t.Fatal("wrong number of pieces")
112 tor.pendAllChunkSpecs(0)
113 assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
114 assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
// TestUnmarshalPEXMsg decodes a bencoded dictionary whose "added" value is a
// 12-byte string (\x01 through \x0c) into a peerExchangeMessage, then expects
// two entries in m.Added with the first entry's Port equal to 0x506 (the
// fifth and sixth payload bytes).
117 func TestUnmarshalPEXMsg(t *testing.T) {
118 var m peerExchangeMessage
119 if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
122 if len(m.Added) != 2 {
125 if m.Added[0].Port != 0x506 {
// TestReducedDialTimeout is a table-driven check of reducedDialTimeout. Each
// row supplies a maximum timeout, a half-open limit and a pending-peer count,
// plus the expected reduced timeout; expectations below minDialTimeout are
// raised to minDialTimeout before comparing.
130 func TestReducedDialTimeout(t *testing.T) {
131 for _, _case := range []struct {
135 ExpectedReduced time.Duration
137 {nominalDialTimeout, 40, 0, nominalDialTimeout},
138 {nominalDialTimeout, 40, 1, nominalDialTimeout},
139 {nominalDialTimeout, 40, 39, nominalDialTimeout},
140 {nominalDialTimeout, 40, 40, nominalDialTimeout / 2},
141 {nominalDialTimeout, 40, 80, nominalDialTimeout / 3},
142 {nominalDialTimeout, 40, 4000, nominalDialTimeout / 101},
144 reduced := reducedDialTimeout(_case.Max, _case.HalfOpenLimit, _case.PendingPeers)
// Clamp the table's expectation to the floor the implementation applies.
145 expected := _case.ExpectedReduced
146 if expected < minDialTimeout {
147 expected = minDialTimeout
149 if reduced != expected {
// NOTE(review): the message prints the unclamped _case.ExpectedReduced, not
// the `expected` value that was actually compared against.
150 t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
// TestUTPRawConn opens a uTP socket, dials it from a second uTP socket, and
// then sends N plain UDP datagrams (each the decimal text of its index) from
// a separate packet conn to the first socket's port. A reader reads them back
// with l.ReadFrom; the test fails if the reader does not finish within one
// second or if msgsReceived differs from N.
155 func TestUTPRawConn(t *testing.T) {
156 l, err := utp.NewSocket("udp", "")
169 // Connect a UTP peer to see if the RawConn will still work.
// NOTE(review): the error from this second NewSocket is discarded; a failure
// here would surface as a nil dereference on s.Dial below.
170 s, _ := utp.NewSocket("udp", "")
172 utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
174 t.Fatalf("error dialing utp listener: %s", err)
176 defer utpPeer.Close()
177 peer, err := net.ListenPacket("udp", ":0")
184 // How many messages to send. I've set this to double the channel buffer
185 // size in the raw packetConn.
// Closed by the reader when it exits, so the select further down can wait on
// it.
187 readerStopped := make(chan struct{})
188 // The reader goroutine.
190 defer close(readerStopped)
191 b := make([]byte, 500)
192 for i := 0; i < N; i++ {
193 n, _, err := l.ReadFrom(b)
// NOTE(review): this appears to run inside the reader goroutine; the testing
// package requires Fatal/FailNow to be called from the goroutine running the
// test — confirm and consider t.Errorf plus return instead.
195 t.Fatalf("error reading from raw conn: %s", err)
// NOTE(review): Sscan's error is ignored; a malformed payload leaves d at its
// prior value.
199 fmt.Sscan(string(b[:n]), &d)
201 log.Printf("got wrong number: expected %d, got %d", i, d)
205 udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
209 for i := 0; i < N; i++ {
210 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
// Brief pause between sends.
214 time.Sleep(time.Microsecond)
217 case <-readerStopped:
218 case <-time.After(time.Second):
219 t.Fatal("reader timed out")
221 if msgsReceived != N {
222 t.Fatalf("messages received: %d", msgsReceived)
// TestTwoClientsArbitraryPorts creates two clients in a loop, both from the
// shared TestingConfig (which listens on "localhost:0").
226 func TestTwoClientsArbitraryPorts(t *testing.T) {
227 for i := 0; i < 2; i++ {
228 cl, err := NewClient(&TestingConfig)
// TestAddDropManyTorrents adds 1000 torrent specs to one client, giving each
// a distinct infohash by varint-encoding the loop index into spec.InfoHash.
236 func TestAddDropManyTorrents(t *testing.T) {
// NOTE(review): the NewClient error is discarded here.
237 cl, _ := NewClient(&TestingConfig)
239 for i := range iter.N(1000) {
241 binary.PutVarint(spec.InfoHash[:], int64(i))
242 tt, new, err := cl.AddTorrentSpec(&spec)
// TestClientTransferDefault runs testClientTransfer with zero-value
// parameters.
253 func TestClientTransferDefault(t *testing.T) {
254 testClientTransfer(t, testClientTransferParams{})
// TestClientTransferSmallCache runs testClientTransfer with the leecher's
// storage capacity explicitly set to 5 and client status export enabled.
// NOTE(review): further parameters may be set on lines not visible here.
257 func TestClientTransferSmallCache(t *testing.T) {
258 testClientTransfer(t, testClientTransferParams{
259 SetLeecherStorageCapacity: true,
260 // Going below the piece length means it can't complete a piece so
261 // that it can be hashed.
262 LeecherStorageCapacity: 5,
264 // Can't readahead too far or the cache will thrash and drop data we
267 ExportClientStatus: true,
// TestClientTransferVarious runs testClientTransfer for both values of
// Responsive: once with only Responsive visible in the parameters, then once
// per readahead value in the list below.
271 func TestClientTransferVarious(t *testing.T) {
272 for _, responsive := range []bool{false, true} {
273 testClientTransfer(t, testClientTransferParams{
274 Responsive: responsive,
// The list includes a negative readahead (-1) as well as zero.
276 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
277 testClientTransfer(t, testClientTransferParams{
278 Responsive: responsive,
280 Readahead: readahead,
// testClientTransferParams selects the variations exercised by
// testClientTransfer. Callers in this file also set Responsive and Readahead,
// whose declarations are not visible in this listing.
286 type testClientTransferParams struct {
// When true, the seeder and leecher are passed to testutil.ExportStatusWriter.
290 ExportClientStatus bool
// LeecherStorageCapacity is only applied (via SetCapacity on the leecher's
// file cache) when SetLeecherStorageCapacity is true.
291 SetLeecherStorageCapacity bool
292 LeecherStorageCapacity int64
// testClientTransfer seeds the greeting test torrent from one client and
// downloads it with a second client whose data is stored in a file-cache
// backed piece store, then reads the whole torrent through a Reader twice and
// compares it with testutil.GreetingFileContents. ps controls status export,
// the leecher cache capacity, and the reader's readahead.
295 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
296 greetingTempDir, mi := testutil.GreetingTestTorrent()
297 defer os.RemoveAll(greetingTempDir)
// The seeder serves straight out of the generated greeting directory.
300 cfg.DataDir = greetingTempDir
301 seeder, err := NewClient(&cfg)
302 require.NoError(t, err)
304 if ps.ExportClientStatus {
305 testutil.ExportStatusWriter(seeder, "s")
// NOTE(review): all return values of this AddTorrentSpec call are ignored.
307 seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
308 leecherDataDir, err := ioutil.TempDir("", "")
309 require.NoError(t, err)
310 defer os.RemoveAll(leecherDataDir)
// Build the leecher's data opener: a file cache rooted at leecherDataDir,
// optionally capacity-limited, wrapped in a piece store.
311 cfg.TorrentDataOpener = func() TorrentDataOpener {
312 fc, err := filecache.NewCache(leecherDataDir)
313 require.NoError(t, err)
314 if ps.SetLeecherStorageCapacity {
315 fc.SetCapacity(ps.LeecherStorageCapacity)
317 store := pieceStore.New(fileCacheDataBackend.New(fc))
318 return func(mi *metainfo.Info) Data {
319 return store.OpenTorrentData(mi)
// The same cfg value, now carrying the custom opener, configures the leecher.
322 leecher, err := NewClient(&cfg)
323 require.NoError(t, err)
324 defer leecher.Close()
325 if ps.ExportClientStatus {
326 testutil.ExportStatusWriter(leecher, "l")
328 leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
329 ret = TorrentSpecFromMetaInfo(mi)
333 require.NoError(t, err)
// Point the leecher at the seeder's listen address.
335 leecherGreeting.AddPeers([]Peer{
337 IP: missinggo.AddrIP(seeder.ListenAddr()),
338 Port: missinggo.AddrPort(seeder.ListenAddr()),
341 r := leecherGreeting.NewReader()
347 r.SetReadahead(ps.Readahead)
// Read the full contents twice, rewinding to offset 0 each time.
349 for range iter.N(2) {
// NOTE(review): os.SEEK_SET is deprecated in current Go in favour of
// io.SeekStart.
350 pos, err := r.Seek(0, os.SEEK_SET)
351 assert.NoError(t, err)
352 assert.EqualValues(t, 0, pos)
353 _greeting, err := ioutil.ReadAll(r)
354 assert.NoError(t, err)
355 assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
359 // Check that after completing leeching, a leecher transitions to seeding
360 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
361 func TestSeedAfterDownloading(t *testing.T) {
362 greetingTempDir, mi := testutil.GreetingTestTorrent()
363 defer os.RemoveAll(greetingTempDir)
// Seeder: serves the generated greeting data directory.
366 cfg.DataDir = greetingTempDir
367 seeder, err := NewClient(&cfg)
369 testutil.ExportStatusWriter(seeder, "s")
370 seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
// Leecher: fresh temp data dir, same cfg value otherwise.
371 cfg.DataDir, err = ioutil.TempDir("", "")
372 require.NoError(t, err)
373 defer os.RemoveAll(cfg.DataDir)
// NOTE(review): the NewClient error is discarded for this client and for
// leecherLeecher below, unlike the seeder above.
374 leecher, _ := NewClient(&cfg)
375 defer leecher.Close()
376 testutil.ExportStatusWriter(leecher, "l")
// LeecherLeecher: clears any custom data opener and gets its own temp dir.
378 cfg.TorrentDataOpener = nil
379 cfg.DataDir, err = ioutil.TempDir("", "")
380 require.NoError(t, err)
381 defer os.RemoveAll(cfg.DataDir)
382 leecherLeecher, _ := NewClient(&cfg)
383 defer leecherLeecher.Close()
384 testutil.ExportStatusWriter(leecherLeecher, "ll")
// NOTE(review): the `new` flag and error from both AddTorrentSpec calls are
// discarded.
385 leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
386 ret = TorrentSpecFromMetaInfo(mi)
390 llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
391 ret = TorrentSpecFromMetaInfo(mi)
395 // Simultaneously DownloadAll in Leecher, and read the contents
396 // consecutively in LeecherLeecher. This non-deterministically triggered a
397 // case where the leecher wouldn't unchoke the LeecherLeecher.
398 var wg sync.WaitGroup
404 b, err := ioutil.ReadAll(r)
// NOTE(review): if this read runs in a goroutine tracked by wg (the launch is
// not visible here), require's FailNow would be called off the test goroutine
// — confirm.
405 require.NoError(t, err)
406 assert.EqualValues(t, testutil.GreetingFileContents, b)
// The middle client is told about both neighbours: the seeder and the
// leecherLeecher.
408 leecherGreeting.AddPeers([]Peer{
410 IP: missinggo.AddrIP(seeder.ListenAddr()),
411 Port: missinggo.AddrPort(seeder.ListenAddr()),
414 IP: missinggo.AddrIP(leecherLeecher.ListenAddr()),
415 Port: missinggo.AddrPort(leecherLeecher.ListenAddr()),
421 leecherGreeting.DownloadAll()
// TestReadaheadPieces is a table-driven check of readaheadPieces. Each row is
// {readahead bytes, piece length, expected piece count}; the row itself is
// printed with %v on mismatch.
427 func TestReadaheadPieces(t *testing.T) {
428 for _, case_ := range []struct {
429 readaheadBytes, pieceLength int64
432 {5 * 1024 * 1024, 256 * 1024, 19},
433 {5 * 1024 * 1024, 5 * 1024 * 1024, 1},
434 {5*1024*1024 - 1, 5 * 1024 * 1024, 1},
435 {5 * 1024 * 1024, 5*1024*1024 - 1, 2},
436 {0, 5 * 1024 * 1024, 0},
437 {5 * 1024 * 1024, 1048576, 4},
439 pieces := readaheadPieces(case_.readaheadBytes, case_.pieceLength)
440 assert.Equal(t, case_.readaheadPieces, pieces, "%v", case_)
// TestMergingTrackersByAddingSpecs adds an empty TorrentSpec, then adds the
// same spec again with two tracker tiers set, and checks that the torrent
// returned by the first call now lists "http://a" and "udp://b" as the first
// entries of tiers 0 and 1.
444 func TestMergingTrackersByAddingSpecs(t *testing.T) {
445 cl, err := NewClient(&TestingConfig)
446 require.NoError(t, err)
448 spec := TorrentSpec{}
// NOTE(review): the error from both AddTorrentSpec calls is discarded.
449 T, new, _ := cl.AddTorrentSpec(&spec)
453 spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
454 _, new, _ = cl.AddTorrentSpec(&spec)
// NOTE(review): testify's EqualValues takes (t, expected, actual); the
// arguments below are in the opposite order, which only affects how a failure
// is reported.
458 assert.EqualValues(t, T.torrent.Trackers[0][0], "http://a")
459 assert.EqualValues(t, T.torrent.Trackers[1][0], "udp://b")
// badData is a stateless stub type. The methods declared on it in this file
// include a PieceCompleted that always errors and a ReadAt that serves a
// randomly truncated "hello, world\n".
462 type badData struct{}
// Close is a no-op.
464 func (me badData) Close() {}
466 func (me badData) WriteAt(b []byte, off int64) (int, error) {
470 func (me badData) PieceComplete(piece int) bool {
// PieceCompleted always fails with the error "psyyyyyyyche", regardless of
// the piece index.
474 func (me badData) PieceCompleted(piece int) error {
475 return errors.New("psyyyyyyyche")
// randomlyTruncatedDataString returns a prefix of "hello, world\n" whose
// length is chosen by rand.Intn(14), i.e. anywhere from 0 to 13 bytes. The
// literal is 13 bytes long, so the untruncated string is one possible result.
478 func (me badData) randomlyTruncatedDataString() string {
479 return "hello, world\n"[:rand.Intn(14)]
// ReadAt serves the read from a strings.Reader over a freshly generated
// randomly truncated string, so each call may see a different length of
// backing data.
482 func (me badData) ReadAt(b []byte, off int64) (n int, err error) {
483 r := strings.NewReader(me.randomlyTruncatedDataString())
484 return r.ReadAt(b, off)
487 // We read from a piece which is marked completed, but is missing data.
// The torrent added here has a single 13-byte file named "greeting" and a
// 20-byte (all-zero) Pieces field; its contents are then read with
// ioutil.ReadAll.
488 func TestCompletedPieceWrongSize(t *testing.T) {
// Install a custom data opener for every torrent (its return value is not
// visible in this listing).
490 cfg.TorrentDataOpener = func(*metainfo.Info) Data {
// NOTE(review): the NewClient error is discarded.
493 cl, _ := NewClient(&cfg)
495 tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
496 Info: &metainfo.InfoEx{
499 Pieces: make([]byte, 20),
500 Files: []metainfo.FileInfo{
501 metainfo.FileInfo{Path: []string{"greeting"}, Length: 13},
506 require.NoError(t, err)
511 b, err := ioutil.ReadAll(r)
513 assert.NoError(t, err)
// BenchmarkAddLargeTorrent measures adding testdata/bootstrap.dat.torrent to
// a client b.N times. TCP and uTP are both disabled and ListenAddr is set to
// the placeholder "redonk".
516 func BenchmarkAddLargeTorrent(b *testing.B) {
518 cfg.DisableTCP = true
519 cfg.DisableUTP = true
520 cfg.ListenAddr = "redonk"
// NOTE(review): the NewClient error is discarded.
521 cl, _ := NewClient(&cfg)
523 for range iter.N(b.N) {
// Here `t` is the added torrent, not a *testing.T.
524 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
// TestResponsive transfers the greeting torrent from a seeder to a leecher
// and reads it through a Reader configured with zero readahead and responsive
// mode. It seeks to offset 3 and expects "lo", then seeks to offset 11 and
// expects a 2-byte read of "d\n".
532 func TestResponsive(t *testing.T) {
533 seederDataDir, mi := testutil.GreetingTestTorrent()
534 defer os.RemoveAll(seederDataDir)
537 cfg.DataDir = seederDataDir
538 seeder, err := NewClient(&cfg)
541 seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
542 leecherDataDir, err := ioutil.TempDir("", "")
544 defer os.RemoveAll(leecherDataDir)
546 cfg.DataDir = leecherDataDir
547 leecher, err := NewClient(&cfg)
549 defer leecher.Close()
// NOTE(review): the `new` flag and error from AddTorrentSpec are discarded.
550 leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
551 ret = TorrentSpecFromMetaInfo(mi)
// The seeder is the leecher's only explicitly added peer.
555 leecherTorrent.AddPeers([]Peer{
557 IP: missinggo.AddrIP(seeder.ListenAddr()),
558 Port: missinggo.AddrPort(seeder.ListenAddr()),
561 reader := leecherTorrent.NewReader()
563 reader.SetReadahead(0)
564 reader.SetResponsive()
566 _, err = reader.Seek(3, os.SEEK_SET)
567 require.NoError(t, err)
568 _, err = io.ReadFull(reader, b)
570 assert.EqualValues(t, "lo", string(b))
571 _, err = reader.Seek(11, os.SEEK_SET)
572 require.NoError(t, err)
573 n, err := io.ReadFull(reader, b)
575 assert.EqualValues(t, 2, n)
576 assert.EqualValues(t, "d\n", string(b))
// TestTorrentDroppedDuringResponsiveRead uses a seeder/leecher pair and a
// responsive, zero-readahead Reader. After one successful read of "lo" at
// offset 3, it drops the leecher's torrent in a separate goroutine, seeks to
// offset 11, and expects the next Read to return 0 bytes with the error
// "torrent closed".
579 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
580 seederDataDir, mi := testutil.GreetingTestTorrent()
581 defer os.RemoveAll(seederDataDir)
584 cfg.DataDir = seederDataDir
585 seeder, err := NewClient(&cfg)
588 seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
589 leecherDataDir, err := ioutil.TempDir("", "")
591 defer os.RemoveAll(leecherDataDir)
593 cfg.DataDir = leecherDataDir
594 leecher, err := NewClient(&cfg)
596 defer leecher.Close()
// NOTE(review): the `new` flag and error from AddTorrentSpec are discarded.
597 leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
598 ret = TorrentSpecFromMetaInfo(mi)
602 leecherTorrent.AddPeers([]Peer{
604 IP: missinggo.AddrIP(seeder.ListenAddr()),
605 Port: missinggo.AddrPort(seeder.ListenAddr()),
608 reader := leecherTorrent.NewReader()
610 reader.SetReadahead(0)
611 reader.SetResponsive()
613 _, err = reader.Seek(3, os.SEEK_SET)
614 require.NoError(t, err)
615 _, err = io.ReadFull(reader, b)
617 assert.EqualValues(t, "lo", string(b))
// The drop runs concurrently with the Seek and Read that follow.
618 go leecherTorrent.Drop()
619 _, err = reader.Seek(11, os.SEEK_SET)
620 require.NoError(t, err)
621 n, err := reader.Read(b)
622 assert.EqualError(t, err, "torrent closed")
623 assert.EqualValues(t, 0, n)
// TestDHTInheritBlocklist sets cfg.IPBlocklist to a fresh iplist and checks
// that the client's DHT server reports that same list from IPBlocklist().
626 func TestDHTInheritBlocklist(t *testing.T) {
627 ipl := iplist.New(nil)
628 require.NotNil(t, ipl)
630 cfg.IPBlocklist = ipl
632 cl, err := NewClient(&cfg)
633 require.NoError(t, err)
635 require.Equal(t, ipl, cl.DHT().IPBlocklist())
// TestAddTorrentSpecMerging first adds a spec whose infohash is copied from
// the greeting metainfo and expects the torrent to have no Info yet. It then
// adds the full spec built from the metainfo, expects that second add to
// report new == false, and expects the originally returned torrent to now
// have Info.
638 // Check that stuff is merged in subsequent AddTorrentSpec for the same
640 func TestAddTorrentSpecMerging(t *testing.T) {
641 cl, err := NewClient(&TestingConfig)
642 require.NoError(t, err)
644 dir, mi := testutil.GreetingTestTorrent()
645 defer os.RemoveAll(dir)
647 missinggo.CopyExact(&ts.InfoHash, mi.Info.Hash)
648 tt, new, err := cl.AddTorrentSpec(&ts)
649 require.NoError(t, err)
651 require.Nil(t, tt.Info())
652 _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
653 require.NoError(t, err)
654 require.False(t, new)
655 require.NotNil(t, tt.Info())
658 // Check that torrent Info is obtained from the metainfo file cache.
// The test enables the metainfo cache with a temporary ConfigDir, adds the
// full greeting torrent, and checks that <ConfigDir>/torrents/<hex
// infohash>.torrent exists. It then adds a spec carrying only the infohash
// and expects the returned torrent to already have Info.
659 func TestAddTorrentMetainfoInCache(t *testing.T) {
661 cfg.DisableMetainfoCache = false
// NOTE(review): the TempDir error is discarded; on failure ConfigDir would be
// the empty string.
662 cfg.ConfigDir, _ = ioutil.TempDir(os.TempDir(), "")
663 defer os.RemoveAll(cfg.ConfigDir)
664 cl, err := NewClient(&cfg)
665 require.NoError(t, err)
667 dir, mi := testutil.GreetingTestTorrent()
668 defer os.RemoveAll(dir)
669 tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
670 require.NoError(t, err)
672 require.NotNil(t, tt.Info())
// The cache file is named after the hex-encoded infohash.
673 _, err = os.Stat(filepath.Join(cfg.ConfigDir, "torrents", fmt.Sprintf("%x.torrent", mi.Info.Hash)))
674 require.NoError(t, err)
675 // Contains only the infohash.
677 missinggo.CopyExact(&ts.InfoHash, mi.Info.Hash)
// The client is queried for the infohash twice; what happens between the two
// lookups is not visible in this listing.
678 _, ok := cl.Torrent(ts.InfoHash)
681 _, ok = cl.Torrent(ts.InfoHash)
683 tt, new, err = cl.AddTorrentSpec(&ts)
684 require.NoError(t, err)
686 // Obtained from the metainfo cache.
687 require.NotNil(t, tt.Info())
// TestTorrentDroppedBeforeGotInfo adds a spec that carries the greeting
// torrent's infohash and later expects the client to list zero torrents.
// NOTE(review): the step between the add and the final assertion is not
// visible in this listing.
690 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
691 dir, mi := testutil.GreetingTestTorrent()
// NOTE(review): the NewClient error is discarded.
693 cl, _ := NewClient(&TestingConfig)
// NOTE(review): CopyExact is used unqualified here (via the dot-import of
// missinggo), whereas other tests in this file call missinggo.CopyExact.
696 CopyExact(&ts.InfoHash, mi.Info.Hash)
697 tt, _, _ := cl.AddTorrentSpec(&ts)
699 assert.EqualValues(t, 0, len(cl.Torrents()))
// testAddTorrentPriorPieceCompletion pre-populates a file-cache backed piece
// store with the greeting contents, optionally marks every piece completed,
// and then adds the greeting torrent to a client (TCP and uTP disabled) that
// opens its data from that store. It expects a single piece-state run of
// length 3 whose Complete flag equals alreadyCompleted, and when
// alreadyCompleted is true it also reads the data back and compares it with
// testutil.GreetingFileContents.
707 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool) {
708 fileCacheDir, err := ioutil.TempDir("", "")
709 require.NoError(t, err)
710 defer os.RemoveAll(fileCacheDir)
711 fileCache, err := filecache.NewCache(fileCacheDir)
712 require.NoError(t, err)
713 greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
714 defer os.RemoveAll(greetingDataTempDir)
715 filePieceStore := pieceStore.New(fileCacheDataBackend.New(fileCache))
716 greetingData := filePieceStore.OpenTorrentData(&greetingMetainfo.Info.Info)
// Write the full greeting contents into the store before any client exists.
717 written, err := greetingData.WriteAt([]byte(testutil.GreetingFileContents), 0)
// NOTE(review): the byte count is asserted before the error, so a failed
// write is reported as a length mismatch rather than by its error.
718 require.Equal(t, len(testutil.GreetingFileContents), written)
719 require.NoError(t, err)
720 for i := 0; i < greetingMetainfo.Info.NumPieces(); i++ {
721 // p := greetingMetainfo.Info.Piece(i)
722 if alreadyCompleted {
723 err := greetingData.PieceCompleted(i)
724 assert.NoError(t, err)
728 // TODO: Disable network option?
729 cfg.DisableTCP = true
730 cfg.DisableUTP = true
// Every torrent opened by this client reads from the pre-populated store.
731 cfg.TorrentDataOpener = func(mi *metainfo.Info) Data {
732 return filePieceStore.OpenTorrentData(mi)
734 cl, err := NewClient(&cfg)
735 require.NoError(t, err)
737 tt, err := cl.AddTorrent(greetingMetainfo)
738 require.NoError(t, err)
739 psrs := tt.PieceStateRuns()
// NOTE(review): assert.Len does not stop the test, so psrs[0] below would
// panic if the slice were empty.
740 assert.Len(t, psrs, 1)
741 assert.EqualValues(t, 3, psrs[0].Length)
742 assert.Equal(t, alreadyCompleted, psrs[0].Complete)
743 if alreadyCompleted {
745 b, err := ioutil.ReadAll(r)
746 assert.NoError(t, err)
747 assert.EqualValues(t, testutil.GreetingFileContents, b)
// TestAddTorrentPiecesAlreadyCompleted runs
// testAddTorrentPriorPieceCompletion with alreadyCompleted set to true.
751 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
752 testAddTorrentPriorPieceCompletion(t, true)
// TestAddTorrentPiecesNotAlreadyCompleted runs
// testAddTorrentPriorPieceCompletion with alreadyCompleted set to false.
755 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
756 testAddTorrentPriorPieceCompletion(t, false)
// TestAddMetainfoWithNodes creates a client with DHT security disabled,
// confirms the DHT starts with zero nodes, adds
// metainfo/testdata/issue_65a.torrent, and then expects 5 tracker tiers on
// the torrent and 6 nodes in the DHT.
759 func TestAddMetainfoWithNodes(t *testing.T) {
762 // For now, we want to just jam the nodes into the table, without
763 // verifying them first. Also the DHT code doesn't support mixing secure
764 // and insecure nodes if security is enabled (yet).
765 cfg.DHTConfig.NoSecurity = true
766 cl, err := NewClient(&cfg)
767 require.NoError(t, err)
// NOTE(review): expected/actual are swapped relative to testify's
// (t, expected, actual) order and to the NumNodes assertion at the end of
// this test; this only affects how a failure is reported.
769 assert.EqualValues(t, cl.DHT().NumNodes(), 0)
770 tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
771 require.NoError(t, err)
772 assert.Len(t, tt.torrent.Trackers, 5)
773 assert.EqualValues(t, 6, cl.DHT().NumNodes())