]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Use go-libutp if cgo is enabled
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "log"
10         "math/rand"
11         "net"
12         "os"
13         "path/filepath"
14         "strings"
15         "sync"
16         "testing"
17         "time"
18
19         "github.com/anacrolix/dht"
20         _ "github.com/anacrolix/envpprof"
21         "github.com/anacrolix/missinggo"
22         "github.com/anacrolix/missinggo/filecache"
23         "github.com/anacrolix/missinggo/pubsub"
24         "github.com/bradfitz/iter"
25         "github.com/stretchr/testify/assert"
26         "github.com/stretchr/testify/require"
27         "golang.org/x/time/rate"
28
29         "github.com/anacrolix/torrent/bencode"
30         "github.com/anacrolix/torrent/internal/testutil"
31         "github.com/anacrolix/torrent/iplist"
32         "github.com/anacrolix/torrent/metainfo"
33         "github.com/anacrolix/torrent/storage"
34 )
35
36 func init() {
37         log.SetFlags(log.LstdFlags | log.Llongfile)
38 }
39
40 func TestingConfig() *Config {
41         return &Config{
42                 ListenAddr:      "localhost:0",
43                 NoDHT:           true,
44                 DisableTrackers: true,
45                 DataDir: func() string {
46                         ret, err := ioutil.TempDir("", "")
47                         if err != nil {
48                                 panic(err)
49                         }
50                         return ret
51                 }(),
52                 DHTConfig: dht.ServerConfig{
53                         NoDefaultBootstrap: true,
54                 },
55                 Debug: true,
56         }
57 }
58
59 func TestClientDefault(t *testing.T) {
60         cl, err := NewClient(TestingConfig())
61         require.NoError(t, err)
62         cl.Close()
63 }
64
65 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
66         cfg := TestingConfig()
67         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
68         require.NoError(t, err)
69         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
70         defer ci.Close()
71         cfg.DefaultStorage = ci
72         cl, err := NewClient(cfg)
73         require.NoError(t, err)
74         cl.Close()
75         // And again, https://github.com/anacrolix/torrent/issues/158
76         cl, err = NewClient(cfg)
77         require.NoError(t, err)
78         cl.Close()
79 }
80
81 func TestAddDropTorrent(t *testing.T) {
82         cl, err := NewClient(TestingConfig())
83         require.NoError(t, err)
84         defer cl.Close()
85         dir, mi := testutil.GreetingTestTorrent()
86         defer os.RemoveAll(dir)
87         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
88         require.NoError(t, err)
89         assert.True(t, new)
90         tt.SetMaxEstablishedConns(0)
91         tt.SetMaxEstablishedConns(1)
92         tt.Drop()
93 }
94
95 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
96         t.SkipNow()
97 }
98
99 func TestAddTorrentNoUsableURLs(t *testing.T) {
100         t.SkipNow()
101 }
102
103 func TestAddPeersToUnknownTorrent(t *testing.T) {
104         t.SkipNow()
105 }
106
107 func TestPieceHashSize(t *testing.T) {
108         if pieceHash.Size() != 20 {
109                 t.FailNow()
110         }
111 }
112
113 func TestTorrentInitialState(t *testing.T) {
114         dir, mi := testutil.GreetingTestTorrent()
115         defer os.RemoveAll(dir)
116         tor := &Torrent{
117                 infoHash:          mi.HashInfoBytes(),
118                 pieceStateChanges: pubsub.NewPubSub(),
119         }
120         tor.chunkSize = 2
121         tor.storageOpener = storage.NewClient(storage.NewFileWithCompletion("/dev/null", storage.NewMapPieceCompletion()))
122         // Needed to lock for asynchronous piece verification.
123         tor.cl = new(Client)
124         err := tor.setInfoBytes(mi.InfoBytes)
125         require.NoError(t, err)
126         require.Len(t, tor.pieces, 3)
127         tor.pendAllChunkSpecs(0)
128         tor.cl.mu.Lock()
129         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
130         tor.cl.mu.Unlock()
131         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
132 }
133
134 func TestUnmarshalPEXMsg(t *testing.T) {
135         var m peerExchangeMessage
136         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
137                 t.Fatal(err)
138         }
139         if len(m.Added) != 2 {
140                 t.FailNow()
141         }
142         if m.Added[0].Port != 0x506 {
143                 t.FailNow()
144         }
145 }
146
147 func TestReducedDialTimeout(t *testing.T) {
148         for _, _case := range []struct {
149                 Max             time.Duration
150                 HalfOpenLimit   int
151                 PendingPeers    int
152                 ExpectedReduced time.Duration
153         }{
154                 {nominalDialTimeout, 40, 0, nominalDialTimeout},
155                 {nominalDialTimeout, 40, 1, nominalDialTimeout},
156                 {nominalDialTimeout, 40, 39, nominalDialTimeout},
157                 {nominalDialTimeout, 40, 40, nominalDialTimeout / 2},
158                 {nominalDialTimeout, 40, 80, nominalDialTimeout / 3},
159                 {nominalDialTimeout, 40, 4000, nominalDialTimeout / 101},
160         } {
161                 reduced := reducedDialTimeout(_case.Max, _case.HalfOpenLimit, _case.PendingPeers)
162                 expected := _case.ExpectedReduced
163                 if expected < minDialTimeout {
164                         expected = minDialTimeout
165                 }
166                 if reduced != expected {
167                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
168                 }
169         }
170 }
171
172 func TestUTPRawConn(t *testing.T) {
173         l, err := NewUtpSocket("udp", "")
174         if err != nil {
175                 t.Fatal(err)
176         }
177         defer l.Close()
178         go func() {
179                 for {
180                         _, err := l.Accept()
181                         if err != nil {
182                                 break
183                         }
184                 }
185         }()
186         // Connect a UTP peer to see if the RawConn will still work.
187         s, _ := NewUtpSocket("udp", "")
188         defer s.Close()
189         utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
190         if err != nil {
191                 t.Fatalf("error dialing utp listener: %s", err)
192         }
193         defer utpPeer.Close()
194         peer, err := net.ListenPacket("udp", ":0")
195         if err != nil {
196                 t.Fatal(err)
197         }
198         defer peer.Close()
199
200         msgsReceived := 0
201         // How many messages to send. I've set this to double the channel buffer
202         // size in the raw packetConn.
203         const N = 200
204         readerStopped := make(chan struct{})
205         // The reader goroutine.
206         go func() {
207                 defer close(readerStopped)
208                 b := make([]byte, 500)
209                 for i := 0; i < N; i++ {
210                         n, _, err := l.ReadFrom(b)
211                         if err != nil {
212                                 t.Fatalf("error reading from raw conn: %s", err)
213                         }
214                         msgsReceived++
215                         var d int
216                         fmt.Sscan(string(b[:n]), &d)
217                         if d != i {
218                                 log.Printf("got wrong number: expected %d, got %d", i, d)
219                         }
220                 }
221         }()
222         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
223         if err != nil {
224                 t.Fatal(err)
225         }
226         for i := 0; i < N; i++ {
227                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
228                 if err != nil {
229                         t.Fatal(err)
230                 }
231                 time.Sleep(time.Microsecond)
232         }
233         select {
234         case <-readerStopped:
235         case <-time.After(time.Second):
236                 t.Fatal("reader timed out")
237         }
238         if msgsReceived != N {
239                 t.Fatalf("messages received: %d", msgsReceived)
240         }
241 }
242
243 func TestTwoClientsArbitraryPorts(t *testing.T) {
244         for i := 0; i < 2; i++ {
245                 cl, err := NewClient(TestingConfig())
246                 if err != nil {
247                         t.Fatal(err)
248                 }
249                 defer cl.Close()
250         }
251 }
252
253 func TestAddDropManyTorrents(t *testing.T) {
254         cl, err := NewClient(TestingConfig())
255         require.NoError(t, err)
256         defer cl.Close()
257         for i := range iter.N(1000) {
258                 var spec TorrentSpec
259                 binary.PutVarint(spec.InfoHash[:], int64(i))
260                 tt, new, err := cl.AddTorrentSpec(&spec)
261                 assert.NoError(t, err)
262                 assert.True(t, new)
263                 defer tt.Drop()
264         }
265 }
266
267 type FileCacheClientStorageFactoryParams struct {
268         Capacity    int64
269         SetCapacity bool
270         Wrapper     func(*filecache.Cache) storage.ClientImpl
271 }
272
273 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
274         return func(dataDir string) storage.ClientImpl {
275                 fc, err := filecache.NewCache(dataDir)
276                 if err != nil {
277                         panic(err)
278                 }
279                 if ps.SetCapacity {
280                         fc.SetCapacity(ps.Capacity)
281                 }
282                 return ps.Wrapper(fc)
283         }
284 }
285
286 type storageFactory func(string) storage.ClientImpl
287
288 func TestClientTransferDefault(t *testing.T) {
289         testClientTransfer(t, testClientTransferParams{
290                 ExportClientStatus: true,
291                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
292                         Wrapper: fileCachePieceResourceStorage,
293                 }),
294         })
295 }
296
297 func TestClientTransferRateLimitedUpload(t *testing.T) {
298         started := time.Now()
299         testClientTransfer(t, testClientTransferParams{
300                 // We are uploading 13 bytes (the length of the greeting torrent). The
301                 // chunks are 2 bytes in length. Then the smallest burst we can run
302                 // with is 2. Time taken is (13-burst)/rate.
303                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
304         })
305         require.True(t, time.Since(started) > time.Second)
306 }
307
308 func TestClientTransferRateLimitedDownload(t *testing.T) {
309         testClientTransfer(t, testClientTransferParams{
310                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
311         })
312 }
313
314 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
315         return storage.NewResourcePieces(fc.AsResourceProvider())
316 }
317
318 func TestClientTransferSmallCache(t *testing.T) {
319         testClientTransfer(t, testClientTransferParams{
320                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
321                         SetCapacity: true,
322                         // Going below the piece length means it can't complete a piece so
323                         // that it can be hashed.
324                         Capacity: 5,
325                         Wrapper:  fileCachePieceResourceStorage,
326                 }),
327                 SetReadahead: true,
328                 // Can't readahead too far or the cache will thrash and drop data we
329                 // thought we had.
330                 Readahead:          0,
331                 ExportClientStatus: true,
332         })
333 }
334
335 func TestClientTransferVarious(t *testing.T) {
336         // Leecher storage
337         for _, ls := range []storageFactory{
338                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
339                         Wrapper: fileCachePieceResourceStorage,
340                 }),
341                 storage.NewBoltDB,
342         } {
343                 // Seeder storage
344                 for _, ss := range []func(string) storage.ClientImpl{
345                         storage.NewFile,
346                         storage.NewMMap,
347                 } {
348                         for _, responsive := range []bool{false, true} {
349                                 testClientTransfer(t, testClientTransferParams{
350                                         Responsive:     responsive,
351                                         SeederStorage:  ss,
352                                         LeecherStorage: ls,
353                                 })
354                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
355                                         testClientTransfer(t, testClientTransferParams{
356                                                 SeederStorage:  ss,
357                                                 Responsive:     responsive,
358                                                 SetReadahead:   true,
359                                                 Readahead:      readahead,
360                                                 LeecherStorage: ls,
361                                         })
362                                 }
363                         }
364                 }
365         }
366 }
367
368 type testClientTransferParams struct {
369         Responsive                 bool
370         Readahead                  int64
371         SetReadahead               bool
372         ExportClientStatus         bool
373         LeecherStorage             func(string) storage.ClientImpl
374         SeederStorage              func(string) storage.ClientImpl
375         SeederUploadRateLimiter    *rate.Limiter
376         LeecherDownloadRateLimiter *rate.Limiter
377 }
378
379 // Creates a seeder and a leecher, and ensures the data transfers when a read
380 // is attempted on the leecher.
381 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
382         greetingTempDir, mi := testutil.GreetingTestTorrent()
383         defer os.RemoveAll(greetingTempDir)
384         // Create seeder and a Torrent.
385         cfg := TestingConfig()
386         cfg.Seed = true
387         cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
388         // cfg.ListenAddr = "localhost:4000"
389         if ps.SeederStorage != nil {
390                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
391                 defer cfg.DefaultStorage.Close()
392         } else {
393                 cfg.DataDir = greetingTempDir
394         }
395         seeder, err := NewClient(cfg)
396         require.NoError(t, err)
397         defer seeder.Close()
398         if ps.ExportClientStatus {
399                 testutil.ExportStatusWriter(seeder, "s")
400         }
401         // seederTorrent, new, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
402         _, new, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
403         require.NoError(t, err)
404         assert.True(t, new)
405         // Create leecher and a Torrent.
406         leecherDataDir, err := ioutil.TempDir("", "")
407         require.NoError(t, err)
408         defer os.RemoveAll(leecherDataDir)
409         if ps.LeecherStorage == nil {
410                 cfg.DataDir = leecherDataDir
411         } else {
412                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
413         }
414         cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
415         // cfg.ListenAddr = "localhost:4001"
416         leecher, err := NewClient(cfg)
417         require.NoError(t, err)
418         defer leecher.Close()
419         if ps.ExportClientStatus {
420                 testutil.ExportStatusWriter(leecher, "l")
421         }
422         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
423                 ret = TorrentSpecFromMetaInfo(mi)
424                 ret.ChunkSize = 2
425                 return
426         }())
427         require.NoError(t, err)
428         assert.True(t, new)
429         // Now do some things with leecher and seeder.
430         addClientPeer(leecherGreeting, seeder)
431         r := leecherGreeting.NewReader()
432         defer r.Close()
433         if ps.Responsive {
434                 r.SetResponsive()
435         }
436         if ps.SetReadahead {
437                 r.SetReadahead(ps.Readahead)
438         }
439         assertReadAllGreeting(t, r)
440         // After one read through, we can assume certain torrent statistics.
441         // These are not a strict requirement. It is however interesting to
442         // follow.
443         // t.Logf("%#v", seederTorrent.Stats())
444         // assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
445         // assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
446         // assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
447         // assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
448         // Read through again for the cases where the torrent data size exceeds
449         // the size of the cache.
450         assertReadAllGreeting(t, r)
451 }
452
453 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
454         pos, err := r.Seek(0, os.SEEK_SET)
455         assert.NoError(t, err)
456         assert.EqualValues(t, 0, pos)
457         _greeting, err := ioutil.ReadAll(r)
458         assert.NoError(t, err)
459         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
460 }
461
462 // Check that after completing leeching, a leecher transitions to a seeding
463 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
464 func TestSeedAfterDownloading(t *testing.T) {
465         greetingTempDir, mi := testutil.GreetingTestTorrent()
466         defer os.RemoveAll(greetingTempDir)
467         cfg := TestingConfig()
468         cfg.Seed = true
469         cfg.DataDir = greetingTempDir
470         seeder, err := NewClient(cfg)
471         require.NoError(t, err)
472         defer seeder.Close()
473         testutil.ExportStatusWriter(seeder, "s")
474         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
475         cfg.DataDir, err = ioutil.TempDir("", "")
476         require.NoError(t, err)
477         defer os.RemoveAll(cfg.DataDir)
478         leecher, err := NewClient(cfg)
479         require.NoError(t, err)
480         defer leecher.Close()
481         testutil.ExportStatusWriter(leecher, "l")
482         cfg.Seed = false
483         // cfg.TorrentDataOpener = nil
484         cfg.DataDir, err = ioutil.TempDir("", "")
485         require.NoError(t, err)
486         defer os.RemoveAll(cfg.DataDir)
487         leecherLeecher, _ := NewClient(cfg)
488         defer leecherLeecher.Close()
489         testutil.ExportStatusWriter(leecherLeecher, "ll")
490         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
491                 ret = TorrentSpecFromMetaInfo(mi)
492                 ret.ChunkSize = 2
493                 return
494         }())
495         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
496                 ret = TorrentSpecFromMetaInfo(mi)
497                 ret.ChunkSize = 3
498                 return
499         }())
500         // Simultaneously DownloadAll in Leecher, and read the contents
501         // consecutively in LeecherLeecher. This non-deterministically triggered a
502         // case where the leecher wouldn't unchoke the LeecherLeecher.
503         var wg sync.WaitGroup
504         wg.Add(1)
505         go func() {
506                 defer wg.Done()
507                 r := llg.NewReader()
508                 defer r.Close()
509                 b, err := ioutil.ReadAll(r)
510                 require.NoError(t, err)
511                 assert.EqualValues(t, testutil.GreetingFileContents, b)
512         }()
513         addClientPeer(leecherGreeting, seeder)
514         addClientPeer(leecherGreeting, leecherLeecher)
515         wg.Add(1)
516         go func() {
517                 defer wg.Done()
518                 leecherGreeting.DownloadAll()
519                 leecher.WaitAll()
520         }()
521         wg.Wait()
522 }
523
524 func TestMergingTrackersByAddingSpecs(t *testing.T) {
525         cl, err := NewClient(TestingConfig())
526         require.NoError(t, err)
527         defer cl.Close()
528         spec := TorrentSpec{}
529         T, new, _ := cl.AddTorrentSpec(&spec)
530         if !new {
531                 t.FailNow()
532         }
533         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
534         _, new, _ = cl.AddTorrentSpec(&spec)
535         assert.False(t, new)
536         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
537         // Because trackers are disabled in TestingConfig.
538         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
539 }
540
541 type badStorage struct{}
542
543 func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
544         return bs, nil
545 }
546
547 func (bs badStorage) Close() error {
548         return nil
549 }
550
551 func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {
552         return badStoragePiece{p}
553 }
554
555 type badStoragePiece struct {
556         p metainfo.Piece
557 }
558
559 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
560         return 0, nil
561 }
562
563 func (p badStoragePiece) GetIsComplete() bool {
564         return true
565 }
566
567 func (p badStoragePiece) MarkComplete() error {
568         return errors.New("psyyyyyyyche")
569 }
570
571 func (p badStoragePiece) MarkNotComplete() error {
572         return errors.New("psyyyyyyyche")
573 }
574
575 func (p badStoragePiece) randomlyTruncatedDataString() string {
576         return "hello, world\n"[:rand.Intn(14)]
577 }
578
579 func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
580         r := strings.NewReader(p.randomlyTruncatedDataString())
581         return r.ReadAt(b, off+p.p.Offset())
582 }
583
584 // We read from a piece which is marked completed, but is missing data.
585 func TestCompletedPieceWrongSize(t *testing.T) {
586         cfg := TestingConfig()
587         cfg.DefaultStorage = badStorage{}
588         cl, err := NewClient(cfg)
589         require.NoError(t, err)
590         defer cl.Close()
591         info := metainfo.Info{
592                 PieceLength: 15,
593                 Pieces:      make([]byte, 20),
594                 Files: []metainfo.FileInfo{
595                         {Path: []string{"greeting"}, Length: 13},
596                 },
597         }
598         b, err := bencode.Marshal(info)
599         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
600                 InfoBytes: b,
601                 InfoHash:  metainfo.HashBytes(b),
602         })
603         require.NoError(t, err)
604         defer tt.Drop()
605         assert.True(t, new)
606         r := tt.NewReader()
607         defer r.Close()
608         b, err = ioutil.ReadAll(r)
609         assert.Len(t, b, 13)
610         assert.NoError(t, err)
611 }
612
613 func BenchmarkAddLargeTorrent(b *testing.B) {
614         cfg := TestingConfig()
615         cfg.DisableTCP = true
616         cfg.DisableUTP = true
617         cfg.ListenAddr = "redonk"
618         cl, err := NewClient(cfg)
619         require.NoError(b, err)
620         defer cl.Close()
621         for range iter.N(b.N) {
622                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
623                 if err != nil {
624                         b.Fatal(err)
625                 }
626                 t.Drop()
627         }
628 }
629
630 func TestResponsive(t *testing.T) {
631         seederDataDir, mi := testutil.GreetingTestTorrent()
632         defer os.RemoveAll(seederDataDir)
633         cfg := TestingConfig()
634         cfg.Seed = true
635         cfg.DataDir = seederDataDir
636         seeder, err := NewClient(cfg)
637         require.Nil(t, err)
638         defer seeder.Close()
639         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
640         leecherDataDir, err := ioutil.TempDir("", "")
641         require.Nil(t, err)
642         defer os.RemoveAll(leecherDataDir)
643         cfg = TestingConfig()
644         cfg.DataDir = leecherDataDir
645         leecher, err := NewClient(cfg)
646         require.Nil(t, err)
647         defer leecher.Close()
648         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
649                 ret = TorrentSpecFromMetaInfo(mi)
650                 ret.ChunkSize = 2
651                 return
652         }())
653         addClientPeer(leecherTorrent, seeder)
654         reader := leecherTorrent.NewReader()
655         defer reader.Close()
656         reader.SetReadahead(0)
657         reader.SetResponsive()
658         b := make([]byte, 2)
659         _, err = reader.Seek(3, os.SEEK_SET)
660         require.NoError(t, err)
661         _, err = io.ReadFull(reader, b)
662         assert.Nil(t, err)
663         assert.EqualValues(t, "lo", string(b))
664         _, err = reader.Seek(11, os.SEEK_SET)
665         require.NoError(t, err)
666         n, err := io.ReadFull(reader, b)
667         assert.Nil(t, err)
668         assert.EqualValues(t, 2, n)
669         assert.EqualValues(t, "d\n", string(b))
670 }
671
672 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
673         seederDataDir, mi := testutil.GreetingTestTorrent()
674         defer os.RemoveAll(seederDataDir)
675         cfg := TestingConfig()
676         cfg.Seed = true
677         cfg.DataDir = seederDataDir
678         seeder, err := NewClient(cfg)
679         require.Nil(t, err)
680         defer seeder.Close()
681         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
682         leecherDataDir, err := ioutil.TempDir("", "")
683         require.Nil(t, err)
684         defer os.RemoveAll(leecherDataDir)
685         cfg = TestingConfig()
686         cfg.DataDir = leecherDataDir
687         leecher, err := NewClient(cfg)
688         require.Nil(t, err)
689         defer leecher.Close()
690         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
691                 ret = TorrentSpecFromMetaInfo(mi)
692                 ret.ChunkSize = 2
693                 return
694         }())
695         addClientPeer(leecherTorrent, seeder)
696         reader := leecherTorrent.NewReader()
697         defer reader.Close()
698         reader.SetReadahead(0)
699         reader.SetResponsive()
700         b := make([]byte, 2)
701         _, err = reader.Seek(3, os.SEEK_SET)
702         require.NoError(t, err)
703         _, err = io.ReadFull(reader, b)
704         assert.Nil(t, err)
705         assert.EqualValues(t, "lo", string(b))
706         go leecherTorrent.Drop()
707         _, err = reader.Seek(11, os.SEEK_SET)
708         require.NoError(t, err)
709         n, err := reader.Read(b)
710         assert.EqualError(t, err, "torrent closed")
711         assert.EqualValues(t, 0, n)
712 }
713
714 func TestDHTInheritBlocklist(t *testing.T) {
715         ipl := iplist.New(nil)
716         require.NotNil(t, ipl)
717         cfg := TestingConfig()
718         cfg.IPBlocklist = ipl
719         cfg.NoDHT = false
720         cl, err := NewClient(cfg)
721         require.NoError(t, err)
722         defer cl.Close()
723         require.Equal(t, ipl, cl.DHT().IPBlocklist())
724 }
725
726 // Check that stuff is merged in subsequent AddTorrentSpec for the same
727 // infohash.
728 func TestAddTorrentSpecMerging(t *testing.T) {
729         cl, err := NewClient(TestingConfig())
730         require.NoError(t, err)
731         defer cl.Close()
732         dir, mi := testutil.GreetingTestTorrent()
733         defer os.RemoveAll(dir)
734         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
735                 InfoHash: mi.HashInfoBytes(),
736         })
737         require.NoError(t, err)
738         require.True(t, new)
739         require.Nil(t, tt.Info())
740         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
741         require.NoError(t, err)
742         require.False(t, new)
743         require.NotNil(t, tt.Info())
744 }
745
746 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
747         dir, mi := testutil.GreetingTestTorrent()
748         os.RemoveAll(dir)
749         cl, _ := NewClient(TestingConfig())
750         defer cl.Close()
751         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
752                 InfoHash: mi.HashInfoBytes(),
753         })
754         tt.Drop()
755         assert.EqualValues(t, 0, len(cl.Torrents()))
756         select {
757         case <-tt.GotInfo():
758                 t.FailNow()
759         default:
760         }
761 }
762
763 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
764         for i := range iter.N(info.NumPieces()) {
765                 p := info.Piece(i)
766                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
767         }
768 }
769
770 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
771         fileCacheDir, err := ioutil.TempDir("", "")
772         require.NoError(t, err)
773         defer os.RemoveAll(fileCacheDir)
774         fileCache, err := filecache.NewCache(fileCacheDir)
775         require.NoError(t, err)
776         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
777         defer os.RemoveAll(greetingDataTempDir)
778         filePieceStore := csf(fileCache)
779         info, err := greetingMetainfo.UnmarshalInfo()
780         require.NoError(t, err)
781         ih := greetingMetainfo.HashInfoBytes()
782         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
783         require.NoError(t, err)
784         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
785         // require.Equal(t, len(testutil.GreetingFileContents), written)
786         // require.NoError(t, err)
787         for i := 0; i < info.NumPieces(); i++ {
788                 p := info.Piece(i)
789                 if alreadyCompleted {
790                         err := greetingData.Piece(p).MarkComplete()
791                         assert.NoError(t, err)
792                 }
793         }
794         cfg := TestingConfig()
795         // TODO: Disable network option?
796         cfg.DisableTCP = true
797         cfg.DisableUTP = true
798         cfg.DefaultStorage = filePieceStore
799         cl, err := NewClient(cfg)
800         require.NoError(t, err)
801         defer cl.Close()
802         tt, err := cl.AddTorrent(greetingMetainfo)
803         require.NoError(t, err)
804         psrs := tt.PieceStateRuns()
805         assert.Len(t, psrs, 1)
806         assert.EqualValues(t, 3, psrs[0].Length)
807         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
808         if alreadyCompleted {
809                 r := tt.NewReader()
810                 b, err := ioutil.ReadAll(r)
811                 assert.NoError(t, err)
812                 assert.EqualValues(t, testutil.GreetingFileContents, b)
813         }
814 }
815
816 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
817         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
818 }
819
820 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
821         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
822 }
823
824 func TestAddMetainfoWithNodes(t *testing.T) {
825         cfg := TestingConfig()
826         cfg.NoDHT = false
827         // For now, we want to just jam the nodes into the table, without
828         // verifying them first. Also the DHT code doesn't support mixing secure
829         // and insecure nodes if security is enabled (yet).
830         cfg.DHTConfig.NoSecurity = true
831         cl, err := NewClient(cfg)
832         require.NoError(t, err)
833         defer cl.Close()
834         assert.EqualValues(t, cl.DHT().NumNodes(), 0)
835         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
836         require.NoError(t, err)
837         assert.Len(t, tt.metainfo.AnnounceList, 5)
838         assert.EqualValues(t, 6, cl.DHT().NumNodes())
839 }
840
841 type testDownloadCancelParams struct {
842         ExportClientStatus        bool
843         SetLeecherStorageCapacity bool
844         LeecherStorageCapacity    int64
845         Cancel                    bool
846 }
847
848 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
849         greetingTempDir, mi := testutil.GreetingTestTorrent()
850         defer os.RemoveAll(greetingTempDir)
851         cfg := TestingConfig()
852         cfg.Seed = true
853         cfg.DataDir = greetingTempDir
854         seeder, err := NewClient(cfg)
855         require.NoError(t, err)
856         defer seeder.Close()
857         if ps.ExportClientStatus {
858                 testutil.ExportStatusWriter(seeder, "s")
859         }
860         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
861         leecherDataDir, err := ioutil.TempDir("", "")
862         require.NoError(t, err)
863         defer os.RemoveAll(leecherDataDir)
864         fc, err := filecache.NewCache(leecherDataDir)
865         require.NoError(t, err)
866         if ps.SetLeecherStorageCapacity {
867                 fc.SetCapacity(ps.LeecherStorageCapacity)
868         }
869         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
870         cfg.DataDir = leecherDataDir
871         leecher, _ := NewClient(cfg)
872         defer leecher.Close()
873         if ps.ExportClientStatus {
874                 testutil.ExportStatusWriter(leecher, "l")
875         }
876         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
877                 ret = TorrentSpecFromMetaInfo(mi)
878                 ret.ChunkSize = 2
879                 return
880         }())
881         require.NoError(t, err)
882         assert.True(t, new)
883         psc := leecherGreeting.SubscribePieceStateChanges()
884         defer psc.Close()
885         leecherGreeting.DownloadAll()
886         if ps.Cancel {
887                 leecherGreeting.CancelPieces(0, leecherGreeting.NumPieces())
888         }
889         addClientPeer(leecherGreeting, seeder)
890         completes := make(map[int]bool, 3)
891 values:
892         for {
893                 // started := time.Now()
894                 select {
895                 case _v := <-psc.Values:
896                         // log.Print(time.Since(started))
897                         v := _v.(PieceStateChange)
898                         completes[v.Index] = v.Complete
899                 case <-time.After(100 * time.Millisecond):
900                         break values
901                 }
902         }
903         if ps.Cancel {
904                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
905         } else {
906                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
907         }
908
909 }
910
911 func TestTorrentDownloadAll(t *testing.T) {
912         testDownloadCancel(t, testDownloadCancelParams{})
913 }
914
915 func TestTorrentDownloadAllThenCancel(t *testing.T) {
916         testDownloadCancel(t, testDownloadCancelParams{
917                 Cancel: true,
918         })
919 }
920
921 // Ensure that it's an error for a peer to send an invalid have message.
922 func TestPeerInvalidHave(t *testing.T) {
923         cl, err := NewClient(TestingConfig())
924         require.NoError(t, err)
925         defer cl.Close()
926         info := metainfo.Info{
927                 PieceLength: 1,
928                 Pieces:      make([]byte, 20),
929                 Files:       []metainfo.FileInfo{{Length: 1}},
930         }
931         infoBytes, err := bencode.Marshal(info)
932         require.NoError(t, err)
933         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
934                 InfoBytes: infoBytes,
935                 InfoHash:  metainfo.HashBytes(infoBytes),
936         })
937         require.NoError(t, err)
938         assert.True(t, _new)
939         defer tt.Drop()
940         cn := &connection{
941                 t: tt,
942         }
943         assert.NoError(t, cn.peerSentHave(0))
944         assert.Error(t, cn.peerSentHave(1))
945 }
946
947 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
948         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
949         defer os.RemoveAll(greetingTempDir)
950         cfg := TestingConfig()
951         cfg.DataDir = greetingTempDir
952         seeder, err := NewClient(TestingConfig())
953         require.NoError(t, err)
954         seeder.AddTorrentSpec(&TorrentSpec{
955                 InfoBytes: greetingMetainfo.InfoBytes,
956         })
957 }
958
959 func TestPrepareTrackerAnnounce(t *testing.T) {
960         cl := &Client{}
961         blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
962         require.NoError(t, err)
963         assert.False(t, blocked)
964         assert.EqualValues(t, "localhost:1234", host)
965         assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
966 }
967
968 // Check that when the listen port is 0, all the protocols listened on have
969 // the same port, and it isn't zero.
970 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
971         cl, err := NewClient(TestingConfig())
972         require.NoError(t, err)
973         defer cl.Close()
974         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
975         assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
976 }
977
978 func TestClientDynamicListenTCPOnly(t *testing.T) {
979         cfg := TestingConfig()
980         cfg.DisableUTP = true
981         cl, err := NewClient(cfg)
982         require.NoError(t, err)
983         defer cl.Close()
984         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
985         assert.Nil(t, cl.utpSock)
986 }
987
988 func TestClientDynamicListenUTPOnly(t *testing.T) {
989         cfg := TestingConfig()
990         cfg.DisableTCP = true
991         cl, err := NewClient(cfg)
992         require.NoError(t, err)
993         defer cl.Close()
994         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
995         assert.Nil(t, cl.tcpListener)
996 }
997
998 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
999         cfg := TestingConfig()
1000         cfg.DisableTCP = true
1001         cfg.DisableUTP = true
1002         cl, err := NewClient(cfg)
1003         require.NoError(t, err)
1004         defer cl.Close()
1005         assert.Nil(t, cl.ListenAddr())
1006 }
1007
1008 func addClientPeer(t *Torrent, cl *Client) {
1009         t.AddPeers([]Peer{
1010                 {
1011                         IP:   missinggo.AddrIP(cl.ListenAddr()),
1012                         Port: missinggo.AddrPort(cl.ListenAddr()),
1013                 },
1014         })
1015 }
1016
1017 func printConnPeerCounts(t *Torrent) {
1018         t.cl.mu.Lock()
1019         log.Println(len(t.conns), len(t.peers))
1020         t.cl.mu.Unlock()
1021 }
1022
1023 func totalConns(tts []*Torrent) (ret int) {
1024         for _, tt := range tts {
1025                 tt.cl.mu.Lock()
1026                 ret += len(tt.conns)
1027                 tt.cl.mu.Unlock()
1028         }
1029         return
1030 }
1031
1032 func TestSetMaxEstablishedConn(t *testing.T) {
1033         var tts []*Torrent
1034         ih := testutil.GreetingMetaInfo().HashInfoBytes()
1035         for i := range iter.N(3) {
1036                 cl, err := NewClient(TestingConfig())
1037                 require.NoError(t, err)
1038                 defer cl.Close()
1039                 tt, _ := cl.AddTorrentInfoHash(ih)
1040                 tt.SetMaxEstablishedConns(2)
1041                 testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
1042                 tts = append(tts, tt)
1043         }
1044         addPeers := func() {
1045                 for i, tt := range tts {
1046                         for _, _tt := range tts[:i] {
1047                                 addClientPeer(tt, _tt.cl)
1048                         }
1049                 }
1050         }
1051         waitTotalConns := func(num int) {
1052                 for totalConns(tts) != num {
1053                         time.Sleep(time.Millisecond)
1054                 }
1055         }
1056         addPeers()
1057         waitTotalConns(6)
1058         tts[0].SetMaxEstablishedConns(1)
1059         waitTotalConns(4)
1060         tts[0].SetMaxEstablishedConns(0)
1061         waitTotalConns(2)
1062         tts[0].SetMaxEstablishedConns(1)
1063         addPeers()
1064         waitTotalConns(4)
1065         tts[0].SetMaxEstablishedConns(2)
1066         addPeers()
1067         waitTotalConns(6)
1068 }
1069
1070 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1071         os.MkdirAll(dir, 0770)
1072         file, err := os.Create(filepath.Join(dir, name))
1073         require.NoError(t, err)
1074         file.Write([]byte(name))
1075         file.Close()
1076         mi := metainfo.MetaInfo{}
1077         mi.SetDefaults()
1078         info := metainfo.Info{PieceLength: 256 * 1024}
1079         err = info.BuildFromFilePath(filepath.Join(dir, name))
1080         require.NoError(t, err)
1081         mi.InfoBytes, err = bencode.Marshal(info)
1082         require.NoError(t, err)
1083         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1084         tr, err := cl.AddTorrent(&mi)
1085         require.NoError(t, err)
1086         assert.True(t, tr.Seeding())
1087         return magnet
1088 }
1089
1090 // https://github.com/anacrolix/torrent/issues/114
1091 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1092         cfg := TestingConfig()
1093         cfg.DisableUTP = true
1094         cfg.Seed = true
1095         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1096         cfg.Debug = true
1097         cfg.ForceEncryption = true
1098         os.Mkdir(cfg.DataDir, 0755)
1099         server, err := NewClient(cfg)
1100         require.NoError(t, err)
1101         defer server.Close()
1102         testutil.ExportStatusWriter(server, "s")
1103         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1104         makeMagnet(t, server, cfg.DataDir, "test2")
1105         cfg = TestingConfig()
1106         cfg.DisableUTP = true
1107         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1108         cfg.Debug = true
1109         cfg.ForceEncryption = true
1110         client, err := NewClient(cfg)
1111         require.NoError(t, err)
1112         defer client.Close()
1113         testutil.ExportStatusWriter(client, "c")
1114         tr, err := client.AddMagnet(magnet1)
1115         require.NoError(t, err)
1116         tr.AddPeers([]Peer{{
1117                 IP:   missinggo.AddrIP(server.ListenAddr()),
1118                 Port: missinggo.AddrPort(server.ListenAddr()),
1119         }})
1120         <-tr.GotInfo()
1121         tr.DownloadAll()
1122         client.WaitAll()
1123 }