]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
b61d6b1bd91fc4656ed994cc2728c716be45e94c
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "log"
9         "net"
10         "os"
11         "path/filepath"
12         "sync"
13         "testing"
14         "time"
15
16         _ "github.com/anacrolix/envpprof"
17         "github.com/anacrolix/missinggo"
18         . "github.com/anacrolix/missinggo"
19         "github.com/anacrolix/missinggo/filecache"
20         "github.com/anacrolix/utp"
21         "github.com/bradfitz/iter"
22         "github.com/stretchr/testify/assert"
23         "github.com/stretchr/testify/require"
24
25         "github.com/anacrolix/torrent/bencode"
26         "github.com/anacrolix/torrent/data/pieceStore"
27         "github.com/anacrolix/torrent/data/pieceStore/dataBackend/fileCache"
28         "github.com/anacrolix/torrent/dht"
29         "github.com/anacrolix/torrent/internal/testutil"
30         "github.com/anacrolix/torrent/iplist"
31         "github.com/anacrolix/torrent/metainfo"
32 )
33
34 func init() {
35         log.SetFlags(log.LstdFlags | log.Llongfile)
36 }
37
38 var TestingConfig = Config{
39         ListenAddr:           "localhost:0",
40         NoDHT:                true,
41         DisableTrackers:      true,
42         NoDefaultBlocklist:   true,
43         DisableMetainfoCache: true,
44         DataDir:              filepath.Join(os.TempDir(), "anacrolix"),
45 }
46
47 func TestClientDefault(t *testing.T) {
48         cl, err := NewClient(&TestingConfig)
49         if err != nil {
50                 t.Fatal(err)
51         }
52         cl.Close()
53 }
54
55 func TestAddDropTorrent(t *testing.T) {
56         cl, err := NewClient(&TestingConfig)
57         if err != nil {
58                 t.Fatal(err)
59         }
60         defer cl.Close()
61         dir, mi := testutil.GreetingTestTorrent()
62         defer os.RemoveAll(dir)
63         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
64         if err != nil {
65                 t.Fatal(err)
66         }
67         if !new {
68                 t.FailNow()
69         }
70         tt.Drop()
71 }
72
73 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
74         t.SkipNow()
75 }
76
77 func TestAddTorrentNoUsableURLs(t *testing.T) {
78         t.SkipNow()
79 }
80
81 func TestAddPeersToUnknownTorrent(t *testing.T) {
82         t.SkipNow()
83 }
84
85 func TestPieceHashSize(t *testing.T) {
86         if pieceHash.Size() != 20 {
87                 t.FailNow()
88         }
89 }
90
91 func TestTorrentInitialState(t *testing.T) {
92         dir, mi := testutil.GreetingTestTorrent()
93         defer os.RemoveAll(dir)
94         tor, err := newTorrent(func() (ih InfoHash) {
95                 missinggo.CopyExact(ih[:], mi.Info.Hash)
96                 return
97         }())
98         if err != nil {
99                 t.Fatal(err)
100         }
101         tor.chunkSize = 2
102         err = tor.setMetadata(&mi.Info.Info, mi.Info.Bytes)
103         if err != nil {
104                 t.Fatal(err)
105         }
106         if len(tor.Pieces) != 3 {
107                 t.Fatal("wrong number of pieces")
108         }
109         p := &tor.Pieces[0]
110         tor.pendAllChunkSpecs(0)
111         assert.EqualValues(t, 3, p.numPendingChunks())
112         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
113 }
114
115 func TestUnmarshalPEXMsg(t *testing.T) {
116         var m peerExchangeMessage
117         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
118                 t.Fatal(err)
119         }
120         if len(m.Added) != 2 {
121                 t.FailNow()
122         }
123         if m.Added[0].Port != 0x506 {
124                 t.FailNow()
125         }
126 }
127
128 func TestReducedDialTimeout(t *testing.T) {
129         for _, _case := range []struct {
130                 Max             time.Duration
131                 HalfOpenLimit   int
132                 PendingPeers    int
133                 ExpectedReduced time.Duration
134         }{
135                 {nominalDialTimeout, 40, 0, nominalDialTimeout},
136                 {nominalDialTimeout, 40, 1, nominalDialTimeout},
137                 {nominalDialTimeout, 40, 39, nominalDialTimeout},
138                 {nominalDialTimeout, 40, 40, nominalDialTimeout / 2},
139                 {nominalDialTimeout, 40, 80, nominalDialTimeout / 3},
140                 {nominalDialTimeout, 40, 4000, nominalDialTimeout / 101},
141         } {
142                 reduced := reducedDialTimeout(_case.Max, _case.HalfOpenLimit, _case.PendingPeers)
143                 expected := _case.ExpectedReduced
144                 if expected < minDialTimeout {
145                         expected = minDialTimeout
146                 }
147                 if reduced != expected {
148                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
149                 }
150         }
151 }
152
153 func TestUTPRawConn(t *testing.T) {
154         l, err := utp.NewSocket("udp", "")
155         if err != nil {
156                 t.Fatal(err)
157         }
158         defer l.Close()
159         go func() {
160                 for {
161                         _, err := l.Accept()
162                         if err != nil {
163                                 break
164                         }
165                 }
166         }()
167         // Connect a UTP peer to see if the RawConn will still work.
168         s, _ := utp.NewSocket("udp", "")
169         defer s.Close()
170         utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
171         if err != nil {
172                 t.Fatalf("error dialing utp listener: %s", err)
173         }
174         defer utpPeer.Close()
175         peer, err := net.ListenPacket("udp", ":0")
176         if err != nil {
177                 t.Fatal(err)
178         }
179         defer peer.Close()
180
181         msgsReceived := 0
182         // How many messages to send. I've set this to double the channel buffer
183         // size in the raw packetConn.
184         const N = 200
185         readerStopped := make(chan struct{})
186         // The reader goroutine.
187         go func() {
188                 defer close(readerStopped)
189                 b := make([]byte, 500)
190                 for i := 0; i < N; i++ {
191                         n, _, err := l.ReadFrom(b)
192                         if err != nil {
193                                 t.Fatalf("error reading from raw conn: %s", err)
194                         }
195                         msgsReceived++
196                         var d int
197                         fmt.Sscan(string(b[:n]), &d)
198                         if d != i {
199                                 log.Printf("got wrong number: expected %d, got %d", i, d)
200                         }
201                 }
202         }()
203         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
204         if err != nil {
205                 t.Fatal(err)
206         }
207         for i := 0; i < N; i++ {
208                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
209                 if err != nil {
210                         t.Fatal(err)
211                 }
212                 time.Sleep(time.Microsecond)
213         }
214         select {
215         case <-readerStopped:
216         case <-time.After(time.Second):
217                 t.Fatal("reader timed out")
218         }
219         if msgsReceived != N {
220                 t.Fatalf("messages received: %d", msgsReceived)
221         }
222 }
223
224 func TestTwoClientsArbitraryPorts(t *testing.T) {
225         for i := 0; i < 2; i++ {
226                 cl, err := NewClient(&TestingConfig)
227                 if err != nil {
228                         t.Fatal(err)
229                 }
230                 defer cl.Close()
231         }
232 }
233
234 func TestAddDropManyTorrents(t *testing.T) {
235         cl, _ := NewClient(&TestingConfig)
236         defer cl.Close()
237         for i := range iter.N(1000) {
238                 var spec TorrentSpec
239                 binary.PutVarint(spec.InfoHash[:], int64(i))
240                 tt, new, err := cl.AddTorrentSpec(&spec)
241                 if err != nil {
242                         t.Error(err)
243                 }
244                 if !new {
245                         t.FailNow()
246                 }
247                 defer tt.Drop()
248         }
249 }
250
251 func TestClientTransfer(t *testing.T) {
252         greetingTempDir, mi := testutil.GreetingTestTorrent()
253         defer os.RemoveAll(greetingTempDir)
254         cfg := TestingConfig
255         cfg.Seed = true
256         cfg.DataDir = greetingTempDir
257         seeder, err := NewClient(&cfg)
258         if err != nil {
259                 t.Fatal(err)
260         }
261         defer seeder.Close()
262         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
263         leecherDataDir, err := ioutil.TempDir("", "")
264         if err != nil {
265                 t.Fatal(err)
266         }
267         defer os.RemoveAll(leecherDataDir)
268         // cfg.TorrentDataOpener = func(info *metainfo.Info) (data.Data, error) {
269         //      return blob.TorrentData(info, leecherDataDir), nil
270         // }
271         // blobStore := blob.NewStore(leecherDataDir)
272         // cfg.TorrentDataOpener = func(info *metainfo.Info) Data {
273         //      return blobStore.OpenTorrent(info)
274         // }
275         cfg.TorrentDataOpener = func() TorrentDataOpener {
276                 fc, err := filecache.NewCache(leecherDataDir)
277                 require.NoError(t, err)
278                 store := pieceStore.New(fileCacheDataBackend.New(fc))
279                 return func(mi *metainfo.Info) Data {
280                         return store.OpenTorrentData(mi)
281                 }
282         }()
283         leecher, _ := NewClient(&cfg)
284         defer leecher.Close()
285         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
286                 ret = TorrentSpecFromMetaInfo(mi)
287                 ret.ChunkSize = 2
288                 return
289         }())
290         // TODO: The piece state publishing is kinda jammed in here until I have a
291         // more thorough test.
292         go func() {
293                 s := leecherGreeting.pieceStateChanges.Subscribe()
294                 defer s.Close()
295                 for i := range s.Values {
296                         log.Print(i)
297                 }
298                 log.Print("finished")
299         }()
300         leecherGreeting.AddPeers([]Peer{
301                 Peer{
302                         IP:   missinggo.AddrIP(seeder.ListenAddr()),
303                         Port: missinggo.AddrPort(seeder.ListenAddr()),
304                 },
305         })
306         r := leecherGreeting.NewReader()
307         defer r.Close()
308         _greeting, err := ioutil.ReadAll(r)
309         if err != nil {
310                 t.Fatalf("%q %s", string(_greeting), err)
311         }
312         greeting := string(_greeting)
313         if greeting != testutil.GreetingFileContents {
314                 t.Fatal(":(")
315         }
316 }
317
318 // Check that after completing leeching, a leecher transitions to a seeding
319 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
320 func TestSeedAfterDownloading(t *testing.T) {
321         greetingTempDir, mi := testutil.GreetingTestTorrent()
322         defer os.RemoveAll(greetingTempDir)
323         cfg := TestingConfig
324         cfg.Seed = true
325         cfg.DataDir = greetingTempDir
326         seeder, err := NewClient(&cfg)
327         defer seeder.Close()
328         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
329         cfg.DataDir, err = ioutil.TempDir("", "")
330         require.NoError(t, err)
331         defer os.RemoveAll(cfg.DataDir)
332         leecher, _ := NewClient(&cfg)
333         defer leecher.Close()
334         cfg.Seed = false
335         cfg.TorrentDataOpener = nil
336         cfg.DataDir, err = ioutil.TempDir("", "")
337         require.NoError(t, err)
338         defer os.RemoveAll(cfg.DataDir)
339         leecherLeecher, _ := NewClient(&cfg)
340         defer leecherLeecher.Close()
341         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
342                 ret = TorrentSpecFromMetaInfo(mi)
343                 ret.ChunkSize = 2
344                 return
345         }())
346         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
347                 ret = TorrentSpecFromMetaInfo(mi)
348                 ret.ChunkSize = 3
349                 return
350         }())
351         // Simultaneously DownloadAll in Leecher, and read the contents
352         // consecutively in LeecherLeecher. This non-deterministically triggered a
353         // case where the leecher wouldn't unchoke the LeecherLeecher.
354         var wg sync.WaitGroup
355         wg.Add(1)
356         go func() {
357                 defer wg.Done()
358                 r := llg.NewReader()
359                 defer r.Close()
360                 b, err := ioutil.ReadAll(r)
361                 require.NoError(t, err)
362                 require.EqualValues(t, testutil.GreetingFileContents, b)
363         }()
364         leecherGreeting.AddPeers([]Peer{
365                 Peer{
366                         IP:   missinggo.AddrIP(seeder.ListenAddr()),
367                         Port: missinggo.AddrPort(seeder.ListenAddr()),
368                 },
369                 Peer{
370                         IP:   missinggo.AddrIP(leecherLeecher.ListenAddr()),
371                         Port: missinggo.AddrPort(leecherLeecher.ListenAddr()),
372                 },
373         })
374         wg.Add(1)
375         go func() {
376                 defer wg.Done()
377                 leecherGreeting.DownloadAll()
378                 leecher.WaitAll()
379         }()
380         wg.Wait()
381 }
382
383 func TestReadaheadPieces(t *testing.T) {
384         for _, case_ := range []struct {
385                 readaheadBytes, pieceLength int64
386                 readaheadPieces             int
387         }{
388                 {5 * 1024 * 1024, 256 * 1024, 19},
389                 {5 * 1024 * 1024, 5 * 1024 * 1024, 1},
390                 {5*1024*1024 - 1, 5 * 1024 * 1024, 1},
391                 {5 * 1024 * 1024, 5*1024*1024 - 1, 2},
392                 {0, 5 * 1024 * 1024, 0},
393                 {5 * 1024 * 1024, 1048576, 4},
394         } {
395                 pieces := readaheadPieces(case_.readaheadBytes, case_.pieceLength)
396                 assert.Equal(t, case_.readaheadPieces, pieces, "%v", case_)
397         }
398 }
399
400 func TestMergingTrackersByAddingSpecs(t *testing.T) {
401         cl, _ := NewClient(&TestingConfig)
402         defer cl.Close()
403         spec := TorrentSpec{}
404         T, new, _ := cl.AddTorrentSpec(&spec)
405         if !new {
406                 t.FailNow()
407         }
408         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
409         _, new, _ = cl.AddTorrentSpec(&spec)
410         if new {
411                 t.FailNow()
412         }
413         assert.EqualValues(t, T.Trackers[0][0].URL(), "http://a")
414         assert.EqualValues(t, T.Trackers[1][0].URL(), "udp://b")
415 }
416
417 type badData struct{}
418
419 func (me badData) Close() {}
420
421 func (me badData) WriteAt(b []byte, off int64) (int, error) {
422         return 0, nil
423 }
424
425 func (me badData) WriteSectionTo(w io.Writer, off, n int64) (int64, error) {
426         return 0, nil
427 }
428
429 func (me badData) PieceComplete(piece int) bool {
430         return true
431 }
432
433 func (me badData) PieceCompleted(piece int) error {
434         return nil
435 }
436
437 func (me badData) ReadAt(b []byte, off int64) (n int, err error) {
438         if off >= 5 {
439                 err = io.EOF
440                 return
441         }
442         n = copy(b, []byte("hello")[off:])
443         return
444 }
445
446 // We read from a piece which is marked completed, but is missing data.
447 func TestCompletedPieceWrongSize(t *testing.T) {
448         cfg := TestingConfig
449         cfg.TorrentDataOpener = func(*metainfo.Info) Data {
450                 return badData{}
451         }
452         cl, _ := NewClient(&cfg)
453         defer cl.Close()
454         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
455                 Info: &metainfo.InfoEx{
456                         Info: metainfo.Info{
457                                 PieceLength: 15,
458                                 Pieces:      make([]byte, 20),
459                                 Files: []metainfo.FileInfo{
460                                         metainfo.FileInfo{Path: []string{"greeting"}, Length: 13},
461                                 },
462                         },
463                 },
464         })
465         if err != nil {
466                 t.Fatal(err)
467         }
468         if !new {
469                 t.Fatal("expected new")
470         }
471         r := tt.NewReader()
472         defer r.Close()
473         b := make([]byte, 20)
474         n, err := io.ReadFull(r, b)
475         if n != 5 || err != io.ErrUnexpectedEOF {
476                 t.Fatal(n, err)
477         }
478         defer tt.Drop()
479 }
480
481 func BenchmarkAddLargeTorrent(b *testing.B) {
482         cfg := TestingConfig
483         cfg.DisableTCP = true
484         cfg.DisableUTP = true
485         cfg.ListenAddr = "redonk"
486         cl, _ := NewClient(&cfg)
487         defer cl.Close()
488         for range iter.N(b.N) {
489                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
490                 if err != nil {
491                         b.Fatal(err)
492                 }
493                 t.Drop()
494         }
495 }
496
497 func TestResponsive(t *testing.T) {
498         seederDataDir, mi := testutil.GreetingTestTorrent()
499         defer os.RemoveAll(seederDataDir)
500         cfg := TestingConfig
501         cfg.Seed = true
502         cfg.DataDir = seederDataDir
503         seeder, err := NewClient(&cfg)
504         require.Nil(t, err)
505         defer seeder.Close()
506         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
507         leecherDataDir, err := ioutil.TempDir("", "")
508         require.Nil(t, err)
509         defer os.RemoveAll(leecherDataDir)
510         cfg = TestingConfig
511         cfg.DataDir = leecherDataDir
512         leecher, err := NewClient(&cfg)
513         require.Nil(t, err)
514         defer leecher.Close()
515         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
516                 ret = TorrentSpecFromMetaInfo(mi)
517                 ret.ChunkSize = 2
518                 return
519         }())
520         leecherTorrent.AddPeers([]Peer{
521                 Peer{
522                         IP:   missinggo.AddrIP(seeder.ListenAddr()),
523                         Port: missinggo.AddrPort(seeder.ListenAddr()),
524                 },
525         })
526         reader := leecherTorrent.NewReader()
527         reader.SetReadahead(0)
528         reader.SetResponsive()
529         b := make([]byte, 2)
530         _, err = reader.ReadAt(b, 3)
531         assert.Nil(t, err)
532         assert.EqualValues(t, "lo", string(b))
533         n, err := reader.ReadAt(b, 11)
534         assert.Nil(t, err)
535         assert.EqualValues(t, 2, n)
536         assert.EqualValues(t, "d\n", string(b))
537 }
538
539 func TestDHTInheritBlocklist(t *testing.T) {
540         ipl := iplist.New(nil)
541         require.NotNil(t, ipl)
542         cl, err := NewClient(&Config{
543                 IPBlocklist: iplist.New(nil),
544                 DHTConfig:   &dht.ServerConfig{},
545         })
546         require.NoError(t, err)
547         defer cl.Close()
548         require.Equal(t, ipl, cl.DHT().IPBlocklist())
549 }
550
551 // Check that stuff is merged in subsequent AddTorrentSpec for the same
552 // infohash.
553 func TestAddTorrentSpecMerging(t *testing.T) {
554         cl, err := NewClient(&TestingConfig)
555         require.NoError(t, err)
556         defer cl.Close()
557         dir, mi := testutil.GreetingTestTorrent()
558         defer os.RemoveAll(dir)
559         var ts TorrentSpec
560         missinggo.CopyExact(&ts.InfoHash, mi.Info.Hash)
561         tt, new, err := cl.AddTorrentSpec(&ts)
562         require.NoError(t, err)
563         require.True(t, new)
564         require.Nil(t, tt.Info())
565         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
566         require.NoError(t, err)
567         require.False(t, new)
568         require.NotNil(t, tt.Info())
569 }
570
571 // Check that torrent Info is obtained from the metainfo file cache.
572 func TestAddTorrentMetainfoInCache(t *testing.T) {
573         cfg := TestingConfig
574         cfg.DisableMetainfoCache = false
575         cfg.ConfigDir, _ = ioutil.TempDir(os.TempDir(), "")
576         defer os.RemoveAll(cfg.ConfigDir)
577         cl, err := NewClient(&cfg)
578         require.NoError(t, err)
579         defer cl.Close()
580         dir, mi := testutil.GreetingTestTorrent()
581         defer os.RemoveAll(dir)
582         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
583         require.NoError(t, err)
584         require.True(t, new)
585         require.NotNil(t, tt.Info())
586         _, err = os.Stat(filepath.Join(cfg.ConfigDir, "torrents", fmt.Sprintf("%x.torrent", mi.Info.Hash)))
587         require.NoError(t, err)
588         // Contains only the infohash.
589         var ts TorrentSpec
590         missinggo.CopyExact(&ts.InfoHash, mi.Info.Hash)
591         _, ok := cl.Torrent(ts.InfoHash)
592         require.True(t, ok)
593         tt.Drop()
594         _, ok = cl.Torrent(ts.InfoHash)
595         require.False(t, ok)
596         tt, new, err = cl.AddTorrentSpec(&ts)
597         require.NoError(t, err)
598         require.True(t, new)
599         // Obtained from the metainfo cache.
600         require.NotNil(t, tt.Info())
601 }
602
603 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
604         dir, mi := testutil.GreetingTestTorrent()
605         os.RemoveAll(dir)
606         cl, _ := NewClient(&TestingConfig)
607         defer cl.Close()
608         var ts TorrentSpec
609         CopyExact(&ts.InfoHash, mi.Info.Hash)
610         tt, _, _ := cl.AddTorrentSpec(&ts)
611         tt.Drop()
612         assert.EqualValues(t, 0, len(cl.Torrents()))
613         select {
614         case <-tt.GotInfo():
615                 t.FailNow()
616         default:
617         }
618 }