20 _ "github.com/anacrolix/envpprof"
21 "github.com/anacrolix/missinggo"
22 . "github.com/anacrolix/missinggo"
23 "github.com/anacrolix/missinggo/filecache"
24 "github.com/anacrolix/utp"
25 "github.com/bradfitz/iter"
26 "github.com/stretchr/testify/assert"
27 "github.com/stretchr/testify/require"
29 "github.com/anacrolix/torrent/bencode"
30 "github.com/anacrolix/torrent/data/pieceStore"
31 "github.com/anacrolix/torrent/data/pieceStore/dataBackend/fileCache"
32 "github.com/anacrolix/torrent/dht"
33 "github.com/anacrolix/torrent/internal/testutil"
34 "github.com/anacrolix/torrent/iplist"
35 "github.com/anacrolix/torrent/metainfo"
39 log.SetFlags(log.LstdFlags | log.Llongfile)
42 var TestingConfig = Config{
43 ListenAddr: "localhost:0",
45 DisableTrackers: true,
46 NoDefaultBlocklist: true,
47 DisableMetainfoCache: true,
48 DataDir: filepath.Join(os.TempDir(), "anacrolix"),
49 DHTConfig: dht.ServerConfig{
50 NoDefaultBootstrap: true,
54 func TestClientDefault(t *testing.T) {
55 cl, err := NewClient(&TestingConfig)
62 func TestAddDropTorrent(t *testing.T) {
63 cl, err := NewClient(&TestingConfig)
68 dir, mi := testutil.GreetingTestTorrent()
69 defer os.RemoveAll(dir)
70 tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
80 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
84 func TestAddTorrentNoUsableURLs(t *testing.T) {
88 func TestAddPeersToUnknownTorrent(t *testing.T) {
92 func TestPieceHashSize(t *testing.T) {
93 if pieceHash.Size() != 20 {
98 func TestTorrentInitialState(t *testing.T) {
99 dir, mi := testutil.GreetingTestTorrent()
100 defer os.RemoveAll(dir)
101 tor := newTorrent(func() (ih InfoHash) {
102 missinggo.CopyExact(ih[:], mi.Info.Hash)
106 err := tor.setMetadata(&mi.Info.Info, mi.Info.Bytes)
110 if len(tor.Pieces) != 3 {
111 t.Fatal("wrong number of pieces")
113 tor.pendAllChunkSpecs(0)
114 assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
115 assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
118 func TestUnmarshalPEXMsg(t *testing.T) {
119 var m peerExchangeMessage
120 if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
123 if len(m.Added) != 2 {
126 if m.Added[0].Port != 0x506 {
131 func TestReducedDialTimeout(t *testing.T) {
132 for _, _case := range []struct {
136 ExpectedReduced time.Duration
138 {nominalDialTimeout, 40, 0, nominalDialTimeout},
139 {nominalDialTimeout, 40, 1, nominalDialTimeout},
140 {nominalDialTimeout, 40, 39, nominalDialTimeout},
141 {nominalDialTimeout, 40, 40, nominalDialTimeout / 2},
142 {nominalDialTimeout, 40, 80, nominalDialTimeout / 3},
143 {nominalDialTimeout, 40, 4000, nominalDialTimeout / 101},
145 reduced := reducedDialTimeout(_case.Max, _case.HalfOpenLimit, _case.PendingPeers)
146 expected := _case.ExpectedReduced
147 if expected < minDialTimeout {
148 expected = minDialTimeout
150 if reduced != expected {
151 t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
156 func TestUTPRawConn(t *testing.T) {
157 l, err := utp.NewSocket("udp", "")
170 // Connect a UTP peer to see if the RawConn will still work.
171 s, _ := utp.NewSocket("udp", "")
173 utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
175 t.Fatalf("error dialing utp listener: %s", err)
177 defer utpPeer.Close()
178 peer, err := net.ListenPacket("udp", ":0")
185 // How many messages to send. I've set this to double the channel buffer
186 // size in the raw packetConn.
188 readerStopped := make(chan struct{})
189 // The reader goroutine.
191 defer close(readerStopped)
192 b := make([]byte, 500)
193 for i := 0; i < N; i++ {
194 n, _, err := l.ReadFrom(b)
196 t.Fatalf("error reading from raw conn: %s", err)
200 fmt.Sscan(string(b[:n]), &d)
202 log.Printf("got wrong number: expected %d, got %d", i, d)
206 udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
210 for i := 0; i < N; i++ {
211 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
215 time.Sleep(time.Microsecond)
218 case <-readerStopped:
219 case <-time.After(time.Second):
220 t.Fatal("reader timed out")
222 if msgsReceived != N {
223 t.Fatalf("messages received: %d", msgsReceived)
227 func TestTwoClientsArbitraryPorts(t *testing.T) {
228 for i := 0; i < 2; i++ {
229 cl, err := NewClient(&TestingConfig)
237 func TestAddDropManyTorrents(t *testing.T) {
238 cl, _ := NewClient(&TestingConfig)
240 for i := range iter.N(1000) {
242 binary.PutVarint(spec.InfoHash[:], int64(i))
243 tt, new, err := cl.AddTorrentSpec(&spec)
254 func TestClientTransfer(t *testing.T) {
255 greetingTempDir, mi := testutil.GreetingTestTorrent()
256 defer os.RemoveAll(greetingTempDir)
259 cfg.DataDir = greetingTempDir
260 seeder, err := NewClient(&cfg)
265 exportClientStatus(seeder, "/TestClientTransfer/s")
266 seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
267 leecherDataDir, err := ioutil.TempDir("", "")
271 defer os.RemoveAll(leecherDataDir)
272 // cfg.TorrentDataOpener = func(info *metainfo.Info) (data.Data, error) {
273 // return blob.TorrentData(info, leecherDataDir), nil
275 // blobStore := blob.NewStore(leecherDataDir)
276 // cfg.TorrentDataOpener = func(info *metainfo.Info) Data {
277 // return blobStore.OpenTorrent(info)
279 cfg.TorrentDataOpener = func() TorrentDataOpener {
280 fc, err := filecache.NewCache(leecherDataDir)
281 require.NoError(t, err)
282 store := pieceStore.New(fileCacheDataBackend.New(fc))
283 return func(mi *metainfo.Info) Data {
284 return store.OpenTorrentData(mi)
287 leecher, _ := NewClient(&cfg)
288 defer leecher.Close()
289 exportClientStatus(leecher, "/TestClientTransfer/l")
290 leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
291 ret = TorrentSpecFromMetaInfo(mi)
295 // TODO: The piece state publishing is kinda jammed in here until I have a
296 // more thorough test.
298 s := leecherGreeting.torrent.pieceStateChanges.Subscribe()
300 for v := range s.Values {
303 log.Print("finished")
305 leecherGreeting.AddPeers([]Peer{
307 IP: missinggo.AddrIP(seeder.ListenAddr()),
308 Port: missinggo.AddrPort(seeder.ListenAddr()),
311 r := leecherGreeting.NewReader()
313 _greeting, err := ioutil.ReadAll(r)
315 t.Fatalf("%q %s", string(_greeting), err)
317 greeting := string(_greeting)
318 if greeting != testutil.GreetingFileContents {
323 func exportClientStatus(cl *Client, path string) {
324 http.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
329 // Check that after completing leeching, a leecher transitions to seeding
330 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
331 func TestSeedAfterDownloading(t *testing.T) {
332 greetingTempDir, mi := testutil.GreetingTestTorrent()
333 defer os.RemoveAll(greetingTempDir)
336 cfg.DataDir = greetingTempDir
337 seeder, err := NewClient(&cfg)
339 exportClientStatus(seeder, "/s")
340 seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
341 cfg.DataDir, err = ioutil.TempDir("", "")
342 require.NoError(t, err)
343 defer os.RemoveAll(cfg.DataDir)
344 leecher, _ := NewClient(&cfg)
345 defer leecher.Close()
346 exportClientStatus(leecher, "/l")
348 cfg.TorrentDataOpener = nil
349 cfg.DataDir, err = ioutil.TempDir("", "")
350 require.NoError(t, err)
351 defer os.RemoveAll(cfg.DataDir)
352 leecherLeecher, _ := NewClient(&cfg)
353 defer leecherLeecher.Close()
354 exportClientStatus(leecherLeecher, "/ll")
355 leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
356 ret = TorrentSpecFromMetaInfo(mi)
360 llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
361 ret = TorrentSpecFromMetaInfo(mi)
365 // Simultaneously DownloadAll in Leecher, and read the contents
366 // consecutively in LeecherLeecher. This non-deterministically triggered a
367 // case where the leecher wouldn't unchoke the LeecherLeecher.
368 var wg sync.WaitGroup
374 b, err := ioutil.ReadAll(r)
375 require.NoError(t, err)
376 assert.EqualValues(t, testutil.GreetingFileContents, b)
378 leecherGreeting.AddPeers([]Peer{
380 IP: missinggo.AddrIP(seeder.ListenAddr()),
381 Port: missinggo.AddrPort(seeder.ListenAddr()),
384 IP: missinggo.AddrIP(leecherLeecher.ListenAddr()),
385 Port: missinggo.AddrPort(leecherLeecher.ListenAddr()),
391 leecherGreeting.DownloadAll()
397 func TestReadaheadPieces(t *testing.T) {
398 for _, case_ := range []struct {
399 readaheadBytes, pieceLength int64
402 {5 * 1024 * 1024, 256 * 1024, 19},
403 {5 * 1024 * 1024, 5 * 1024 * 1024, 1},
404 {5*1024*1024 - 1, 5 * 1024 * 1024, 1},
405 {5 * 1024 * 1024, 5*1024*1024 - 1, 2},
406 {0, 5 * 1024 * 1024, 0},
407 {5 * 1024 * 1024, 1048576, 4},
409 pieces := readaheadPieces(case_.readaheadBytes, case_.pieceLength)
410 assert.Equal(t, case_.readaheadPieces, pieces, "%v", case_)
414 func TestMergingTrackersByAddingSpecs(t *testing.T) {
415 cl, _ := NewClient(&TestingConfig)
417 spec := TorrentSpec{}
418 T, new, _ := cl.AddTorrentSpec(&spec)
422 spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
423 _, new, _ = cl.AddTorrentSpec(&spec)
427 assert.EqualValues(t, T.torrent.Trackers[0][0], "http://a")
428 assert.EqualValues(t, T.torrent.Trackers[1][0], "udp://b")
431 type badData struct{} // badData is a field-less receiver type for the stub data methods defined after it.
433 func (me badData) Close() {} // Close is a no-op; badData has no fields and so nothing to release.
435 func (me badData) WriteAt(b []byte, off int64) (int, error) {
439 func (me badData) WriteSectionTo(w io.Writer, off, n int64) (int64, error) {
440 p := []byte(me.randomlyTruncatedDataString())
441 written, err := w.Write(p)
442 return int64(written), err
445 func (me badData) PieceComplete(piece int) bool {
449 func (me badData) PieceCompleted(piece int) error {
450 return errors.New("psyyyyyyyche")
453 func (me badData) randomlyTruncatedDataString() string {
454 return "hello, world\n"[:rand.Intn(14)]
457 func (me badData) ReadAt(b []byte, off int64) (n int, err error) {
458 r := strings.NewReader(me.randomlyTruncatedDataString())
459 return r.ReadAt(b, off)
462 // We read from a piece which is marked completed, but is missing data.
463 func TestCompletedPieceWrongSize(t *testing.T) {
466 cfg.TorrentDataOpener = func(*metainfo.Info) Data {
469 cl, _ := NewClient(&cfg)
471 tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
472 Info: &metainfo.InfoEx{
475 Pieces: make([]byte, 20),
476 Files: []metainfo.FileInfo{
477 metainfo.FileInfo{Path: []string{"greeting"}, Length: 13},
482 require.NoError(t, err)
487 b, err := ioutil.ReadAll(r)
489 assert.NoError(t, err)
492 func BenchmarkAddLargeTorrent(b *testing.B) {
494 cfg.DisableTCP = true
495 cfg.DisableUTP = true
496 cfg.ListenAddr = "redonk"
497 cl, _ := NewClient(&cfg)
499 for range iter.N(b.N) {
500 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
508 func TestResponsive(t *testing.T) {
509 seederDataDir, mi := testutil.GreetingTestTorrent()
510 defer os.RemoveAll(seederDataDir)
513 cfg.DataDir = seederDataDir
514 seeder, err := NewClient(&cfg)
517 seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
518 leecherDataDir, err := ioutil.TempDir("", "")
520 defer os.RemoveAll(leecherDataDir)
522 cfg.DataDir = leecherDataDir
523 leecher, err := NewClient(&cfg)
525 defer leecher.Close()
526 leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
527 ret = TorrentSpecFromMetaInfo(mi)
531 leecherTorrent.AddPeers([]Peer{
533 IP: missinggo.AddrIP(seeder.ListenAddr()),
534 Port: missinggo.AddrPort(seeder.ListenAddr()),
537 reader := leecherTorrent.NewReader()
539 reader.SetReadahead(0)
540 reader.SetResponsive()
542 _, err = reader.Seek(3, os.SEEK_SET)
543 require.NoError(t, err)
544 _, err = io.ReadFull(reader, b)
546 assert.EqualValues(t, "lo", string(b))
547 _, err = reader.Seek(11, os.SEEK_SET)
548 require.NoError(t, err)
549 n, err := io.ReadFull(reader, b)
551 assert.EqualValues(t, 2, n)
552 assert.EqualValues(t, "d\n", string(b))
555 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
556 seederDataDir, mi := testutil.GreetingTestTorrent()
557 defer os.RemoveAll(seederDataDir)
560 cfg.DataDir = seederDataDir
561 seeder, err := NewClient(&cfg)
564 seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
565 leecherDataDir, err := ioutil.TempDir("", "")
567 defer os.RemoveAll(leecherDataDir)
569 cfg.DataDir = leecherDataDir
570 leecher, err := NewClient(&cfg)
572 defer leecher.Close()
573 leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
574 ret = TorrentSpecFromMetaInfo(mi)
578 leecherTorrent.AddPeers([]Peer{
580 IP: missinggo.AddrIP(seeder.ListenAddr()),
581 Port: missinggo.AddrPort(seeder.ListenAddr()),
584 reader := leecherTorrent.NewReader()
586 reader.SetReadahead(0)
587 reader.SetResponsive()
589 _, err = reader.Seek(3, os.SEEK_SET)
590 require.NoError(t, err)
591 _, err = io.ReadFull(reader, b)
593 assert.EqualValues(t, "lo", string(b))
594 go leecherTorrent.Drop()
595 _, err = reader.Seek(11, os.SEEK_SET)
596 require.NoError(t, err)
597 n, err := reader.Read(b)
598 assert.EqualError(t, err, "torrent closed")
599 assert.EqualValues(t, 0, n)
602 func TestDHTInheritBlocklist(t *testing.T) {
603 ipl := iplist.New(nil)
604 require.NotNil(t, ipl)
606 cfg.IPBlocklist = ipl
608 cl, err := NewClient(&cfg)
609 require.NoError(t, err)
611 require.Equal(t, ipl, cl.DHT().IPBlocklist())
614 // Check that stuff is merged in subsequent AddTorrentSpec for the same
616 func TestAddTorrentSpecMerging(t *testing.T) {
617 cl, err := NewClient(&TestingConfig)
618 require.NoError(t, err)
620 dir, mi := testutil.GreetingTestTorrent()
621 defer os.RemoveAll(dir)
623 missinggo.CopyExact(&ts.InfoHash, mi.Info.Hash)
624 tt, new, err := cl.AddTorrentSpec(&ts)
625 require.NoError(t, err)
627 require.Nil(t, tt.Info())
628 _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
629 require.NoError(t, err)
630 require.False(t, new)
631 require.NotNil(t, tt.Info())
634 // Check that torrent Info is obtained from the metainfo file cache.
635 func TestAddTorrentMetainfoInCache(t *testing.T) {
637 cfg.DisableMetainfoCache = false
638 cfg.ConfigDir, _ = ioutil.TempDir(os.TempDir(), "")
639 defer os.RemoveAll(cfg.ConfigDir)
640 cl, err := NewClient(&cfg)
641 require.NoError(t, err)
643 dir, mi := testutil.GreetingTestTorrent()
644 defer os.RemoveAll(dir)
645 tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
646 require.NoError(t, err)
648 require.NotNil(t, tt.Info())
649 _, err = os.Stat(filepath.Join(cfg.ConfigDir, "torrents", fmt.Sprintf("%x.torrent", mi.Info.Hash)))
650 require.NoError(t, err)
651 // Contains only the infohash.
653 missinggo.CopyExact(&ts.InfoHash, mi.Info.Hash)
654 _, ok := cl.Torrent(ts.InfoHash)
657 _, ok = cl.Torrent(ts.InfoHash)
659 tt, new, err = cl.AddTorrentSpec(&ts)
660 require.NoError(t, err)
662 // Obtained from the metainfo cache.
663 require.NotNil(t, tt.Info())
666 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
667 dir, mi := testutil.GreetingTestTorrent()
669 cl, _ := NewClient(&TestingConfig)
672 CopyExact(&ts.InfoHash, mi.Info.Hash)
673 tt, _, _ := cl.AddTorrentSpec(&ts)
675 assert.EqualValues(t, 0, len(cl.Torrents()))
683 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool) {
684 fileCacheDir, err := ioutil.TempDir("", "")
685 require.NoError(t, err)
686 defer os.RemoveAll(fileCacheDir)
687 fileCache, err := filecache.NewCache(fileCacheDir)
688 require.NoError(t, err)
689 greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
690 defer os.RemoveAll(greetingDataTempDir)
691 filePieceStore := pieceStore.New(fileCacheDataBackend.New(fileCache))
692 greetingData := filePieceStore.OpenTorrentData(&greetingMetainfo.Info.Info)
693 written, err := greetingData.WriteAt([]byte(testutil.GreetingFileContents), 0)
694 require.Equal(t, len(testutil.GreetingFileContents), written)
695 require.NoError(t, err)
696 for i := 0; i < greetingMetainfo.Info.NumPieces(); i++ {
697 // p := greetingMetainfo.Info.Piece(i)
698 if alreadyCompleted {
699 err := greetingData.PieceCompleted(i)
700 assert.NoError(t, err)
704 // TODO: Disable network option?
705 cfg.DisableTCP = true
706 cfg.DisableUTP = true
707 cfg.TorrentDataOpener = func(mi *metainfo.Info) Data {
708 return filePieceStore.OpenTorrentData(mi)
710 cl, err := NewClient(&cfg)
711 require.NoError(t, err)
713 tt, err := cl.AddTorrent(greetingMetainfo)
714 require.NoError(t, err)
715 psrs := tt.PieceStateRuns()
716 assert.Len(t, psrs, 1)
717 assert.EqualValues(t, 3, psrs[0].Length)
718 assert.Equal(t, alreadyCompleted, psrs[0].Complete)
719 if alreadyCompleted {
721 b, err := ioutil.ReadAll(r)
722 assert.NoError(t, err)
723 assert.EqualValues(t, testutil.GreetingFileContents, b)
727 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
728 testAddTorrentPriorPieceCompletion(t, true)
731 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
732 testAddTorrentPriorPieceCompletion(t, false)