16 _ "github.com/anacrolix/envpprof"
17 "github.com/anacrolix/missinggo"
18 . "github.com/anacrolix/missinggo"
19 "github.com/anacrolix/missinggo/filecache"
20 "github.com/anacrolix/utp"
21 "github.com/bradfitz/iter"
22 "github.com/stretchr/testify/assert"
23 "github.com/stretchr/testify/require"
25 "github.com/anacrolix/torrent/bencode"
26 "github.com/anacrolix/torrent/data/pieceStore"
27 "github.com/anacrolix/torrent/data/pieceStore/dataBackend/fileCache"
28 "github.com/anacrolix/torrent/dht"
29 "github.com/anacrolix/torrent/internal/testutil"
30 "github.com/anacrolix/torrent/iplist"
31 "github.com/anacrolix/torrent/metainfo"
// Prefix log output with the standard date/time flags plus the full file path
// and line number of the call site.
35 log.SetFlags(log.LstdFlags | log.Llongfile)
// TestingConfig is the base client configuration shared by the tests in this
// file. It listens on localhost with port 0, disables trackers, the default
// blocklist and the metainfo cache, and sets NoDefaultBootstrap on the DHT
// server config.
38 var TestingConfig = Config{
39 ListenAddr: "localhost:0",
41 DisableTrackers: true,
42 NoDefaultBlocklist: true,
43 DisableMetainfoCache: true,
// Data is stored under a fixed "anacrolix" directory inside the OS temp dir.
// NOTE(review): this path is shared by every test that does not override
// DataDir — confirm that is intended.
44 DataDir: filepath.Join(os.TempDir(), "anacrolix"),
45 DHTConfig: dht.ServerConfig{
46 NoDefaultBootstrap: true,
// TestClientDefault constructs a client from TestingConfig.
// NOTE(review): the remainder of the body is not visible here; presumably it
// checks err and closes the client — confirm.
50 func TestClientDefault(t *testing.T) {
51 cl, err := NewClient(&TestingConfig)
// TestAddDropTorrent adds the greeting test torrent to a client built from
// TestingConfig using AddTorrentSpec.
// NOTE(review): presumably the torrent is dropped afterwards, as the name
// suggests; that part of the body is not visible here — confirm.
58 func TestAddDropTorrent(t *testing.T) {
59 cl, err := NewClient(&TestingConfig)
64 dir, mi := testutil.GreetingTestTorrent()
// Remove the directory returned alongside the greeting metainfo on exit.
65 defer os.RemoveAll(dir)
66 tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
// NOTE(review): presumably covers adding a torrent whose trackers all use
// unsupported URL schemes — confirm against the test body.
76 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
// NOTE(review): presumably covers adding a torrent that has no usable URLs —
// confirm against the test body.
80 func TestAddTorrentNoUsableURLs(t *testing.T) {
// NOTE(review): presumably covers adding peers for a torrent the client does
// not know about — confirm against the test body.
84 func TestAddPeersToUnknownTorrent(t *testing.T) {
// TestPieceHashSize checks that pieceHash reports a digest size of 20 bytes.
// NOTE(review): 20 bytes matches a SHA-1 digest; presumably pieceHash is
// SHA-1 — confirm at its definition.
88 func TestPieceHashSize(t *testing.T) {
89 if pieceHash.Size() != 20 {
// TestTorrentInitialState builds a torrent for the greeting metainfo with
// newTorrent, applies its metadata with setMetadata, and checks the initial
// piece and chunk bookkeeping.
94 func TestTorrentInitialState(t *testing.T) {
95 dir, mi := testutil.GreetingTestTorrent()
96 defer os.RemoveAll(dir)
// The infohash argument is produced by copying mi.Info.Hash into an InfoHash.
97 tor, err := newTorrent(func() (ih InfoHash) {
98 missinggo.CopyExact(ih[:], mi.Info.Hash)
105 err = tor.setMetadata(&mi.Info.Info, mi.Info.Bytes)
// The greeting torrent is expected to have exactly three pieces.
109 if len(tor.Pieces) != 3 {
110 t.Fatal("wrong number of pieces")
// After pending every chunk of piece 0, it should report three pending chunks.
112 tor.pendAllChunkSpecs(0)
113 assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
// Chunk index 2 of piece 0 is expected to map to chunkSpec{4, 1}.
// NOTE(review): presumably {begin, length} — confirm the field order.
114 assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
// TestUnmarshalPEXMsg decodes a bencoded dictionary whose "added" key holds a
// 12-byte string into a peerExchangeMessage and checks the decoded peers.
117 func TestUnmarshalPEXMsg(t *testing.T) {
118 var m peerExchangeMessage
119 if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
// Twelve bytes are expected to yield two entries, i.e. six bytes per peer.
122 if len(m.Added) != 2 {
// The first entry's port comes from bytes 0x05 0x06 read as a big-endian value.
125 if m.Added[0].Port != 0x506 {
// TestReducedDialTimeout is a table-driven check of reducedDialTimeout. Each
// row supplies a maximum timeout, a half-open limit, a pending peer count and
// the expected reduced timeout before clamping.
130 func TestReducedDialTimeout(t *testing.T) {
131 for _, _case := range []struct {
135 ExpectedReduced time.Duration
// With a half-open limit of 40, fewer than 40 pending peers leaves the
// timeout unchanged; 40, 80 and 4000 pending peers divide it by 2, 3 and 101.
137 {nominalDialTimeout, 40, 0, nominalDialTimeout},
138 {nominalDialTimeout, 40, 1, nominalDialTimeout},
139 {nominalDialTimeout, 40, 39, nominalDialTimeout},
140 {nominalDialTimeout, 40, 40, nominalDialTimeout / 2},
141 {nominalDialTimeout, 40, 80, nominalDialTimeout / 3},
142 {nominalDialTimeout, 40, 4000, nominalDialTimeout / 101},
144 reduced := reducedDialTimeout(_case.Max, _case.HalfOpenLimit, _case.PendingPeers)
// The expectation is clamped so it never falls below minDialTimeout.
145 expected := _case.ExpectedReduced
146 if expected < minDialTimeout {
147 expected = minDialTimeout
149 if reduced != expected {
// NOTE(review): the message prints the unclamped _case.ExpectedReduced, not
// the clamped `expected` that was compared — the report can be misleading
// when clamping applied.
150 t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
// TestUTPRawConn checks that plain UDP datagrams sent to a uTP socket's
// address can still be read from that socket with ReadFrom while a uTP peer
// is connected to it. N numbered messages are written from a separate UDP
// socket and the reader counts how many arrive.
155 func TestUTPRawConn(t *testing.T) {
156 l, err := utp.NewSocket("udp", "")
169 // Connect a UTP peer to see if the RawConn will still work.
// NOTE(review): the error from this NewSocket call is discarded — confirm
// that is acceptable here.
170 s, _ := utp.NewSocket("udp", "")
172 utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
174 t.Fatalf("error dialing utp listener: %s", err)
176 defer utpPeer.Close()
// A plain UDP socket used to send the raw (non-uTP) datagrams.
177 peer, err := net.ListenPacket("udp", ":0")
184 // How many messages to send. I've set this to double the channel buffer
185 // size in the raw packetConn.
// Closed by the reader when it finishes, so the test can wait for it below.
187 readerStopped := make(chan struct{})
188 // The reader goroutine.
190 defer close(readerStopped)
191 b := make([]byte, 500)
192 for i := 0; i < N; i++ {
193 n, _, err := l.ReadFrom(b)
// NOTE(review): this appears to run inside the reader goroutine; t.Fatalf
// must only be called from the goroutine running the test — confirm.
195 t.Fatalf("error reading from raw conn: %s", err)
// Each payload is the decimal text of the message index.
199 fmt.Sscan(string(b[:n]), &d)
201 log.Printf("got wrong number: expected %d, got %d", i, d)
205 udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
// Send the N numbered datagrams to the uTP socket's UDP address.
209 for i := 0; i < N; i++ {
210 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
// Brief pause between sends.
214 time.Sleep(time.Microsecond)
// Wait up to one second for the reader to finish.
217 case <-readerStopped:
218 case <-time.After(time.Second):
219 t.Fatal("reader timed out")
221 if msgsReceived != N {
222 t.Fatalf("messages received: %d", msgsReceived)
// TestTwoClientsArbitraryPorts creates two clients from TestingConfig in a
// loop. TestingConfig listens on port 0, so presumably each client gets its
// own port and creation must not conflict — confirm against the loop body.
226 func TestTwoClientsArbitraryPorts(t *testing.T) {
227 for i := 0; i < 2; i++ {
228 cl, err := NewClient(&TestingConfig)
// TestAddDropManyTorrents adds 1000 torrent specs with distinct infohashes to
// a single client.
// NOTE(review): presumably each torrent is dropped again, as the name
// suggests; that part is not visible here — confirm.
236 func TestAddDropManyTorrents(t *testing.T) {
// NOTE(review): the NewClient error is discarded; a failure here would show
// up later as a nil-client panic.
237 cl, _ := NewClient(&TestingConfig)
239 for i := range iter.N(1000) {
// Derive a unique infohash by varint-encoding the loop index into it.
241 binary.PutVarint(spec.InfoHash[:], int64(i))
242 tt, new, err := cl.AddTorrentSpec(&spec)
// TestClientTransfer transfers the greeting torrent from a seeder client to a
// leecher client and checks that the bytes read through the leecher's reader
// equal testutil.GreetingFileContents. The leecher stores its data through a
// file-cache-backed piece store rooted in a temporary directory.
253 func TestClientTransfer(t *testing.T) {
254 greetingTempDir, mi := testutil.GreetingTestTorrent()
255 defer os.RemoveAll(greetingTempDir)
// The seeder's data directory is the directory returned with the greeting
// torrent.
258 cfg.DataDir = greetingTempDir
259 seeder, err := NewClient(&cfg)
264 seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
265 leecherDataDir, err := ioutil.TempDir("", "")
269 defer os.RemoveAll(leecherDataDir)
270 // cfg.TorrentDataOpener = func(info *metainfo.Info) (data.Data, error) {
271 // return blob.TorrentData(info, leecherDataDir), nil
273 // blobStore := blob.NewStore(leecherDataDir)
274 // cfg.TorrentDataOpener = func(info *metainfo.Info) Data {
275 // return blobStore.OpenTorrent(info)
// Build the opener once: a file cache over leecherDataDir wrapped in a piece
// store, and return a closure that opens torrent data from that store.
277 cfg.TorrentDataOpener = func() TorrentDataOpener {
278 fc, err := filecache.NewCache(leecherDataDir)
279 require.NoError(t, err)
280 store := pieceStore.New(fileCacheDataBackend.New(fc))
281 return func(mi *metainfo.Info) Data {
282 return store.OpenTorrentData(mi)
// NOTE(review): errors from NewClient and AddTorrentSpec are discarded here.
285 leecher, _ := NewClient(&cfg)
286 defer leecher.Close()
287 leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
288 ret = TorrentSpecFromMetaInfo(mi)
292 // TODO: The piece state publishing is kinda jammed in here until I have a
293 // more thorough test.
// Subscribe to piece state changes and range over the published values.
295 s := leecherGreeting.torrent.pieceStateChanges.Subscribe()
297 for i := range s.Values {
300 log.Print("finished")
// Point the leecher at the seeder's listen address.
302 leecherGreeting.AddPeers([]Peer{
304 IP: missinggo.AddrIP(seeder.ListenAddr()),
305 Port: missinggo.AddrPort(seeder.ListenAddr()),
// Read the entire torrent through the leecher and compare with the expected
// greeting contents.
308 r := leecherGreeting.NewReader()
310 _greeting, err := ioutil.ReadAll(r)
312 t.Fatalf("%q %s", string(_greeting), err)
314 greeting := string(_greeting)
315 if greeting != testutil.GreetingFileContents {
320 // Check that after completing leeching, a leecher transitions to seeding
321 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
322 func TestSeedAfterDownloading(t *testing.T) {
323 greetingTempDir, mi := testutil.GreetingTestTorrent()
324 defer os.RemoveAll(greetingTempDir)
// The seeder serves the greeting data from the directory returned above.
327 cfg.DataDir = greetingTempDir
328 seeder, err := NewClient(&cfg)
330 seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
// The leecher gets its own temporary data directory.
331 cfg.DataDir, err = ioutil.TempDir("", "")
332 require.NoError(t, err)
333 defer os.RemoveAll(cfg.DataDir)
// NOTE(review): NewClient errors for both leechers are discarded.
334 leecher, _ := NewClient(&cfg)
335 defer leecher.Close()
// Clear any custom data opener before creating the second leecher, which
// also gets a fresh temporary data directory.
337 cfg.TorrentDataOpener = nil
338 cfg.DataDir, err = ioutil.TempDir("", "")
339 require.NoError(t, err)
340 defer os.RemoveAll(cfg.DataDir)
341 leecherLeecher, _ := NewClient(&cfg)
342 defer leecherLeecher.Close()
343 leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
344 ret = TorrentSpecFromMetaInfo(mi)
348 llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
349 ret = TorrentSpecFromMetaInfo(mi)
353 // Simultaneously DownloadAll in Leecher, and read the contents
354 // consecutively in LeecherLeecher. This non-deterministically triggered a
355 // case where the leecher wouldn't unchoke the LeecherLeecher.
356 var wg sync.WaitGroup
// The full contents read must match the expected greeting.
362 b, err := ioutil.ReadAll(r)
363 require.NoError(t, err)
364 require.EqualValues(t, testutil.GreetingFileContents, b)
// The middle leecher is given both the seeder and the second leecher as
// peers, forming the chain described above.
366 leecherGreeting.AddPeers([]Peer{
368 IP: missinggo.AddrIP(seeder.ListenAddr()),
369 Port: missinggo.AddrPort(seeder.ListenAddr()),
372 IP: missinggo.AddrIP(leecherLeecher.ListenAddr()),
373 Port: missinggo.AddrPort(leecherLeecher.ListenAddr()),
379 leecherGreeting.DownloadAll()
// TestReadaheadPieces is a table-driven check of readaheadPieces: each row
// gives a readahead size in bytes, a piece length in bytes, and the expected
// number of readahead pieces.
385 func TestReadaheadPieces(t *testing.T) {
386 for _, case_ := range []struct {
387 readaheadBytes, pieceLength int64
// 5 MiB of readahead over 256 KiB pieces expects 19 pieces.
390 {5 * 1024 * 1024, 256 * 1024, 19},
// Readahead equal to, or one byte under, the piece length expects 1 piece.
391 {5 * 1024 * 1024, 5 * 1024 * 1024, 1},
392 {5*1024*1024 - 1, 5 * 1024 * 1024, 1},
// Readahead one byte over the piece length expects 2 pieces.
393 {5 * 1024 * 1024, 5*1024*1024 - 1, 2},
// Zero readahead expects 0 pieces.
394 {0, 5 * 1024 * 1024, 0},
395 {5 * 1024 * 1024, 1048576, 4},
397 pieces := readaheadPieces(case_.readaheadBytes, case_.pieceLength)
// The failing row is included in the assertion message.
398 assert.Equal(t, case_.readaheadPieces, pieces, "%v", case_)
// TestMergingTrackersByAddingSpecs adds an empty TorrentSpec, then adds the
// same spec again with two tracker tiers set, and checks that the
// already-added torrent ends up with those trackers in order.
402 func TestMergingTrackersByAddingSpecs(t *testing.T) {
// NOTE(review): errors from NewClient and both AddTorrentSpec calls are
// discarded.
403 cl, _ := NewClient(&TestingConfig)
405 spec := TorrentSpec{}
406 T, new, _ := cl.AddTorrentSpec(&spec)
// Re-add the same spec, now carrying one HTTP and one UDP tracker tier.
410 spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
411 _, new, _ = cl.AddTorrentSpec(&spec)
// NOTE(review): testify's EqualValues takes (t, expected, actual); the
// arguments here are passed as (actual, expected). The comparison still
// works, but failure output would label the values the wrong way round.
415 assert.EqualValues(t, T.torrent.Trackers[0][0].URL(), "http://a")
416 assert.EqualValues(t, T.torrent.Trackers[1][0].URL(), "udp://b")
// badData is a stateless stub with Close, WriteAt, WriteSectionTo,
// PieceComplete, PieceCompleted and ReadAt methods. Its ReadAt serves bytes
// from the five-byte string "hello".
// NOTE(review): presumably this is the Data implementation returned by the
// TorrentDataOpener in TestCompletedPieceWrongSize — confirm.
419 type badData struct{}
// Close does nothing.
421 func (me badData) Close() {}
423 func (me badData) WriteAt(b []byte, off int64) (int, error) {
427 func (me badData) WriteSectionTo(w io.Writer, off, n int64) (int64, error) {
431 func (me badData) PieceComplete(piece int) bool {
435 func (me badData) PieceCompleted(piece int) error {
439 func (me badData) ReadAt(b []byte, off int64) (n int, err error) {
// Copy whatever remains of "hello" from offset off into b.
444 n = copy(b, []byte("hello")[off:])
448 // We read from a piece which is marked completed, but is missing data.
// The torrent declares a 13-byte file, and the test expects a 20-byte
// ReadFull to return only 5 bytes with io.ErrUnexpectedEOF.
449 func TestCompletedPieceWrongSize(t *testing.T) {
// Install a custom data opener for this client.
451 cfg.TorrentDataOpener = func(*metainfo.Info) Data {
// NOTE(review): the NewClient error is discarded.
454 cl, _ := NewClient(&cfg)
456 tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
457 Info: &metainfo.InfoEx{
// A single 20-byte (all-zero) piece hash, i.e. one piece.
460 Pieces: make([]byte, 20),
461 Files: []metainfo.FileInfo{
462 metainfo.FileInfo{Path: []string{"greeting"}, Length: 13},
471 t.Fatal("expected new")
// Ask for more bytes than are available so ReadFull must come up short.
475 b := make([]byte, 20)
476 n, err := io.ReadFull(r, b)
477 if n != 5 || err != io.ErrUnexpectedEOF {
// BenchmarkAddLargeTorrent measures adding the torrent file
// testdata/bootstrap.dat.torrent to a client, once per benchmark iteration.
// Both TCP and uTP are disabled on the client.
483 func BenchmarkAddLargeTorrent(b *testing.B) {
485 cfg.DisableTCP = true
486 cfg.DisableUTP = true
// NOTE(review): "redonk" is not a valid listen address; presumably it is
// never used because both transports are disabled — confirm.
487 cfg.ListenAddr = "redonk"
// NOTE(review): the NewClient error is discarded.
488 cl, _ := NewClient(&cfg)
490 for range iter.N(b.N) {
491 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
// TestResponsive sets up a seeder and a leecher for the greeting torrent,
// then uses a reader with readahead disabled and responsive mode enabled to
// perform two ReadAt calls at different offsets and checks the bytes
// returned.
499 func TestResponsive(t *testing.T) {
500 seederDataDir, mi := testutil.GreetingTestTorrent()
501 defer os.RemoveAll(seederDataDir)
504 cfg.DataDir = seederDataDir
505 seeder, err := NewClient(&cfg)
508 seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
// The leecher downloads into its own temporary directory.
509 leecherDataDir, err := ioutil.TempDir("", "")
511 defer os.RemoveAll(leecherDataDir)
513 cfg.DataDir = leecherDataDir
514 leecher, err := NewClient(&cfg)
516 defer leecher.Close()
517 leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
518 ret = TorrentSpecFromMetaInfo(mi)
// Point the leecher at the seeder's listen address.
522 leecherTorrent.AddPeers([]Peer{
524 IP: missinggo.AddrIP(seeder.ListenAddr()),
525 Port: missinggo.AddrPort(seeder.ListenAddr()),
528 reader := leecherTorrent.NewReader()
529 reader.SetReadahead(0)
530 reader.SetResponsive()
// Reading at offset 3 is expected to yield "lo".
532 _, err = reader.ReadAt(b, 3)
534 assert.EqualValues(t, "lo", string(b))
// Reading at offset 11 is expected to yield the two bytes "d\n".
535 n, err := reader.ReadAt(b, 11)
537 assert.EqualValues(t, 2, n)
538 assert.EqualValues(t, "d\n", string(b))
// TestTorrentDroppedDuringResponsiveRead uses the same seeder/leecher setup
// as a responsive read, but drops the leecher's torrent concurrently with a
// second ReadAt and expects that read to fail with "torrent closed" and zero
// bytes.
541 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
542 seederDataDir, mi := testutil.GreetingTestTorrent()
543 defer os.RemoveAll(seederDataDir)
546 cfg.DataDir = seederDataDir
547 seeder, err := NewClient(&cfg)
550 seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
// The leecher downloads into its own temporary directory.
551 leecherDataDir, err := ioutil.TempDir("", "")
553 defer os.RemoveAll(leecherDataDir)
555 cfg.DataDir = leecherDataDir
556 leecher, err := NewClient(&cfg)
558 defer leecher.Close()
559 leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
560 ret = TorrentSpecFromMetaInfo(mi)
564 leecherTorrent.AddPeers([]Peer{
566 IP: missinggo.AddrIP(seeder.ListenAddr()),
567 Port: missinggo.AddrPort(seeder.ListenAddr()),
570 reader := leecherTorrent.NewReader()
571 reader.SetReadahead(0)
572 reader.SetResponsive()
// The first read, at offset 3, is expected to succeed and yield "lo".
574 _, err = reader.ReadAt(b, 3)
576 assert.EqualValues(t, "lo", string(b))
// NOTE(review): Drop runs in its own goroutine, so the assertions below rely
// on the drop taking effect before the read at offset 11 can complete —
// confirm this cannot flake.
577 go leecherTorrent.Drop()
578 n, err := reader.ReadAt(b, 11)
579 assert.EqualError(t, err, "torrent closed")
580 assert.EqualValues(t, 0, n)
// TestDHTInheritBlocklist sets an IP blocklist on the client config and
// checks that the client's DHT server reports the same blocklist.
583 func TestDHTInheritBlocklist(t *testing.T) {
// An empty list is enough; only its identity is compared below.
584 ipl := iplist.New(nil)
585 require.NotNil(t, ipl)
587 cfg.IPBlocklist = ipl
589 cl, err := NewClient(&cfg)
590 require.NoError(t, err)
592 require.Equal(t, ipl, cl.DHT().IPBlocklist())
// TestAddTorrentSpecMerging first adds a spec that carries only the greeting
// torrent's infohash (so the torrent has no Info), then adds the full spec
// built from the metainfo and expects the existing torrent to gain its Info
// rather than a new torrent being created.
595 // Check that stuff is merged in subsequent AddTorrentSpec for the same
597 func TestAddTorrentSpecMerging(t *testing.T) {
598 cl, err := NewClient(&TestingConfig)
599 require.NoError(t, err)
601 dir, mi := testutil.GreetingTestTorrent()
602 defer os.RemoveAll(dir)
// Populate only the infohash of the first spec.
604 missinggo.CopyExact(&ts.InfoHash, mi.Info.Hash)
605 tt, new, err := cl.AddTorrentSpec(&ts)
606 require.NoError(t, err)
// No Info is available yet from an infohash-only spec.
608 require.Nil(t, tt.Info())
// Adding the full spec must merge into the existing torrent (new == false)
// and make its Info available through the original handle.
609 _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
610 require.NoError(t, err)
611 require.False(t, new)
612 require.NotNil(t, tt.Info())
615 // Check that torrent Info is obtained from the metainfo file cache.
// The test enables the metainfo cache with a temporary ConfigDir, adds the
// greeting torrent, verifies that a cached .torrent file was written, and
// later re-adds a spec holding only the infohash, expecting Info to be
// available.
616 func TestAddTorrentMetainfoInCache(t *testing.T) {
618 cfg.DisableMetainfoCache = false
// NOTE(review): the TempDir error is discarded — confirm that is acceptable.
619 cfg.ConfigDir, _ = ioutil.TempDir(os.TempDir(), "")
620 defer os.RemoveAll(cfg.ConfigDir)
621 cl, err := NewClient(&cfg)
622 require.NoError(t, err)
624 dir, mi := testutil.GreetingTestTorrent()
625 defer os.RemoveAll(dir)
626 tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
627 require.NoError(t, err)
629 require.NotNil(t, tt.Info())
// The cache file is expected at <ConfigDir>/torrents/<hex infohash>.torrent.
630 _, err = os.Stat(filepath.Join(cfg.ConfigDir, "torrents", fmt.Sprintf("%x.torrent", mi.Info.Hash)))
631 require.NoError(t, err)
632 // Contains only the infohash.
634 missinggo.CopyExact(&ts.InfoHash, mi.Info.Hash)
// NOTE(review): the torrent is looked up twice; presumably it is dropped
// between the two lookups (those lines are not visible here) — confirm.
635 _, ok := cl.Torrent(ts.InfoHash)
638 _, ok = cl.Torrent(ts.InfoHash)
640 tt, new, err = cl.AddTorrentSpec(&ts)
641 require.NoError(t, err)
643 // Obtained from the metainfo cache.
644 require.NotNil(t, tt.Info())
// TestTorrentDroppedBeforeGotInfo adds a torrent by infohash only and
// expects the client to end up with no torrents.
// NOTE(review): presumably the torrent is dropped before the final
// assertion, as the name suggests; that line is not visible here — confirm.
647 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
648 dir, mi := testutil.GreetingTestTorrent()
// NOTE(review): the NewClient and AddTorrentSpec errors are discarded.
650 cl, _ := NewClient(&TestingConfig)
// NOTE(review): this uses the dot-imported CopyExact, whereas other tests in
// this file call missinggo.CopyExact — consider making these consistent.
653 CopyExact(&ts.InfoHash, mi.Info.Hash)
654 tt, _, _ := cl.AddTorrentSpec(&ts)
656 assert.EqualValues(t, 0, len(cl.Torrents()))