]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Close implicit Client default storage on Client.Close
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "log"
10         "math/rand"
11         "net"
12         "os"
13         "path/filepath"
14         "strings"
15         "sync"
16         "testing"
17         "time"
18
19         "github.com/anacrolix/dht"
20         _ "github.com/anacrolix/envpprof"
21         "github.com/anacrolix/missinggo"
22         "github.com/anacrolix/missinggo/filecache"
23         "github.com/anacrolix/missinggo/pubsub"
24         "github.com/anacrolix/utp"
25         "github.com/bradfitz/iter"
26         "github.com/stretchr/testify/assert"
27         "github.com/stretchr/testify/require"
28         "golang.org/x/time/rate"
29
30         "github.com/anacrolix/torrent/bencode"
31         "github.com/anacrolix/torrent/internal/testutil"
32         "github.com/anacrolix/torrent/iplist"
33         "github.com/anacrolix/torrent/metainfo"
34         "github.com/anacrolix/torrent/storage"
35 )
36
37 func init() {
38         log.SetFlags(log.LstdFlags | log.Llongfile)
39 }
40
41 func TestingConfig() *Config {
42         return &Config{
43                 ListenAddr:      "localhost:0",
44                 NoDHT:           true,
45                 DisableTrackers: true,
46                 DataDir: func() string {
47                         ret, err := ioutil.TempDir("", "")
48                         if err != nil {
49                                 panic(err)
50                         }
51                         return ret
52                 }(),
53                 DHTConfig: dht.ServerConfig{
54                         NoDefaultBootstrap: true,
55                 },
56                 Debug: true,
57         }
58 }
59
60 func TestClientDefault(t *testing.T) {
61         cl, err := NewClient(TestingConfig())
62         require.NoError(t, err)
63         cl.Close()
64 }
65
66 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
67         cfg := TestingConfig()
68         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
69         require.NoError(t, err)
70         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
71         defer ci.Close()
72         cfg.DefaultStorage = ci
73         cl, err := NewClient(cfg)
74         require.NoError(t, err)
75         cl.Close()
76         // And again, https://github.com/anacrolix/torrent/issues/158
77         cl, err = NewClient(cfg)
78         require.NoError(t, err)
79         cl.Close()
80 }
81
82 func TestAddDropTorrent(t *testing.T) {
83         cl, err := NewClient(TestingConfig())
84         require.NoError(t, err)
85         defer cl.Close()
86         dir, mi := testutil.GreetingTestTorrent()
87         defer os.RemoveAll(dir)
88         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
89         require.NoError(t, err)
90         assert.True(t, new)
91         tt.SetMaxEstablishedConns(0)
92         tt.SetMaxEstablishedConns(1)
93         tt.Drop()
94 }
95
96 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
97         t.SkipNow()
98 }
99
100 func TestAddTorrentNoUsableURLs(t *testing.T) {
101         t.SkipNow()
102 }
103
104 func TestAddPeersToUnknownTorrent(t *testing.T) {
105         t.SkipNow()
106 }
107
108 func TestPieceHashSize(t *testing.T) {
109         if pieceHash.Size() != 20 {
110                 t.FailNow()
111         }
112 }
113
114 func TestTorrentInitialState(t *testing.T) {
115         dir, mi := testutil.GreetingTestTorrent()
116         defer os.RemoveAll(dir)
117         tor := &Torrent{
118                 infoHash:          mi.HashInfoBytes(),
119                 pieceStateChanges: pubsub.NewPubSub(),
120         }
121         tor.chunkSize = 2
122         tor.storageOpener = storage.NewClient(storage.NewFileWithCompletion("/dev/null", storage.NewMapPieceCompletion()))
123         // Needed to lock for asynchronous piece verification.
124         tor.cl = new(Client)
125         err := tor.setInfoBytes(mi.InfoBytes)
126         require.NoError(t, err)
127         require.Len(t, tor.pieces, 3)
128         tor.pendAllChunkSpecs(0)
129         tor.cl.mu.Lock()
130         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
131         tor.cl.mu.Unlock()
132         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
133 }
134
135 func TestUnmarshalPEXMsg(t *testing.T) {
136         var m peerExchangeMessage
137         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
138                 t.Fatal(err)
139         }
140         if len(m.Added) != 2 {
141                 t.FailNow()
142         }
143         if m.Added[0].Port != 0x506 {
144                 t.FailNow()
145         }
146 }
147
148 func TestReducedDialTimeout(t *testing.T) {
149         for _, _case := range []struct {
150                 Max             time.Duration
151                 HalfOpenLimit   int
152                 PendingPeers    int
153                 ExpectedReduced time.Duration
154         }{
155                 {nominalDialTimeout, 40, 0, nominalDialTimeout},
156                 {nominalDialTimeout, 40, 1, nominalDialTimeout},
157                 {nominalDialTimeout, 40, 39, nominalDialTimeout},
158                 {nominalDialTimeout, 40, 40, nominalDialTimeout / 2},
159                 {nominalDialTimeout, 40, 80, nominalDialTimeout / 3},
160                 {nominalDialTimeout, 40, 4000, nominalDialTimeout / 101},
161         } {
162                 reduced := reducedDialTimeout(_case.Max, _case.HalfOpenLimit, _case.PendingPeers)
163                 expected := _case.ExpectedReduced
164                 if expected < minDialTimeout {
165                         expected = minDialTimeout
166                 }
167                 if reduced != expected {
168                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
169                 }
170         }
171 }
172
173 func TestUTPRawConn(t *testing.T) {
174         l, err := utp.NewSocket("udp", "")
175         if err != nil {
176                 t.Fatal(err)
177         }
178         defer l.Close()
179         go func() {
180                 for {
181                         _, err := l.Accept()
182                         if err != nil {
183                                 break
184                         }
185                 }
186         }()
187         // Connect a UTP peer to see if the RawConn will still work.
188         s, _ := utp.NewSocket("udp", "")
189         defer s.Close()
190         utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
191         if err != nil {
192                 t.Fatalf("error dialing utp listener: %s", err)
193         }
194         defer utpPeer.Close()
195         peer, err := net.ListenPacket("udp", ":0")
196         if err != nil {
197                 t.Fatal(err)
198         }
199         defer peer.Close()
200
201         msgsReceived := 0
202         // How many messages to send. I've set this to double the channel buffer
203         // size in the raw packetConn.
204         const N = 200
205         readerStopped := make(chan struct{})
206         // The reader goroutine.
207         go func() {
208                 defer close(readerStopped)
209                 b := make([]byte, 500)
210                 for i := 0; i < N; i++ {
211                         n, _, err := l.ReadFrom(b)
212                         if err != nil {
213                                 t.Fatalf("error reading from raw conn: %s", err)
214                         }
215                         msgsReceived++
216                         var d int
217                         fmt.Sscan(string(b[:n]), &d)
218                         if d != i {
219                                 log.Printf("got wrong number: expected %d, got %d", i, d)
220                         }
221                 }
222         }()
223         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
224         if err != nil {
225                 t.Fatal(err)
226         }
227         for i := 0; i < N; i++ {
228                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
229                 if err != nil {
230                         t.Fatal(err)
231                 }
232                 time.Sleep(time.Microsecond)
233         }
234         select {
235         case <-readerStopped:
236         case <-time.After(time.Second):
237                 t.Fatal("reader timed out")
238         }
239         if msgsReceived != N {
240                 t.Fatalf("messages received: %d", msgsReceived)
241         }
242 }
243
244 func TestTwoClientsArbitraryPorts(t *testing.T) {
245         for i := 0; i < 2; i++ {
246                 cl, err := NewClient(TestingConfig())
247                 if err != nil {
248                         t.Fatal(err)
249                 }
250                 defer cl.Close()
251         }
252 }
253
254 func TestAddDropManyTorrents(t *testing.T) {
255         cl, err := NewClient(TestingConfig())
256         require.NoError(t, err)
257         defer cl.Close()
258         for i := range iter.N(1000) {
259                 var spec TorrentSpec
260                 binary.PutVarint(spec.InfoHash[:], int64(i))
261                 tt, new, err := cl.AddTorrentSpec(&spec)
262                 assert.NoError(t, err)
263                 assert.True(t, new)
264                 defer tt.Drop()
265         }
266 }
267
268 type FileCacheClientStorageFactoryParams struct {
269         Capacity    int64
270         SetCapacity bool
271         Wrapper     func(*filecache.Cache) storage.ClientImpl
272 }
273
274 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
275         return func(dataDir string) storage.ClientImpl {
276                 fc, err := filecache.NewCache(dataDir)
277                 if err != nil {
278                         panic(err)
279                 }
280                 if ps.SetCapacity {
281                         fc.SetCapacity(ps.Capacity)
282                 }
283                 return ps.Wrapper(fc)
284         }
285 }
286
287 type storageFactory func(string) storage.ClientImpl
288
289 func TestClientTransferDefault(t *testing.T) {
290         testClientTransfer(t, testClientTransferParams{
291                 ExportClientStatus: true,
292                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
293                         Wrapper: fileCachePieceResourceStorage,
294                 }),
295         })
296 }
297
298 func TestClientTransferRateLimitedUpload(t *testing.T) {
299         started := time.Now()
300         testClientTransfer(t, testClientTransferParams{
301                 // We are uploading 13 bytes (the length of the greeting torrent). The
302                 // chunks are 2 bytes in length. Then the smallest burst we can run
303                 // with is 2. Time taken is (13-burst)/rate.
304                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
305         })
306         require.True(t, time.Since(started) > time.Second)
307 }
308
309 func TestClientTransferRateLimitedDownload(t *testing.T) {
310         testClientTransfer(t, testClientTransferParams{
311                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
312         })
313 }
314
315 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
316         return storage.NewResourcePieces(fc.AsResourceProvider())
317 }
318
319 func TestClientTransferSmallCache(t *testing.T) {
320         testClientTransfer(t, testClientTransferParams{
321                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
322                         SetCapacity: true,
323                         // Going below the piece length means it can't complete a piece so
324                         // that it can be hashed.
325                         Capacity: 5,
326                         Wrapper:  fileCachePieceResourceStorage,
327                 }),
328                 SetReadahead: true,
329                 // Can't readahead too far or the cache will thrash and drop data we
330                 // thought we had.
331                 Readahead:          0,
332                 ExportClientStatus: true,
333         })
334 }
335
336 func TestClientTransferVarious(t *testing.T) {
337         // Leecher storage
338         for _, ls := range []storageFactory{
339                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
340                         Wrapper: fileCachePieceResourceStorage,
341                 }),
342                 storage.NewBoltDB,
343         } {
344                 // Seeder storage
345                 for _, ss := range []func(string) storage.ClientImpl{
346                         storage.NewFile,
347                         storage.NewMMap,
348                 } {
349                         for _, responsive := range []bool{false, true} {
350                                 testClientTransfer(t, testClientTransferParams{
351                                         Responsive:     responsive,
352                                         SeederStorage:  ss,
353                                         LeecherStorage: ls,
354                                 })
355                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
356                                         testClientTransfer(t, testClientTransferParams{
357                                                 SeederStorage:  ss,
358                                                 Responsive:     responsive,
359                                                 SetReadahead:   true,
360                                                 Readahead:      readahead,
361                                                 LeecherStorage: ls,
362                                         })
363                                 }
364                         }
365                 }
366         }
367 }
368
369 type testClientTransferParams struct {
370         Responsive                 bool
371         Readahead                  int64
372         SetReadahead               bool
373         ExportClientStatus         bool
374         LeecherStorage             func(string) storage.ClientImpl
375         SeederStorage              func(string) storage.ClientImpl
376         SeederUploadRateLimiter    *rate.Limiter
377         LeecherDownloadRateLimiter *rate.Limiter
378 }
379
380 // Creates a seeder and a leecher, and ensures the data transfers when a read
381 // is attempted on the leecher.
382 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
383         greetingTempDir, mi := testutil.GreetingTestTorrent()
384         defer os.RemoveAll(greetingTempDir)
385         // Create seeder and a Torrent.
386         cfg := TestingConfig()
387         cfg.Seed = true
388         cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
389         // cfg.ListenAddr = "localhost:4000"
390         if ps.SeederStorage != nil {
391                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
392                 defer cfg.DefaultStorage.Close()
393         } else {
394                 cfg.DataDir = greetingTempDir
395         }
396         seeder, err := NewClient(cfg)
397         require.NoError(t, err)
398         defer seeder.Close()
399         if ps.ExportClientStatus {
400                 testutil.ExportStatusWriter(seeder, "s")
401         }
402         // seederTorrent, new, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
403         _, new, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
404         require.NoError(t, err)
405         assert.True(t, new)
406         // Create leecher and a Torrent.
407         leecherDataDir, err := ioutil.TempDir("", "")
408         require.NoError(t, err)
409         defer os.RemoveAll(leecherDataDir)
410         if ps.LeecherStorage == nil {
411                 cfg.DataDir = leecherDataDir
412         } else {
413                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
414         }
415         cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
416         // cfg.ListenAddr = "localhost:4001"
417         leecher, err := NewClient(cfg)
418         require.NoError(t, err)
419         defer leecher.Close()
420         if ps.ExportClientStatus {
421                 testutil.ExportStatusWriter(leecher, "l")
422         }
423         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
424                 ret = TorrentSpecFromMetaInfo(mi)
425                 ret.ChunkSize = 2
426                 return
427         }())
428         require.NoError(t, err)
429         assert.True(t, new)
430         // Now do some things with leecher and seeder.
431         addClientPeer(leecherGreeting, seeder)
432         r := leecherGreeting.NewReader()
433         defer r.Close()
434         if ps.Responsive {
435                 r.SetResponsive()
436         }
437         if ps.SetReadahead {
438                 r.SetReadahead(ps.Readahead)
439         }
440         assertReadAllGreeting(t, r)
441         // After one read through, we can assume certain torrent statistics.
442         // These are not a strict requirement. It is however interesting to
443         // follow.
444         // t.Logf("%#v", seederTorrent.Stats())
445         // assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
446         // assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
447         // assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
448         // assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
449         // Read through again for the cases where the torrent data size exceeds
450         // the size of the cache.
451         assertReadAllGreeting(t, r)
452 }
453
454 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
455         pos, err := r.Seek(0, os.SEEK_SET)
456         assert.NoError(t, err)
457         assert.EqualValues(t, 0, pos)
458         _greeting, err := ioutil.ReadAll(r)
459         assert.NoError(t, err)
460         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
461 }
462
463 // Check that after completing leeching, a leecher transitions to a seeding
464 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
465 func TestSeedAfterDownloading(t *testing.T) {
466         greetingTempDir, mi := testutil.GreetingTestTorrent()
467         defer os.RemoveAll(greetingTempDir)
468         cfg := TestingConfig()
469         cfg.Seed = true
470         cfg.DataDir = greetingTempDir
471         seeder, err := NewClient(cfg)
472         require.NoError(t, err)
473         defer seeder.Close()
474         testutil.ExportStatusWriter(seeder, "s")
475         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
476         cfg.DataDir, err = ioutil.TempDir("", "")
477         require.NoError(t, err)
478         defer os.RemoveAll(cfg.DataDir)
479         leecher, err := NewClient(cfg)
480         require.NoError(t, err)
481         defer leecher.Close()
482         testutil.ExportStatusWriter(leecher, "l")
483         cfg.Seed = false
484         // cfg.TorrentDataOpener = nil
485         cfg.DataDir, err = ioutil.TempDir("", "")
486         require.NoError(t, err)
487         defer os.RemoveAll(cfg.DataDir)
488         leecherLeecher, _ := NewClient(cfg)
489         defer leecherLeecher.Close()
490         testutil.ExportStatusWriter(leecherLeecher, "ll")
491         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
492                 ret = TorrentSpecFromMetaInfo(mi)
493                 ret.ChunkSize = 2
494                 return
495         }())
496         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
497                 ret = TorrentSpecFromMetaInfo(mi)
498                 ret.ChunkSize = 3
499                 return
500         }())
501         // Simultaneously DownloadAll in Leecher, and read the contents
502         // consecutively in LeecherLeecher. This non-deterministically triggered a
503         // case where the leecher wouldn't unchoke the LeecherLeecher.
504         var wg sync.WaitGroup
505         wg.Add(1)
506         go func() {
507                 defer wg.Done()
508                 r := llg.NewReader()
509                 defer r.Close()
510                 b, err := ioutil.ReadAll(r)
511                 require.NoError(t, err)
512                 assert.EqualValues(t, testutil.GreetingFileContents, b)
513         }()
514         addClientPeer(leecherGreeting, seeder)
515         addClientPeer(leecherGreeting, leecherLeecher)
516         wg.Add(1)
517         go func() {
518                 defer wg.Done()
519                 leecherGreeting.DownloadAll()
520                 leecher.WaitAll()
521         }()
522         wg.Wait()
523 }
524
525 func TestMergingTrackersByAddingSpecs(t *testing.T) {
526         cl, err := NewClient(TestingConfig())
527         require.NoError(t, err)
528         defer cl.Close()
529         spec := TorrentSpec{}
530         T, new, _ := cl.AddTorrentSpec(&spec)
531         if !new {
532                 t.FailNow()
533         }
534         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
535         _, new, _ = cl.AddTorrentSpec(&spec)
536         assert.False(t, new)
537         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
538         // Because trackers are disabled in TestingConfig.
539         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
540 }
541
542 type badStorage struct{}
543
544 func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
545         return bs, nil
546 }
547
548 func (bs badStorage) Close() error {
549         return nil
550 }
551
552 func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {
553         return badStoragePiece{p}
554 }
555
556 type badStoragePiece struct {
557         p metainfo.Piece
558 }
559
560 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
561         return 0, nil
562 }
563
564 func (p badStoragePiece) GetIsComplete() bool {
565         return true
566 }
567
568 func (p badStoragePiece) MarkComplete() error {
569         return errors.New("psyyyyyyyche")
570 }
571
572 func (p badStoragePiece) MarkNotComplete() error {
573         return errors.New("psyyyyyyyche")
574 }
575
576 func (p badStoragePiece) randomlyTruncatedDataString() string {
577         return "hello, world\n"[:rand.Intn(14)]
578 }
579
580 func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
581         r := strings.NewReader(p.randomlyTruncatedDataString())
582         return r.ReadAt(b, off+p.p.Offset())
583 }
584
585 // We read from a piece which is marked completed, but is missing data.
586 func TestCompletedPieceWrongSize(t *testing.T) {
587         cfg := TestingConfig()
588         cfg.DefaultStorage = badStorage{}
589         cl, err := NewClient(cfg)
590         require.NoError(t, err)
591         defer cl.Close()
592         info := metainfo.Info{
593                 PieceLength: 15,
594                 Pieces:      make([]byte, 20),
595                 Files: []metainfo.FileInfo{
596                         {Path: []string{"greeting"}, Length: 13},
597                 },
598         }
599         b, err := bencode.Marshal(info)
600         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
601                 InfoBytes: b,
602                 InfoHash:  metainfo.HashBytes(b),
603         })
604         require.NoError(t, err)
605         defer tt.Drop()
606         assert.True(t, new)
607         r := tt.NewReader()
608         defer r.Close()
609         b, err = ioutil.ReadAll(r)
610         assert.Len(t, b, 13)
611         assert.NoError(t, err)
612 }
613
614 func BenchmarkAddLargeTorrent(b *testing.B) {
615         cfg := TestingConfig()
616         cfg.DisableTCP = true
617         cfg.DisableUTP = true
618         cfg.ListenAddr = "redonk"
619         cl, err := NewClient(cfg)
620         require.NoError(b, err)
621         defer cl.Close()
622         for range iter.N(b.N) {
623                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
624                 if err != nil {
625                         b.Fatal(err)
626                 }
627                 t.Drop()
628         }
629 }
630
631 func TestResponsive(t *testing.T) {
632         seederDataDir, mi := testutil.GreetingTestTorrent()
633         defer os.RemoveAll(seederDataDir)
634         cfg := TestingConfig()
635         cfg.Seed = true
636         cfg.DataDir = seederDataDir
637         seeder, err := NewClient(cfg)
638         require.Nil(t, err)
639         defer seeder.Close()
640         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
641         leecherDataDir, err := ioutil.TempDir("", "")
642         require.Nil(t, err)
643         defer os.RemoveAll(leecherDataDir)
644         cfg = TestingConfig()
645         cfg.DataDir = leecherDataDir
646         leecher, err := NewClient(cfg)
647         require.Nil(t, err)
648         defer leecher.Close()
649         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
650                 ret = TorrentSpecFromMetaInfo(mi)
651                 ret.ChunkSize = 2
652                 return
653         }())
654         addClientPeer(leecherTorrent, seeder)
655         reader := leecherTorrent.NewReader()
656         defer reader.Close()
657         reader.SetReadahead(0)
658         reader.SetResponsive()
659         b := make([]byte, 2)
660         _, err = reader.Seek(3, os.SEEK_SET)
661         require.NoError(t, err)
662         _, err = io.ReadFull(reader, b)
663         assert.Nil(t, err)
664         assert.EqualValues(t, "lo", string(b))
665         _, err = reader.Seek(11, os.SEEK_SET)
666         require.NoError(t, err)
667         n, err := io.ReadFull(reader, b)
668         assert.Nil(t, err)
669         assert.EqualValues(t, 2, n)
670         assert.EqualValues(t, "d\n", string(b))
671 }
672
673 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
674         seederDataDir, mi := testutil.GreetingTestTorrent()
675         defer os.RemoveAll(seederDataDir)
676         cfg := TestingConfig()
677         cfg.Seed = true
678         cfg.DataDir = seederDataDir
679         seeder, err := NewClient(cfg)
680         require.Nil(t, err)
681         defer seeder.Close()
682         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
683         leecherDataDir, err := ioutil.TempDir("", "")
684         require.Nil(t, err)
685         defer os.RemoveAll(leecherDataDir)
686         cfg = TestingConfig()
687         cfg.DataDir = leecherDataDir
688         leecher, err := NewClient(cfg)
689         require.Nil(t, err)
690         defer leecher.Close()
691         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
692                 ret = TorrentSpecFromMetaInfo(mi)
693                 ret.ChunkSize = 2
694                 return
695         }())
696         addClientPeer(leecherTorrent, seeder)
697         reader := leecherTorrent.NewReader()
698         defer reader.Close()
699         reader.SetReadahead(0)
700         reader.SetResponsive()
701         b := make([]byte, 2)
702         _, err = reader.Seek(3, os.SEEK_SET)
703         require.NoError(t, err)
704         _, err = io.ReadFull(reader, b)
705         assert.Nil(t, err)
706         assert.EqualValues(t, "lo", string(b))
707         go leecherTorrent.Drop()
708         _, err = reader.Seek(11, os.SEEK_SET)
709         require.NoError(t, err)
710         n, err := reader.Read(b)
711         assert.EqualError(t, err, "torrent closed")
712         assert.EqualValues(t, 0, n)
713 }
714
715 func TestDHTInheritBlocklist(t *testing.T) {
716         ipl := iplist.New(nil)
717         require.NotNil(t, ipl)
718         cfg := TestingConfig()
719         cfg.IPBlocklist = ipl
720         cfg.NoDHT = false
721         cl, err := NewClient(cfg)
722         require.NoError(t, err)
723         defer cl.Close()
724         require.Equal(t, ipl, cl.DHT().IPBlocklist())
725 }
726
727 // Check that stuff is merged in subsequent AddTorrentSpec for the same
728 // infohash.
729 func TestAddTorrentSpecMerging(t *testing.T) {
730         cl, err := NewClient(TestingConfig())
731         require.NoError(t, err)
732         defer cl.Close()
733         dir, mi := testutil.GreetingTestTorrent()
734         defer os.RemoveAll(dir)
735         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
736                 InfoHash: mi.HashInfoBytes(),
737         })
738         require.NoError(t, err)
739         require.True(t, new)
740         require.Nil(t, tt.Info())
741         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
742         require.NoError(t, err)
743         require.False(t, new)
744         require.NotNil(t, tt.Info())
745 }
746
747 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
748         dir, mi := testutil.GreetingTestTorrent()
749         os.RemoveAll(dir)
750         cl, _ := NewClient(TestingConfig())
751         defer cl.Close()
752         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
753                 InfoHash: mi.HashInfoBytes(),
754         })
755         tt.Drop()
756         assert.EqualValues(t, 0, len(cl.Torrents()))
757         select {
758         case <-tt.GotInfo():
759                 t.FailNow()
760         default:
761         }
762 }
763
764 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
765         for i := range iter.N(info.NumPieces()) {
766                 p := info.Piece(i)
767                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
768         }
769 }
770
771 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
772         fileCacheDir, err := ioutil.TempDir("", "")
773         require.NoError(t, err)
774         defer os.RemoveAll(fileCacheDir)
775         fileCache, err := filecache.NewCache(fileCacheDir)
776         require.NoError(t, err)
777         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
778         defer os.RemoveAll(greetingDataTempDir)
779         filePieceStore := csf(fileCache)
780         info, err := greetingMetainfo.UnmarshalInfo()
781         require.NoError(t, err)
782         ih := greetingMetainfo.HashInfoBytes()
783         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
784         require.NoError(t, err)
785         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
786         // require.Equal(t, len(testutil.GreetingFileContents), written)
787         // require.NoError(t, err)
788         for i := 0; i < info.NumPieces(); i++ {
789                 p := info.Piece(i)
790                 if alreadyCompleted {
791                         err := greetingData.Piece(p).MarkComplete()
792                         assert.NoError(t, err)
793                 }
794         }
795         cfg := TestingConfig()
796         // TODO: Disable network option?
797         cfg.DisableTCP = true
798         cfg.DisableUTP = true
799         cfg.DefaultStorage = filePieceStore
800         cl, err := NewClient(cfg)
801         require.NoError(t, err)
802         defer cl.Close()
803         tt, err := cl.AddTorrent(greetingMetainfo)
804         require.NoError(t, err)
805         psrs := tt.PieceStateRuns()
806         assert.Len(t, psrs, 1)
807         assert.EqualValues(t, 3, psrs[0].Length)
808         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
809         if alreadyCompleted {
810                 r := tt.NewReader()
811                 b, err := ioutil.ReadAll(r)
812                 assert.NoError(t, err)
813                 assert.EqualValues(t, testutil.GreetingFileContents, b)
814         }
815 }
816
817 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
818         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
819 }
820
821 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
822         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
823 }
824
825 func TestAddMetainfoWithNodes(t *testing.T) {
826         cfg := TestingConfig()
827         cfg.NoDHT = false
828         // For now, we want to just jam the nodes into the table, without
829         // verifying them first. Also the DHT code doesn't support mixing secure
830         // and insecure nodes if security is enabled (yet).
831         cfg.DHTConfig.NoSecurity = true
832         cl, err := NewClient(cfg)
833         require.NoError(t, err)
834         defer cl.Close()
835         assert.EqualValues(t, cl.DHT().NumNodes(), 0)
836         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
837         require.NoError(t, err)
838         assert.Len(t, tt.metainfo.AnnounceList, 5)
839         assert.EqualValues(t, 6, cl.DHT().NumNodes())
840 }
841
842 type testDownloadCancelParams struct {
843         ExportClientStatus        bool
844         SetLeecherStorageCapacity bool
845         LeecherStorageCapacity    int64
846         Cancel                    bool
847 }
848
849 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
850         greetingTempDir, mi := testutil.GreetingTestTorrent()
851         defer os.RemoveAll(greetingTempDir)
852         cfg := TestingConfig()
853         cfg.Seed = true
854         cfg.DataDir = greetingTempDir
855         seeder, err := NewClient(cfg)
856         require.NoError(t, err)
857         defer seeder.Close()
858         if ps.ExportClientStatus {
859                 testutil.ExportStatusWriter(seeder, "s")
860         }
861         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
862         leecherDataDir, err := ioutil.TempDir("", "")
863         require.NoError(t, err)
864         defer os.RemoveAll(leecherDataDir)
865         fc, err := filecache.NewCache(leecherDataDir)
866         require.NoError(t, err)
867         if ps.SetLeecherStorageCapacity {
868                 fc.SetCapacity(ps.LeecherStorageCapacity)
869         }
870         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
871         cfg.DataDir = leecherDataDir
872         leecher, _ := NewClient(cfg)
873         defer leecher.Close()
874         if ps.ExportClientStatus {
875                 testutil.ExportStatusWriter(leecher, "l")
876         }
877         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
878                 ret = TorrentSpecFromMetaInfo(mi)
879                 ret.ChunkSize = 2
880                 return
881         }())
882         require.NoError(t, err)
883         assert.True(t, new)
884         psc := leecherGreeting.SubscribePieceStateChanges()
885         defer psc.Close()
886         leecherGreeting.DownloadAll()
887         if ps.Cancel {
888                 leecherGreeting.CancelPieces(0, leecherGreeting.NumPieces())
889         }
890         addClientPeer(leecherGreeting, seeder)
891         completes := make(map[int]bool, 3)
892 values:
893         for {
894                 // started := time.Now()
895                 select {
896                 case _v := <-psc.Values:
897                         // log.Print(time.Since(started))
898                         v := _v.(PieceStateChange)
899                         completes[v.Index] = v.Complete
900                 case <-time.After(100 * time.Millisecond):
901                         break values
902                 }
903         }
904         if ps.Cancel {
905                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
906         } else {
907                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
908         }
909
910 }
911
912 func TestTorrentDownloadAll(t *testing.T) {
913         testDownloadCancel(t, testDownloadCancelParams{})
914 }
915
916 func TestTorrentDownloadAllThenCancel(t *testing.T) {
917         testDownloadCancel(t, testDownloadCancelParams{
918                 Cancel: true,
919         })
920 }
921
922 // Ensure that it's an error for a peer to send an invalid have message.
923 func TestPeerInvalidHave(t *testing.T) {
924         cl, err := NewClient(TestingConfig())
925         require.NoError(t, err)
926         defer cl.Close()
927         info := metainfo.Info{
928                 PieceLength: 1,
929                 Pieces:      make([]byte, 20),
930                 Files:       []metainfo.FileInfo{{Length: 1}},
931         }
932         infoBytes, err := bencode.Marshal(info)
933         require.NoError(t, err)
934         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
935                 InfoBytes: infoBytes,
936                 InfoHash:  metainfo.HashBytes(infoBytes),
937         })
938         require.NoError(t, err)
939         assert.True(t, _new)
940         defer tt.Drop()
941         cn := &connection{
942                 t: tt,
943         }
944         assert.NoError(t, cn.peerSentHave(0))
945         assert.Error(t, cn.peerSentHave(1))
946 }
947
948 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
949         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
950         defer os.RemoveAll(greetingTempDir)
951         cfg := TestingConfig()
952         cfg.DataDir = greetingTempDir
953         seeder, err := NewClient(TestingConfig())
954         require.NoError(t, err)
955         seeder.AddTorrentSpec(&TorrentSpec{
956                 InfoBytes: greetingMetainfo.InfoBytes,
957         })
958 }
959
960 func TestPrepareTrackerAnnounce(t *testing.T) {
961         cl := &Client{}
962         blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
963         require.NoError(t, err)
964         assert.False(t, blocked)
965         assert.EqualValues(t, "localhost:1234", host)
966         assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
967 }
968
969 // Check that when the listen port is 0, all the protocols listened on have
970 // the same port, and it isn't zero.
971 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
972         cl, err := NewClient(TestingConfig())
973         require.NoError(t, err)
974         defer cl.Close()
975         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
976         assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
977 }
978
979 func TestClientDynamicListenTCPOnly(t *testing.T) {
980         cfg := TestingConfig()
981         cfg.DisableUTP = true
982         cl, err := NewClient(cfg)
983         require.NoError(t, err)
984         defer cl.Close()
985         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
986         assert.Nil(t, cl.utpSock)
987 }
988
989 func TestClientDynamicListenUTPOnly(t *testing.T) {
990         cfg := TestingConfig()
991         cfg.DisableTCP = true
992         cl, err := NewClient(cfg)
993         require.NoError(t, err)
994         defer cl.Close()
995         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
996         assert.Nil(t, cl.tcpListener)
997 }
998
999 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
1000         cfg := TestingConfig()
1001         cfg.DisableTCP = true
1002         cfg.DisableUTP = true
1003         cl, err := NewClient(cfg)
1004         require.NoError(t, err)
1005         defer cl.Close()
1006         assert.Nil(t, cl.ListenAddr())
1007 }
1008
1009 func addClientPeer(t *Torrent, cl *Client) {
1010         t.AddPeers([]Peer{
1011                 {
1012                         IP:   missinggo.AddrIP(cl.ListenAddr()),
1013                         Port: missinggo.AddrPort(cl.ListenAddr()),
1014                 },
1015         })
1016 }
1017
1018 func printConnPeerCounts(t *Torrent) {
1019         t.cl.mu.Lock()
1020         log.Println(len(t.conns), len(t.peers))
1021         t.cl.mu.Unlock()
1022 }
1023
1024 func totalConns(tts []*Torrent) (ret int) {
1025         for _, tt := range tts {
1026                 tt.cl.mu.Lock()
1027                 ret += len(tt.conns)
1028                 tt.cl.mu.Unlock()
1029         }
1030         return
1031 }
1032
1033 func TestSetMaxEstablishedConn(t *testing.T) {
1034         var tts []*Torrent
1035         ih := testutil.GreetingMetaInfo().HashInfoBytes()
1036         for i := range iter.N(3) {
1037                 cl, err := NewClient(TestingConfig())
1038                 require.NoError(t, err)
1039                 defer cl.Close()
1040                 tt, _ := cl.AddTorrentInfoHash(ih)
1041                 tt.SetMaxEstablishedConns(2)
1042                 testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
1043                 tts = append(tts, tt)
1044         }
1045         addPeers := func() {
1046                 for i, tt := range tts {
1047                         for _, _tt := range tts[:i] {
1048                                 addClientPeer(tt, _tt.cl)
1049                         }
1050                 }
1051         }
1052         waitTotalConns := func(num int) {
1053                 for totalConns(tts) != num {
1054                         time.Sleep(time.Millisecond)
1055                 }
1056         }
1057         addPeers()
1058         waitTotalConns(6)
1059         tts[0].SetMaxEstablishedConns(1)
1060         waitTotalConns(4)
1061         tts[0].SetMaxEstablishedConns(0)
1062         waitTotalConns(2)
1063         tts[0].SetMaxEstablishedConns(1)
1064         addPeers()
1065         waitTotalConns(4)
1066         tts[0].SetMaxEstablishedConns(2)
1067         addPeers()
1068         waitTotalConns(6)
1069 }
1070
1071 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1072         os.MkdirAll(dir, 0770)
1073         file, err := os.Create(filepath.Join(dir, name))
1074         require.NoError(t, err)
1075         file.Write([]byte(name))
1076         file.Close()
1077         mi := metainfo.MetaInfo{}
1078         mi.SetDefaults()
1079         info := metainfo.Info{PieceLength: 256 * 1024}
1080         err = info.BuildFromFilePath(filepath.Join(dir, name))
1081         require.NoError(t, err)
1082         mi.InfoBytes, err = bencode.Marshal(info)
1083         require.NoError(t, err)
1084         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1085         tr, err := cl.AddTorrent(&mi)
1086         require.NoError(t, err)
1087         assert.True(t, tr.Seeding())
1088         return magnet
1089 }
1090
1091 // https://github.com/anacrolix/torrent/issues/114
1092 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1093         cfg := TestingConfig()
1094         cfg.DisableUTP = true
1095         cfg.Seed = true
1096         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1097         cfg.Debug = true
1098         cfg.ForceEncryption = true
1099         os.Mkdir(cfg.DataDir, 0755)
1100         server, err := NewClient(cfg)
1101         require.NoError(t, err)
1102         defer server.Close()
1103         testutil.ExportStatusWriter(server, "s")
1104         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1105         makeMagnet(t, server, cfg.DataDir, "test2")
1106         cfg = TestingConfig()
1107         cfg.DisableUTP = true
1108         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1109         cfg.Debug = true
1110         cfg.ForceEncryption = true
1111         client, err := NewClient(cfg)
1112         require.NoError(t, err)
1113         defer client.Close()
1114         testutil.ExportStatusWriter(client, "c")
1115         tr, err := client.AddMagnet(magnet1)
1116         require.NoError(t, err)
1117         tr.AddPeers([]Peer{{
1118                 IP:   missinggo.AddrIP(server.ListenAddr()),
1119                 Port: missinggo.AddrPort(server.ListenAddr()),
1120         }})
1121         <-tr.GotInfo()
1122         tr.DownloadAll()
1123         client.WaitAll()
1124 }