19 _ "github.com/anacrolix/envpprof"
20 "github.com/anacrolix/missinggo"
21 . "github.com/anacrolix/missinggo"
22 "github.com/anacrolix/utp"
23 "github.com/bradfitz/iter"
24 "github.com/stretchr/testify/assert"
25 "github.com/stretchr/testify/require"
27 "github.com/anacrolix/torrent/bencode"
28 "github.com/anacrolix/torrent/dht"
29 "github.com/anacrolix/torrent/internal/testutil"
30 "github.com/anacrolix/torrent/iplist"
31 "github.com/anacrolix/torrent/metainfo"
32 "github.com/anacrolix/torrent/storage"
// Log with the standard date/time prefix plus the full file path and line
// number of the call site.
36 log.SetFlags(log.LstdFlags | log.Llongfile)
// TestingConfig is the base Config shared by the tests in this file. Tests
// either pass it directly to NewClient or copy it and adjust fields first.
39 var TestingConfig = Config{
// Port 0 lets each client bind its own port on localhost.
40 ListenAddr: "localhost:0",
42 DisableTrackers: true,
43 NoDefaultBlocklist: true,
44 DisableMetainfoCache: true,
46 DHTConfig: dht.ServerConfig{
47 NoDefaultBootstrap: true,
// TestClientDefault checks that a client can be constructed from
// TestingConfig without error.
51 func TestClientDefault(t *testing.T) {
52 cl, err := NewClient(&TestingConfig)
53 require.NoError(t, err)
// TestAddDropTorrent adds the greeting test torrent to a fresh client through
// AddTorrentSpec and requires that the add succeeds.
57 func TestAddDropTorrent(t *testing.T) {
58 cl, err := NewClient(&TestingConfig)
59 require.NoError(t, err)
// The greeting fixture creates a temporary directory that must be removed.
61 dir, mi := testutil.GreetingTestTorrent()
62 defer os.RemoveAll(dir)
63 tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
64 require.NoError(t, err)
69 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
73 func TestAddTorrentNoUsableURLs(t *testing.T) {
77 func TestAddPeersToUnknownTorrent(t *testing.T) {
// TestPieceHashSize checks that pieceHash produces 20-byte digests.
// NOTE(review): 20 is presumably the SHA-1 digest length used for piece
// hashes — confirm against the pieceHash definition.
81 func TestPieceHashSize(t *testing.T) {
82 if pieceHash.Size() != 20 {
// TestTorrentInitialState builds a torrent directly with newTorrent, backs it
// with file storage over the greeting fixture directory, applies the metadata
// and then checks the initial piece and chunk bookkeeping.
87 func TestTorrentInitialState(t *testing.T) {
88 dir, mi := testutil.GreetingTestTorrent()
89 defer os.RemoveAll(dir)
// The info hash is produced by copying the fixture's hash into a fixed-size
// metainfo.InfoHash.
90 tor := newTorrent(func() (ih metainfo.InfoHash) {
91 missinggo.CopyExact(ih[:], mi.Info.Hash)
95 tor.storageOpener = storage.NewFile(dir)
96 // Needed to lock for asynchronous piece verification.
98 err := tor.setMetadata(&mi.Info.Info, mi.Info.Bytes)
99 require.NoError(t, err)
// The greeting torrent is expected to consist of exactly three pieces.
100 require.Len(t, tor.Pieces, 3)
// After pending every chunk of piece 0, three chunks must be outstanding.
101 tor.pendAllChunkSpecs(0)
102 assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
// Chunk index 2 of piece 0 is expected to be chunkSpec{4, 1}.
// NOTE(review): presumably {begin offset, length} — confirm against chunkSpec.
103 assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
// TestUnmarshalPEXMsg decodes a bencoded dictionary whose "added" key holds a
// 12-byte string and checks that it unmarshals into two entries, the first of
// which has port 0x506 (payload bytes 5 and 6).
// NOTE(review): this implies 6 bytes per peer (presumably 4 address bytes
// followed by a 2-byte big-endian port) — confirm against peerExchangeMessage.
106 func TestUnmarshalPEXMsg(t *testing.T) {
107 var m peerExchangeMessage
108 if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
111 if len(m.Added) != 2 {
114 if m.Added[0].Port != 0x506 {
// TestReducedDialTimeout is a table-driven test of reducedDialTimeout. Each
// row supplies the maximum timeout, the half-open limit and the number of
// pending peers, plus the reduced timeout expected before clamping.
119 func TestReducedDialTimeout(t *testing.T) {
120 for _, _case := range []struct {
124 ExpectedReduced time.Duration
// With a half-open limit of 40 the timeout stays at the nominal value below
// 40 pending peers, then is divided by 2 at 40, 3 at 80 and 101 at 4000.
126 {nominalDialTimeout, 40, 0, nominalDialTimeout},
127 {nominalDialTimeout, 40, 1, nominalDialTimeout},
128 {nominalDialTimeout, 40, 39, nominalDialTimeout},
129 {nominalDialTimeout, 40, 40, nominalDialTimeout / 2},
130 {nominalDialTimeout, 40, 80, nominalDialTimeout / 3},
131 {nominalDialTimeout, 40, 4000, nominalDialTimeout / 101},
133 reduced := reducedDialTimeout(_case.Max, _case.HalfOpenLimit, _case.PendingPeers)
// The expectation is raised to minDialTimeout when the table value is lower.
134 expected := _case.ExpectedReduced
135 if expected < minDialTimeout {
136 expected = minDialTimeout
138 if reduced != expected {
// NOTE(review): the message prints _case.ExpectedReduced, not the clamped
// `expected` that was actually compared; the two differ whenever the clamp
// above applied, which makes the failure output misleading.
139 t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
// TestUTPRawConn checks that plain UDP datagrams sent to a uTP socket's
// address can still be read from that socket with ReadFrom while a uTP
// connection is open on it. A plain UDP sender writes N numbered messages and
// a reader counts and checks what arrives.
144 func TestUTPRawConn(t *testing.T) {
145 l, err := utp.NewSocket("udp", "")
158 // Connect a UTP peer to see if the RawConn will still work.
// NOTE(review): the error from this second NewSocket call is discarded; a
// failure here would surface as a nil dereference on s.Dial.
159 s, _ := utp.NewSocket("udp", "")
161 utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
163 t.Fatalf("error dialing utp listener: %s", err)
165 defer utpPeer.Close()
// The plain (non-uTP) UDP socket used to send the raw datagrams.
166 peer, err := net.ListenPacket("udp", ":0")
173 // How many messages to send. I've set this to double the channel buffer
174 // size in the raw packetConn.
// readerStopped is closed when the reader has finished.
176 readerStopped := make(chan struct{})
177 // The reader goroutine.
179 defer close(readerStopped)
180 b := make([]byte, 500)
181 for i := 0; i < N; i++ {
182 n, _, err := l.ReadFrom(b)
// NOTE(review): if this loop runs in a separate goroutine, as the comment
// above says, t.Fatalf must not be called here — the testing package requires
// FailNow/Fatal to be called from the goroutine running the test. Confirm and
// switch to t.Errorf plus return if so.
184 t.Fatalf("error reading from raw conn: %s", err)
// Each datagram carries its sequence number as decimal text.
// NOTE(review): the Sscan error is ignored.
188 fmt.Sscan(string(b[:n]), &d)
190 log.Printf("got wrong number: expected %d, got %d", i, d)
// Resolve the uTP socket's UDP address as the destination for the raw writes.
194 udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
198 for i := 0; i < N; i++ {
199 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
// Brief pause between datagrams.
203 time.Sleep(time.Microsecond)
// Wait for the reader to finish, but give up after one second.
206 case <-readerStopped:
207 case <-time.After(time.Second):
208 t.Fatal("reader timed out")
210 if msgsReceived != N {
211 t.Fatalf("messages received: %d", msgsReceived)
// TestTwoClientsArbitraryPorts constructs two clients from TestingConfig in a
// loop. TestingConfig listens on "localhost:0", so each client binds its own
// port.
215 func TestTwoClientsArbitraryPorts(t *testing.T) {
216 for i := 0; i < 2; i++ {
217 cl, err := NewClient(&TestingConfig)
// TestAddDropManyTorrents adds 1000 torrent specs with distinct info hashes to
// a single client and asserts that every add succeeds.
225 func TestAddDropManyTorrents(t *testing.T) {
226 cl, err := NewClient(&TestingConfig)
227 require.NoError(t, err)
229 for i := range iter.N(1000) {
// Derive a unique info hash by varint-encoding the loop index into it.
231 binary.PutVarint(spec.InfoHash[:], int64(i))
232 tt, new, err := cl.AddTorrentSpec(&spec)
233 assert.NoError(t, err)
// TestClientTransferDefault runs the shared transfer scenario with client
// status export enabled.
239 func TestClientTransferDefault(t *testing.T) {
240 testClientTransfer(t, testClientTransferParams{
241 ExportClientStatus: true,
// TestClientTransferSmallCache runs the shared transfer scenario asking for a
// leecher storage capacity of 5, with client status export enabled.
// NOTE(review): the only visible use of SetLeecherStorageCapacity /
// LeecherStorageCapacity in testClientTransfer is inside commented-out code,
// so these settings may currently have no effect — confirm.
245 func TestClientTransferSmallCache(t *testing.T) {
246 testClientTransfer(t, testClientTransferParams{
247 SetLeecherStorageCapacity: true,
248 // Going below the piece length means it can't complete a piece so
249 // that it can be hashed.
250 LeecherStorageCapacity: 5,
252 // Can't readahead too far or the cache will thrash and drop data we
255 ExportClientStatus: true,
// TestClientTransferVarious runs the shared transfer scenario across a matrix
// of storage constructors, responsive on/off, and a range of readahead values
// that includes a negative value and zero.
259 func TestClientTransferVarious(t *testing.T) {
// Each entry builds a storage.I from a directory path.
260 for _, ss := range []func(string) storage.I{
264 for _, responsive := range []bool{false, true} {
265 testClientTransfer(t, testClientTransferParams{
266 Responsive: responsive,
269 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
270 testClientTransfer(t, testClientTransferParams{
272 Responsive: responsive,
274 Readahead: readahead,
// testClientTransferParams selects the variations exercised by
// testClientTransfer.
281 type testClientTransferParams struct {
// ExportClientStatus registers the seeder and leecher with
// testutil.ExportStatusWriter.
285 ExportClientStatus bool
286 SetLeecherStorageCapacity bool
287 LeecherStorageCapacity int64
// SeederStorage, when non-nil, builds the seeder's default storage from the
// greeting data directory.
288 SeederStorage func(string) storage.I
// testClientTransfer is the shared seeder-to-leecher scenario. A seeder is
// created over the greeting fixture data, a leecher is created with its own
// temporary file storage, the seeder's address is handed to the leecher as a
// peer, and the torrent content is then read twice through a Reader and
// compared with testutil.GreetingFileContents.
291 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
292 greetingTempDir, mi := testutil.GreetingTestTorrent()
293 defer os.RemoveAll(greetingTempDir)
// Optional caller-supplied storage for the seeder, rooted at the fixture data.
296 if ps.SeederStorage != nil {
297 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
299 cfg.DataDir = greetingTempDir
301 seeder, err := NewClient(&cfg)
302 require.NoError(t, err)
304 if ps.ExportClientStatus {
305 testutil.ExportStatusWriter(seeder, "s")
307 _, new, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
308 require.NoError(t, err)
// The leecher downloads into its own temporary directory.
310 leecherDataDir, err := ioutil.TempDir("", "")
311 require.NoError(t, err)
312 defer os.RemoveAll(leecherDataDir)
313 // cfg.TorrentDataOpener = func() TorrentDataOpener {
314 // fc, err := filecache.NewCache(leecherDataDir)
315 // require.NoError(t, err)
316 // if ps.SetLeecherStorageCapacity {
317 // fc.SetCapacity(ps.LeecherStorageCapacity)
319 // store := pieceStore.New(fileCacheDataBackend.New(fc))
320 // return func(mi *metainfo.Info) storage.I {
321 // return store.OpenTorrentData(mi)
324 leecher, err := NewClient(&cfg)
325 require.NoError(t, err)
326 defer leecher.Close()
327 if ps.ExportClientStatus {
328 testutil.ExportStatusWriter(leecher, "l")
// The leecher's spec is derived from the same metainfo but uses file storage
// in the leecher's own directory.
330 leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
331 ret = TorrentSpecFromMetaInfo(mi)
333 ret.Storage = storage.NewFile(leecherDataDir)
336 require.NoError(t, err)
// Point the leecher directly at the seeder's listen address.
338 leecherGreeting.AddPeers([]Peer{
340 IP: missinggo.AddrIP(seeder.ListenAddr()),
341 Port: missinggo.AddrPort(seeder.ListenAddr()),
344 r := leecherGreeting.NewReader()
350 r.SetReadahead(ps.Readahead)
// Read the whole torrent twice from offset 0; both passes must yield the
// greeting contents.
// NOTE(review): os.SEEK_SET is deprecated in favour of io.SeekStart.
352 for range iter.N(2) {
353 pos, err := r.Seek(0, os.SEEK_SET)
354 assert.NoError(t, err)
355 assert.EqualValues(t, 0, pos)
356 _greeting, err := ioutil.ReadAll(r)
357 assert.NoError(t, err)
358 assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
362 // Check that after completing leeching, a leecher transitions to seeding
363 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
//
// Three clients are created from the same cfg value, with DataDir repointed
// before each NewClient call: the seeder over the greeting fixture, and the
// two leechers over fresh temporary directories.
364 func TestSeedAfterDownloading(t *testing.T) {
365 greetingTempDir, mi := testutil.GreetingTestTorrent()
366 defer os.RemoveAll(greetingTempDir)
369 cfg.DataDir = greetingTempDir
370 seeder, err := NewClient(&cfg)
372 testutil.ExportStatusWriter(seeder, "s")
// NOTE(review): the results of this AddTorrentSpec call are discarded.
373 seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
374 cfg.DataDir, err = ioutil.TempDir("", "")
375 require.NoError(t, err)
376 defer os.RemoveAll(cfg.DataDir)
// NOTE(review): NewClient errors are discarded for both leechers; a failure
// would show up as a nil dereference on the deferred Close.
377 leecher, _ := NewClient(&cfg)
378 defer leecher.Close()
379 testutil.ExportStatusWriter(leecher, "l")
381 // cfg.TorrentDataOpener = nil
// NOTE(review): cfg.DataDir is reassigned here, and a deferred call's
// arguments are evaluated at the defer statement, so the two
// os.RemoveAll(cfg.DataDir) defers each remove their own directory.
382 cfg.DataDir, err = ioutil.TempDir("", "")
383 require.NoError(t, err)
384 defer os.RemoveAll(cfg.DataDir)
385 leecherLeecher, _ := NewClient(&cfg)
386 defer leecherLeecher.Close()
387 testutil.ExportStatusWriter(leecherLeecher, "ll")
388 leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
389 ret = TorrentSpecFromMetaInfo(mi)
393 llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
394 ret = TorrentSpecFromMetaInfo(mi)
398 // Simultaneously DownloadAll in Leecher, and read the contents
399 // consecutively in LeecherLeecher. This non-deterministically triggered a
400 // case where the leecher wouldn't unchoke the LeecherLeecher.
401 var wg sync.WaitGroup
// The content read back must match the greeting fixture.
407 b, err := ioutil.ReadAll(r)
408 require.NoError(t, err)
409 assert.EqualValues(t, testutil.GreetingFileContents, b)
// The middle leecher is told about both ends of the chain: the seeder and
// the second leecher.
411 leecherGreeting.AddPeers([]Peer{
413 IP: missinggo.AddrIP(seeder.ListenAddr()),
414 Port: missinggo.AddrPort(seeder.ListenAddr()),
417 IP: missinggo.AddrIP(leecherLeecher.ListenAddr()),
418 Port: missinggo.AddrPort(leecherLeecher.ListenAddr()),
424 leecherGreeting.DownloadAll()
// TestReadaheadPieces is a table-driven test of readaheadPieces. Each row
// gives a readahead size in bytes, a piece length in bytes and the expected
// number of readahead pieces.
430 func TestReadaheadPieces(t *testing.T) {
431 for _, case_ := range []struct {
432 readaheadBytes, pieceLength int64
// 5 MiB of readahead over 256 KiB pieces expects 19.
435 {5 * 1024 * 1024, 256 * 1024, 19},
436 {5 * 1024 * 1024, 5 * 1024 * 1024, 1},
437 {5*1024*1024 - 1, 5 * 1024 * 1024, 1},
438 {5 * 1024 * 1024, 5*1024*1024 - 1, 2},
// Zero readahead expects zero pieces.
439 {0, 5 * 1024 * 1024, 0},
440 {5 * 1024 * 1024, 1048576, 4},
442 pieces := readaheadPieces(case_.readaheadBytes, case_.pieceLength)
// The failing row is included in the assertion message.
443 assert.Equal(t, case_.readaheadPieces, pieces, "%v", case_)
// TestMergingTrackersByAddingSpecs adds the same spec twice, the second time
// with two tracker tiers set, and checks that the first torrent handle then
// reports both trackers.
447 func TestMergingTrackersByAddingSpecs(t *testing.T) {
448 cl, err := NewClient(&TestingConfig)
449 require.NoError(t, err)
451 spec := TorrentSpec{}
// NOTE(review): the error results of both AddTorrentSpec calls are discarded.
452 T, new, _ := cl.AddTorrentSpec(&spec)
456 spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
457 _, new, _ = cl.AddTorrentSpec(&spec)
// NOTE(review): testify's EqualValues takes (t, expected, actual); the
// arguments below are in the opposite order, which only affects the wording
// of a failure message.
461 assert.EqualValues(t, T.torrent.Trackers[0][0], "http://a")
462 assert.EqualValues(t, T.torrent.Trackers[1][0], "udp://b")
// badStorage is a storage test double whose pieces misbehave; see
// badStoragePiece. It carries no state.
465 type badStorage struct{}
467 func (me badStorage) OpenTorrent(*metainfo.InfoEx) (storage.Torrent, error) {
471 func (me badStorage) Close() error {
// Piece wraps the requested metainfo piece in a badStoragePiece.
475 func (me badStorage) Piece(p metainfo.Piece) storage.Piece {
476 return badStoragePiece{p}
// badStoragePiece is the piece handed out by badStorage.Piece.
479 type badStoragePiece struct {
483 func (me badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
487 func (me badStoragePiece) GetIsComplete() bool {
// MarkComplete always fails.
491 func (me badStoragePiece) MarkComplete() error {
492 return errors.New("psyyyyyyyche")
// randomlyTruncatedDataString returns a random-length prefix of
// "hello, world\n". rand.Intn(14) yields 0..13, so the result ranges from the
// empty string to the full 13-byte string.
495 func (me badStoragePiece) randomlyTruncatedDataString() string {
496 return "hello, world\n"[:rand.Intn(14)]
// ReadAt serves reads from a freshly truncated copy of the data on every
// call, addressed by the piece's offset plus off, so callers may see fewer
// bytes than the piece should contain.
499 func (me badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
500 r := strings.NewReader(me.randomlyTruncatedDataString())
501 return r.ReadAt(b, off+me.p.Offset())
504 // We read from a piece which is marked completed, but is missing data.
//
// The client uses badStorage as its default storage and is given a torrent
// with one 13-byte file named "greeting" and 20 bytes of piece hashes.
505 func TestCompletedPieceWrongSize(t *testing.T) {
507 cfg.DefaultStorage = badStorage{}
// NOTE(review): the NewClient error is discarded.
508 cl, _ := NewClient(&cfg)
510 tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
511 Info: &metainfo.InfoEx{
// 20 zero bytes of piece hash data.
514 Pieces: make([]byte, 20),
515 Files: []metainfo.FileInfo{
516 metainfo.FileInfo{Path: []string{"greeting"}, Length: 13},
521 require.NoError(t, err)
// Reading the whole torrent must not report an error.
526 b, err := ioutil.ReadAll(r)
528 assert.NoError(t, err)
// BenchmarkAddLargeTorrent measures AddTorrentFromFile on
// testdata/bootstrap.dat.torrent, repeated b.N times against one client that
// has both TCP and uTP disabled.
531 func BenchmarkAddLargeTorrent(b *testing.B) {
533 cfg.DisableTCP = true
534 cfg.DisableUTP = true
// NOTE(review): "redonk" is not a host:port address; presumably it is never
// used because both transports are disabled — confirm.
535 cfg.ListenAddr = "redonk"
// NOTE(review): the NewClient error is discarded.
536 cl, _ := NewClient(&cfg)
538 for range iter.N(b.N) {
539 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
// TestResponsive checks reads through a responsive reader with readahead
// disabled: after seeking to offset 3 it must read "lo", and after seeking to
// offset 11 it must read the two bytes "d\n".
547 func TestResponsive(t *testing.T) {
548 seederDataDir, mi := testutil.GreetingTestTorrent()
549 defer os.RemoveAll(seederDataDir)
552 cfg.DataDir = seederDataDir
553 seeder, err := NewClient(&cfg)
// NOTE(review): the results of this AddTorrentSpec call are discarded.
556 seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
557 leecherDataDir, err := ioutil.TempDir("", "")
559 defer os.RemoveAll(leecherDataDir)
// The same cfg value is repointed at the leecher's directory before the
// second client is created.
561 cfg.DataDir = leecherDataDir
562 leecher, err := NewClient(&cfg)
564 defer leecher.Close()
565 leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
566 ret = TorrentSpecFromMetaInfo(mi)
// Point the leecher directly at the seeder's listen address.
570 leecherTorrent.AddPeers([]Peer{
572 IP: missinggo.AddrIP(seeder.ListenAddr()),
573 Port: missinggo.AddrPort(seeder.ListenAddr()),
576 reader := leecherTorrent.NewReader()
578 reader.SetReadahead(0)
579 reader.SetResponsive()
// NOTE(review): os.SEEK_SET is deprecated in favour of io.SeekStart.
581 _, err = reader.Seek(3, os.SEEK_SET)
582 require.NoError(t, err)
// ReadFull fills b completely; b is presumably 2 bytes long, matching the
// expected "lo" — confirm at its declaration.
583 _, err = io.ReadFull(reader, b)
585 assert.EqualValues(t, "lo", string(b))
586 _, err = reader.Seek(11, os.SEEK_SET)
587 require.NoError(t, err)
588 n, err := io.ReadFull(reader, b)
590 assert.EqualValues(t, 2, n)
591 assert.EqualValues(t, "d\n", string(b))
// TestTorrentDroppedDuringResponsiveRead reads "lo" at offset 3 through a
// responsive reader, then drops the torrent concurrently and expects the next
// read to return zero bytes and the error "torrent closed".
594 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
595 seederDataDir, mi := testutil.GreetingTestTorrent()
596 defer os.RemoveAll(seederDataDir)
599 cfg.DataDir = seederDataDir
600 seeder, err := NewClient(&cfg)
// NOTE(review): the results of this AddTorrentSpec call are discarded.
603 seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
604 leecherDataDir, err := ioutil.TempDir("", "")
606 defer os.RemoveAll(leecherDataDir)
// The same cfg value is repointed at the leecher's directory before the
// second client is created.
608 cfg.DataDir = leecherDataDir
609 leecher, err := NewClient(&cfg)
611 defer leecher.Close()
612 leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
613 ret = TorrentSpecFromMetaInfo(mi)
// Point the leecher directly at the seeder's listen address.
617 leecherTorrent.AddPeers([]Peer{
619 IP: missinggo.AddrIP(seeder.ListenAddr()),
620 Port: missinggo.AddrPort(seeder.ListenAddr()),
623 reader := leecherTorrent.NewReader()
625 reader.SetReadahead(0)
626 reader.SetResponsive()
// NOTE(review): os.SEEK_SET is deprecated in favour of io.SeekStart.
628 _, err = reader.Seek(3, os.SEEK_SET)
629 require.NoError(t, err)
630 _, err = io.ReadFull(reader, b)
632 assert.EqualValues(t, "lo", string(b))
// Drop runs in its own goroutine while this goroutine seeks and reads.
// NOTE(review): nothing visible orders the Drop before the Read below, so the
// assertions depend on the read not completing with data first — confirm this
// cannot flake.
633 go leecherTorrent.Drop()
634 _, err = reader.Seek(11, os.SEEK_SET)
635 require.NoError(t, err)
636 n, err := reader.Read(b)
637 assert.EqualError(t, err, "torrent closed")
638 assert.EqualValues(t, 0, n)
// TestDHTInheritBlocklist checks that the IP blocklist set on the client
// config is the one reported by the client's DHT server.
641 func TestDHTInheritBlocklist(t *testing.T) {
// An empty, non-nil blocklist is enough to compare against.
642 ipl := iplist.New(nil)
643 require.NotNil(t, ipl)
645 cfg.IPBlocklist = ipl
647 cl, err := NewClient(&cfg)
648 require.NoError(t, err)
650 require.Equal(t, ipl, cl.DHT().IPBlocklist())
653 // Check that stuff is merged in subsequent AddTorrentSpec for the same
//
// The torrent is first added with only its info hash, at which point it has
// no Info. Adding the full metainfo for the same hash must not create a new
// torrent, and the original handle must then report a non-nil Info.
655 func TestAddTorrentSpecMerging(t *testing.T) {
656 cl, err := NewClient(&TestingConfig)
657 require.NoError(t, err)
659 dir, mi := testutil.GreetingTestTorrent()
660 defer os.RemoveAll(dir)
// ts carries nothing but the info hash copied from the fixture.
662 missinggo.CopyExact(&ts.InfoHash, mi.Info.Hash)
663 tt, new, err := cl.AddTorrentSpec(&ts)
664 require.NoError(t, err)
666 require.Nil(t, tt.Info())
667 _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
668 require.NoError(t, err)
669 require.False(t, new)
670 require.NotNil(t, tt.Info())
673 // Check that torrent Info is obtained from the metainfo file cache.
//
// With the metainfo cache enabled and a temporary ConfigDir, adding the full
// metainfo must leave "<ConfigDir>/torrents/<hex infohash>.torrent" on disk.
// Adding a spec that holds only the info hash must then yield a torrent that
// already has its Info.
674 func TestAddTorrentMetainfoInCache(t *testing.T) {
676 cfg.DisableMetainfoCache = false
// NOTE(review): the TempDir error is discarded.
677 cfg.ConfigDir, _ = ioutil.TempDir(os.TempDir(), "")
678 defer os.RemoveAll(cfg.ConfigDir)
679 cl, err := NewClient(&cfg)
680 require.NoError(t, err)
682 dir, mi := testutil.GreetingTestTorrent()
683 defer os.RemoveAll(dir)
684 tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
685 require.NoError(t, err)
687 require.NotNil(t, tt.Info())
// The cache file must exist after the first add.
688 _, err = os.Stat(filepath.Join(cfg.ConfigDir, "torrents", fmt.Sprintf("%x.torrent", mi.Info.Hash.Bytes())))
689 require.NoError(t, err)
690 // Contains only the infohash.
692 missinggo.CopyExact(&ts.InfoHash, mi.Info.Hash)
693 _, ok := cl.Torrent(ts.InfoHash)
696 _, ok = cl.Torrent(ts.InfoHash)
698 tt, new, err = cl.AddTorrentSpec(&ts)
699 require.NoError(t, err)
701 // Obtained from the metainfo cache.
702 require.NotNil(t, tt.Info())
// TestTorrentDroppedBeforeGotInfo adds a torrent by info hash only and
// asserts that the client ends up with no torrents.
705 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
706 dir, mi := testutil.GreetingTestTorrent()
// NOTE(review): errors from NewClient and AddTorrentSpec are discarded.
708 cl, _ := NewClient(&TestingConfig)
// CopyExact is the dot-imported missinggo.CopyExact.
711 CopyExact(&ts.InfoHash, mi.Info.Hash)
712 tt, _, _ := cl.AddTorrentSpec(&ts)
714 assert.EqualValues(t, 0, len(cl.Torrents()))
722 // func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool) {
723 // fileCacheDir, err := ioutil.TempDir("", "")
724 // require.NoError(t, err)
725 // defer os.RemoveAll(fileCacheDir)
726 // fileCache, err := filecache.NewCache(fileCacheDir)
727 // require.NoError(t, err)
728 // greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
729 // defer os.RemoveAll(greetingDataTempDir)
730 // filePieceStore := pieceStore.New(fileCacheDataBackend.New(fileCache))
731 // greetingData := filePieceStore.OpenTorrentData(&greetingMetainfo.Info.Info)
732 // written, err := greetingData.WriteAt([]byte(testutil.GreetingFileContents), 0)
733 // require.Equal(t, len(testutil.GreetingFileContents), written)
734 // require.NoError(t, err)
735 // for i := 0; i < greetingMetainfo.Info.NumPieces(); i++ {
736 // // p := greetingMetainfo.Info.Piece(i)
737 // if alreadyCompleted {
738 // err := greetingData.PieceCompleted(i)
739 // assert.NoError(t, err)
742 // cfg := TestingConfig
743 // // TODO: Disable network option?
744 // cfg.DisableTCP = true
745 // cfg.DisableUTP = true
746 // // cfg.DefaultStorage = filePieceStore
747 // cl, err := NewClient(&cfg)
748 // require.NoError(t, err)
750 // tt, err := cl.AddTorrent(greetingMetainfo)
751 // require.NoError(t, err)
752 // psrs := tt.PieceStateRuns()
753 // assert.Len(t, psrs, 1)
754 // assert.EqualValues(t, 3, psrs[0].Length)
755 // assert.Equal(t, alreadyCompleted, psrs[0].Complete)
756 // if alreadyCompleted {
757 // r := tt.NewReader()
758 // b, err := ioutil.ReadAll(r)
759 // assert.NoError(t, err)
760 // assert.EqualValues(t, testutil.GreetingFileContents, b)
764 // func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
765 // testAddTorrentPriorPieceCompletion(t, true)
768 // func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
769 // testAddTorrentPriorPieceCompletion(t, false)
// TestAddMetainfoWithNodes loads metainfo/testdata/issue_65a.torrent into a
// client whose DHT starts with no nodes, then expects the torrent to have 5
// tracker tiers and the DHT to hold 6 nodes.
772 func TestAddMetainfoWithNodes(t *testing.T) {
775 // For now, we want to just jam the nodes into the table, without
776 // verifying them first. Also the DHT code doesn't support mixing secure
777 // and insecure nodes if security is enabled (yet).
778 cfg.DHTConfig.NoSecurity = true
779 cl, err := NewClient(&cfg)
780 require.NoError(t, err)
// NOTE(review): testify's EqualValues takes (t, expected, actual); the
// arguments here are in the opposite order to the assertion on NumNodes
// further down.
782 assert.EqualValues(t, cl.DHT().NumNodes(), 0)
783 tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
784 require.NoError(t, err)
785 assert.Len(t, tt.torrent.Trackers, 5)
786 assert.EqualValues(t, 6, cl.DHT().NumNodes())
// testDownloadCancelParams selects the variations exercised by
// testDownloadCancel.
789 type testDownloadCancelParams struct {
// ExportClientStatus registers the seeder and leecher with
// testutil.ExportStatusWriter.
793 ExportClientStatus bool
// NOTE(review): the only visible references to the two capacity fields in
// testDownloadCancel are inside commented-out code.
794 SetLeecherStorageCapacity bool
795 LeecherStorageCapacity int64
// testDownloadCancel is the shared scenario behind the DownloadAll tests. A
// seeder serves the greeting torrent, and a leecher subscribes to piece state
// changes, requests all pieces (and may cancel them) before it learns about
// the seeder. The latest Complete flag seen for each piece index is recorded
// and compared with an all-false or all-true map.
799 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
800 greetingTempDir, mi := testutil.GreetingTestTorrent()
801 defer os.RemoveAll(greetingTempDir)
804 cfg.DataDir = greetingTempDir
805 seeder, err := NewClient(&cfg)
806 require.NoError(t, err)
808 if ps.ExportClientStatus {
809 testutil.ExportStatusWriter(seeder, "s")
// NOTE(review): the results of this AddTorrentSpec call are discarded.
811 seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
812 leecherDataDir, err := ioutil.TempDir("", "")
813 require.NoError(t, err)
814 defer os.RemoveAll(leecherDataDir)
815 // cfg.TorrentDataOpener = func() TorrentDataOpener {
816 // fc, err := filecache.NewCache(leecherDataDir)
817 // require.NoError(t, err)
818 // if ps.SetLeecherStorageCapacity {
819 // fc.SetCapacity(ps.LeecherStorageCapacity)
821 // store := pieceStore.New(fileCacheDataBackend.New(fc))
822 // return func(mi *metainfo.Info) storage.I {
823 // return store.OpenTorrentData(mi)
// The same cfg value is repointed at the leecher's directory.
826 cfg.DataDir = leecherDataDir
// NOTE(review): the NewClient error is discarded for the leecher.
827 leecher, _ := NewClient(&cfg)
828 defer leecher.Close()
829 if ps.ExportClientStatus {
830 testutil.ExportStatusWriter(leecher, "l")
832 leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
833 ret = TorrentSpecFromMetaInfo(mi)
837 require.NoError(t, err)
// Subscribe before requesting pieces so the state changes are delivered on
// psc.Values.
839 psc := leecherGreeting.SubscribePieceStateChanges()
841 leecherGreeting.DownloadAll()
// Cancels the full piece range [0, NumPieces).
// NOTE(review): presumably guarded by a cancel flag in ps — confirm.
843 leecherGreeting.CancelPieces(0, leecherGreeting.NumPieces())
// Only now is the leecher given the seeder's address.
845 leecherGreeting.AddPeers([]Peer{
847 IP: missinggo.AddrIP(seeder.ListenAddr()),
848 Port: missinggo.AddrPort(seeder.ListenAddr()),
851 completes := make(map[int]bool, 3)
854 // started := time.Now()
// Record the latest Complete flag per piece; the 100ms timer fires once no
// change arrives within that window.
856 case _v := <-psc.Values:
857 // log.Print(time.Since(started))
858 v := _v.(PieceStateChange)
859 completes[v.Index] = v.Complete
860 case <-time.After(100 * time.Millisecond):
// Expected outcomes: no piece completed, or all three completed.
// NOTE(review): presumably the cancelled and non-cancelled branches
// respectively — confirm.
865 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
867 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
// TestTorrentDownloadAll runs testDownloadCancel with zero-valued parameters.
872 func TestTorrentDownloadAll(t *testing.T) {
873 testDownloadCancel(t, testDownloadCancelParams{})
// TestTorrentDownloadAllThenCancel runs testDownloadCancel with non-default
// parameters.
// NOTE(review): presumably with cancellation enabled, going by the test
// name — confirm against the parameter literal.
876 func TestTorrentDownloadAllThenCancel(t *testing.T) {
877 testDownloadCancel(t, testDownloadCancelParams{
882 // Ensure that it's an error for a peer to send an invalid have message.
//
// The torrent has 20 bytes of piece hashes and a single 1-byte file. A have
// for index 0 must be accepted and a have for index 1 must be rejected.
// NOTE(review): 20 hash bytes presumably means exactly one piece, which would
// make index 1 out of range — confirm.
883 func TestPeerInvalidHave(t *testing.T) {
884 cl, err := NewClient(&TestingConfig)
885 require.NoError(t, err)
887 tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
888 Info: &metainfo.InfoEx{
891 Pieces: make([]byte, 20),
892 Files: []metainfo.FileInfo{{Length: 1}},
896 require.NoError(t, err)
902 assert.NoError(t, cn.peerSentHave(0))
903 assert.Error(t, cn.peerSentHave(1))
// TestPieceCompletedInStorageButNotClient creates a client and adds the
// greeting torrent to it through a spec built from the fixture's Info.
906 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
907 greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
908 defer os.RemoveAll(greetingTempDir)
910 cfg.DataDir = greetingTempDir
// NOTE(review): cfg.DataDir is set on the line above, but &TestingConfig is
// what gets passed to NewClient, so the client does not see the greeting data
// directory. This looks like it should be NewClient(&cfg) — confirm.
911 seeder, err := NewClient(&TestingConfig)
912 require.NoError(t, err)
913 seeder.AddTorrentSpec(&TorrentSpec{
914 Info: &greetingMetainfo.Info,