]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Merge branch 'dev'
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "os"
9         "path/filepath"
10         "reflect"
11         "sync"
12         "testing"
13         "time"
14
15         "github.com/anacrolix/dht"
16         _ "github.com/anacrolix/envpprof"
17         "github.com/anacrolix/missinggo"
18         "github.com/anacrolix/missinggo/filecache"
19         "github.com/bradfitz/iter"
20         "github.com/stretchr/testify/assert"
21         "github.com/stretchr/testify/require"
22         "golang.org/x/time/rate"
23
24         "github.com/anacrolix/torrent/bencode"
25         "github.com/anacrolix/torrent/internal/testutil"
26         "github.com/anacrolix/torrent/iplist"
27         "github.com/anacrolix/torrent/metainfo"
28         "github.com/anacrolix/torrent/storage"
29 )
30
31 func TestingConfig() *ClientConfig {
32         cfg := NewDefaultClientConfig()
33         cfg.ListenHost = LoopbackListenHost
34         cfg.NoDHT = true
35         cfg.DataDir = tempDir()
36         cfg.DisableTrackers = true
37         cfg.NoDefaultPortForwarding = true
38         cfg.DisableAcceptRateLimiting = true
39         return cfg
40 }
41
42 func TestClientDefault(t *testing.T) {
43         cl, err := NewClient(TestingConfig())
44         require.NoError(t, err)
45         cl.Close()
46 }
47
48 func TestClientNilConfig(t *testing.T) {
49         cl, err := NewClient(nil)
50         require.NoError(t, err)
51         cl.Close()
52 }
53
54 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
55         cfg := TestingConfig()
56         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
57         require.NoError(t, err)
58         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
59         defer ci.Close()
60         cfg.DefaultStorage = ci
61         cl, err := NewClient(cfg)
62         require.NoError(t, err)
63         cl.Close()
64         // And again, https://github.com/anacrolix/torrent/issues/158
65         cl, err = NewClient(cfg)
66         require.NoError(t, err)
67         cl.Close()
68 }
69
70 func TestAddDropTorrent(t *testing.T) {
71         cl, err := NewClient(TestingConfig())
72         require.NoError(t, err)
73         defer cl.Close()
74         dir, mi := testutil.GreetingTestTorrent()
75         defer os.RemoveAll(dir)
76         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
77         require.NoError(t, err)
78         assert.True(t, new)
79         tt.SetMaxEstablishedConns(0)
80         tt.SetMaxEstablishedConns(1)
81         tt.Drop()
82 }
83
84 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
85         // TODO?
86         t.SkipNow()
87 }
88
89 func TestAddTorrentNoUsableURLs(t *testing.T) {
90         // TODO?
91         t.SkipNow()
92 }
93
94 func TestAddPeersToUnknownTorrent(t *testing.T) {
95         // TODO?
96         t.SkipNow()
97 }
98
99 func TestPieceHashSize(t *testing.T) {
100         assert.Equal(t, 20, pieceHash.Size())
101 }
102
103 func TestTorrentInitialState(t *testing.T) {
104         dir, mi := testutil.GreetingTestTorrent()
105         defer os.RemoveAll(dir)
106         cl := &Client{
107                 config: &ClientConfig{},
108         }
109         cl.initLogger()
110         tor := cl.newTorrent(
111                 mi.HashInfoBytes(),
112                 storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
113         )
114         tor.setChunkSize(2)
115         tor.cl.mu.Lock()
116         err := tor.setInfoBytes(mi.InfoBytes)
117         tor.cl.mu.Unlock()
118         require.NoError(t, err)
119         require.Len(t, tor.pieces, 3)
120         tor.pendAllChunkSpecs(0)
121         tor.cl.mu.Lock()
122         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
123         tor.cl.mu.Unlock()
124         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
125 }
126
127 func TestUnmarshalPEXMsg(t *testing.T) {
128         var m peerExchangeMessage
129         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
130                 t.Fatal(err)
131         }
132         if len(m.Added) != 2 {
133                 t.FailNow()
134         }
135         if m.Added[0].Port != 0x506 {
136                 t.FailNow()
137         }
138 }
139
140 func TestReducedDialTimeout(t *testing.T) {
141         cfg := NewDefaultClientConfig()
142         for _, _case := range []struct {
143                 Max             time.Duration
144                 HalfOpenLimit   int
145                 PendingPeers    int
146                 ExpectedReduced time.Duration
147         }{
148                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
149                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
150                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
151                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
152                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
153                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
154         } {
155                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
156                 expected := _case.ExpectedReduced
157                 if expected < cfg.MinDialTimeout {
158                         expected = cfg.MinDialTimeout
159                 }
160                 if reduced != expected {
161                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
162                 }
163         }
164 }
165
166 func TestAddDropManyTorrents(t *testing.T) {
167         cl, err := NewClient(TestingConfig())
168         require.NoError(t, err)
169         defer cl.Close()
170         for i := range iter.N(1000) {
171                 var spec TorrentSpec
172                 binary.PutVarint(spec.InfoHash[:], int64(i))
173                 tt, new, err := cl.AddTorrentSpec(&spec)
174                 assert.NoError(t, err)
175                 assert.True(t, new)
176                 defer tt.Drop()
177         }
178 }
179
180 type FileCacheClientStorageFactoryParams struct {
181         Capacity    int64
182         SetCapacity bool
183         Wrapper     func(*filecache.Cache) storage.ClientImpl
184 }
185
186 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
187         return func(dataDir string) storage.ClientImpl {
188                 fc, err := filecache.NewCache(dataDir)
189                 if err != nil {
190                         panic(err)
191                 }
192                 if ps.SetCapacity {
193                         fc.SetCapacity(ps.Capacity)
194                 }
195                 return ps.Wrapper(fc)
196         }
197 }
198
199 type storageFactory func(string) storage.ClientImpl
200
201 func TestClientTransferDefault(t *testing.T) {
202         testClientTransfer(t, testClientTransferParams{
203                 ExportClientStatus: true,
204                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
205                         Wrapper: fileCachePieceResourceStorage,
206                 }),
207         })
208 }
209
210 func TestClientTransferRateLimitedUpload(t *testing.T) {
211         started := time.Now()
212         testClientTransfer(t, testClientTransferParams{
213                 // We are uploading 13 bytes (the length of the greeting torrent). The
214                 // chunks are 2 bytes in length. Then the smallest burst we can run
215                 // with is 2. Time taken is (13-burst)/rate.
216                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
217                 ExportClientStatus:      true,
218         })
219         require.True(t, time.Since(started) > time.Second)
220 }
221
222 func TestClientTransferRateLimitedDownload(t *testing.T) {
223         testClientTransfer(t, testClientTransferParams{
224                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
225         })
226 }
227
228 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
229         return storage.NewResourcePieces(fc.AsResourceProvider())
230 }
231
232 func TestClientTransferSmallCache(t *testing.T) {
233         testClientTransfer(t, testClientTransferParams{
234                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
235                         SetCapacity: true,
236                         // Going below the piece length means it can't complete a piece so
237                         // that it can be hashed.
238                         Capacity: 5,
239                         Wrapper:  fileCachePieceResourceStorage,
240                 }),
241                 SetReadahead: true,
242                 // Can't readahead too far or the cache will thrash and drop data we
243                 // thought we had.
244                 Readahead:          0,
245                 ExportClientStatus: true,
246         })
247 }
248
249 func TestClientTransferVarious(t *testing.T) {
250         // Leecher storage
251         for _, ls := range []storageFactory{
252                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
253                         Wrapper: fileCachePieceResourceStorage,
254                 }),
255                 storage.NewBoltDB,
256         } {
257                 // Seeder storage
258                 for _, ss := range []func(string) storage.ClientImpl{
259                         storage.NewFile,
260                         storage.NewMMap,
261                 } {
262                         for _, responsive := range []bool{false, true} {
263                                 testClientTransfer(t, testClientTransferParams{
264                                         Responsive:     responsive,
265                                         SeederStorage:  ss,
266                                         LeecherStorage: ls,
267                                 })
268                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
269                                         testClientTransfer(t, testClientTransferParams{
270                                                 SeederStorage:  ss,
271                                                 Responsive:     responsive,
272                                                 SetReadahead:   true,
273                                                 Readahead:      readahead,
274                                                 LeecherStorage: ls,
275                                         })
276                                 }
277                         }
278                 }
279         }
280 }
281
282 type testClientTransferParams struct {
283         Responsive                 bool
284         Readahead                  int64
285         SetReadahead               bool
286         ExportClientStatus         bool
287         LeecherStorage             func(string) storage.ClientImpl
288         SeederStorage              func(string) storage.ClientImpl
289         SeederUploadRateLimiter    *rate.Limiter
290         LeecherDownloadRateLimiter *rate.Limiter
291 }
292
293 // Creates a seeder and a leecher, and ensures the data transfers when a read
294 // is attempted on the leecher.
295 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
296         greetingTempDir, mi := testutil.GreetingTestTorrent()
297         defer os.RemoveAll(greetingTempDir)
298         // Create seeder and a Torrent.
299         cfg := TestingConfig()
300         cfg.Seed = true
301         if ps.SeederUploadRateLimiter != nil {
302                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
303         }
304         // cfg.ListenAddr = "localhost:4000"
305         if ps.SeederStorage != nil {
306                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
307                 defer cfg.DefaultStorage.Close()
308         } else {
309                 cfg.DataDir = greetingTempDir
310         }
311         seeder, err := NewClient(cfg)
312         require.NoError(t, err)
313         if ps.ExportClientStatus {
314                 testutil.ExportStatusWriter(seeder, "s")
315         }
316         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
317         // Run a Stats right after Closing the Client. This will trigger the Stats
318         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
319         defer seederTorrent.Stats()
320         defer seeder.Close()
321         seederTorrent.VerifyData()
322         // Create leecher and a Torrent.
323         leecherDataDir, err := ioutil.TempDir("", "")
324         require.NoError(t, err)
325         defer os.RemoveAll(leecherDataDir)
326         cfg = TestingConfig()
327         if ps.LeecherStorage == nil {
328                 cfg.DataDir = leecherDataDir
329         } else {
330                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
331         }
332         if ps.LeecherDownloadRateLimiter != nil {
333                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
334         }
335         cfg.Seed = false
336         leecher, err := NewClient(cfg)
337         require.NoError(t, err)
338         defer leecher.Close()
339         if ps.ExportClientStatus {
340                 testutil.ExportStatusWriter(leecher, "l")
341         }
342         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
343                 ret = TorrentSpecFromMetaInfo(mi)
344                 ret.ChunkSize = 2
345                 return
346         }())
347         require.NoError(t, err)
348         assert.True(t, new)
349         // Now do some things with leecher and seeder.
350         leecherTorrent.AddClientPeer(seeder)
351         // The Torrent should not be interested in obtaining peers, so the one we
352         // just added should be the only one.
353         assert.False(t, leecherTorrent.Seeding())
354         assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
355         r := leecherTorrent.NewReader()
356         defer r.Close()
357         if ps.Responsive {
358                 r.SetResponsive()
359         }
360         if ps.SetReadahead {
361                 r.SetReadahead(ps.Readahead)
362         }
363         assertReadAllGreeting(t, r)
364
365         seederStats := seederTorrent.Stats()
366         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
367         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
368
369         leecherStats := leecherTorrent.Stats()
370         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
371         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
372
373         // Try reading through again for the cases where the torrent data size
374         // exceeds the size of the cache.
375         assertReadAllGreeting(t, r)
376 }
377
378 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
379         pos, err := r.Seek(0, io.SeekStart)
380         assert.NoError(t, err)
381         assert.EqualValues(t, 0, pos)
382         _greeting, err := ioutil.ReadAll(r)
383         assert.NoError(t, err)
384         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
385 }
386
387 // Check that after completing leeching, a leecher transitions to a seeding
388 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
389 func TestSeedAfterDownloading(t *testing.T) {
390         greetingTempDir, mi := testutil.GreetingTestTorrent()
391         defer os.RemoveAll(greetingTempDir)
392
393         cfg := TestingConfig()
394         cfg.Seed = true
395         cfg.DataDir = greetingTempDir
396         seeder, err := NewClient(cfg)
397         require.NoError(t, err)
398         defer seeder.Close()
399         testutil.ExportStatusWriter(seeder, "s")
400         seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
401         require.NoError(t, err)
402         assert.True(t, ok)
403         seederTorrent.VerifyData()
404
405         cfg = TestingConfig()
406         cfg.Seed = true
407         cfg.DataDir, err = ioutil.TempDir("", "")
408         require.NoError(t, err)
409         defer os.RemoveAll(cfg.DataDir)
410         leecher, err := NewClient(cfg)
411         require.NoError(t, err)
412         defer leecher.Close()
413         testutil.ExportStatusWriter(leecher, "l")
414
415         cfg = TestingConfig()
416         cfg.Seed = false
417         cfg.DataDir, err = ioutil.TempDir("", "")
418         require.NoError(t, err)
419         defer os.RemoveAll(cfg.DataDir)
420         leecherLeecher, _ := NewClient(cfg)
421         require.NoError(t, err)
422         defer leecherLeecher.Close()
423         testutil.ExportStatusWriter(leecherLeecher, "ll")
424         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
425                 ret = TorrentSpecFromMetaInfo(mi)
426                 ret.ChunkSize = 2
427                 return
428         }())
429         require.NoError(t, err)
430         assert.True(t, ok)
431         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
432                 ret = TorrentSpecFromMetaInfo(mi)
433                 ret.ChunkSize = 3
434                 return
435         }())
436         require.NoError(t, err)
437         assert.True(t, ok)
438         // Simultaneously DownloadAll in Leecher, and read the contents
439         // consecutively in LeecherLeecher. This non-deterministically triggered a
440         // case where the leecher wouldn't unchoke the LeecherLeecher.
441         var wg sync.WaitGroup
442         wg.Add(1)
443         go func() {
444                 defer wg.Done()
445                 r := llg.NewReader()
446                 defer r.Close()
447                 b, err := ioutil.ReadAll(r)
448                 require.NoError(t, err)
449                 assert.EqualValues(t, testutil.GreetingFileContents, b)
450         }()
451         done := make(chan struct{})
452         defer close(done)
453         go leecherGreeting.AddClientPeer(seeder)
454         go leecherGreeting.AddClientPeer(leecherLeecher)
455         wg.Add(1)
456         go func() {
457                 defer wg.Done()
458                 leecherGreeting.DownloadAll()
459                 leecher.WaitAll()
460         }()
461         wg.Wait()
462 }
463
464 func TestMergingTrackersByAddingSpecs(t *testing.T) {
465         cl, err := NewClient(TestingConfig())
466         require.NoError(t, err)
467         defer cl.Close()
468         spec := TorrentSpec{}
469         T, new, _ := cl.AddTorrentSpec(&spec)
470         if !new {
471                 t.FailNow()
472         }
473         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
474         _, new, _ = cl.AddTorrentSpec(&spec)
475         assert.False(t, new)
476         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
477         // Because trackers are disabled in TestingConfig.
478         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
479 }
480
481 // We read from a piece which is marked completed, but is missing data.
482 func TestCompletedPieceWrongSize(t *testing.T) {
483         cfg := TestingConfig()
484         cfg.DefaultStorage = badStorage{}
485         cl, err := NewClient(cfg)
486         require.NoError(t, err)
487         defer cl.Close()
488         info := metainfo.Info{
489                 PieceLength: 15,
490                 Pieces:      make([]byte, 20),
491                 Files: []metainfo.FileInfo{
492                         {Path: []string{"greeting"}, Length: 13},
493                 },
494         }
495         b, err := bencode.Marshal(info)
496         require.NoError(t, err)
497         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
498                 InfoBytes: b,
499                 InfoHash:  metainfo.HashBytes(b),
500         })
501         require.NoError(t, err)
502         defer tt.Drop()
503         assert.True(t, new)
504         r := tt.NewReader()
505         defer r.Close()
506         b, err = ioutil.ReadAll(r)
507         assert.Len(t, b, 13)
508         assert.NoError(t, err)
509 }
510
511 func BenchmarkAddLargeTorrent(b *testing.B) {
512         cfg := TestingConfig()
513         cfg.DisableTCP = true
514         cfg.DisableUTP = true
515         cfg.ListenHost = func(string) string { return "redonk" }
516         cl, err := NewClient(cfg)
517         require.NoError(b, err)
518         defer cl.Close()
519         for range iter.N(b.N) {
520                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
521                 if err != nil {
522                         b.Fatal(err)
523                 }
524                 t.Drop()
525         }
526 }
527
528 func TestResponsive(t *testing.T) {
529         seederDataDir, mi := testutil.GreetingTestTorrent()
530         defer os.RemoveAll(seederDataDir)
531         cfg := TestingConfig()
532         cfg.Seed = true
533         cfg.DataDir = seederDataDir
534         seeder, err := NewClient(cfg)
535         require.Nil(t, err)
536         defer seeder.Close()
537         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
538         seederTorrent.VerifyData()
539         leecherDataDir, err := ioutil.TempDir("", "")
540         require.Nil(t, err)
541         defer os.RemoveAll(leecherDataDir)
542         cfg = TestingConfig()
543         cfg.DataDir = leecherDataDir
544         leecher, err := NewClient(cfg)
545         require.Nil(t, err)
546         defer leecher.Close()
547         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
548                 ret = TorrentSpecFromMetaInfo(mi)
549                 ret.ChunkSize = 2
550                 return
551         }())
552         leecherTorrent.AddClientPeer(seeder)
553         reader := leecherTorrent.NewReader()
554         defer reader.Close()
555         reader.SetReadahead(0)
556         reader.SetResponsive()
557         b := make([]byte, 2)
558         _, err = reader.Seek(3, io.SeekStart)
559         require.NoError(t, err)
560         _, err = io.ReadFull(reader, b)
561         assert.Nil(t, err)
562         assert.EqualValues(t, "lo", string(b))
563         _, err = reader.Seek(11, io.SeekStart)
564         require.NoError(t, err)
565         n, err := io.ReadFull(reader, b)
566         assert.Nil(t, err)
567         assert.EqualValues(t, 2, n)
568         assert.EqualValues(t, "d\n", string(b))
569 }
570
571 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
572         seederDataDir, mi := testutil.GreetingTestTorrent()
573         defer os.RemoveAll(seederDataDir)
574         cfg := TestingConfig()
575         cfg.Seed = true
576         cfg.DataDir = seederDataDir
577         seeder, err := NewClient(cfg)
578         require.Nil(t, err)
579         defer seeder.Close()
580         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
581         seederTorrent.VerifyData()
582         leecherDataDir, err := ioutil.TempDir("", "")
583         require.Nil(t, err)
584         defer os.RemoveAll(leecherDataDir)
585         cfg = TestingConfig()
586         cfg.DataDir = leecherDataDir
587         leecher, err := NewClient(cfg)
588         require.Nil(t, err)
589         defer leecher.Close()
590         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
591                 ret = TorrentSpecFromMetaInfo(mi)
592                 ret.ChunkSize = 2
593                 return
594         }())
595         leecherTorrent.AddClientPeer(seeder)
596         reader := leecherTorrent.NewReader()
597         defer reader.Close()
598         reader.SetReadahead(0)
599         reader.SetResponsive()
600         b := make([]byte, 2)
601         _, err = reader.Seek(3, io.SeekStart)
602         require.NoError(t, err)
603         _, err = io.ReadFull(reader, b)
604         assert.Nil(t, err)
605         assert.EqualValues(t, "lo", string(b))
606         go leecherTorrent.Drop()
607         _, err = reader.Seek(11, io.SeekStart)
608         require.NoError(t, err)
609         n, err := reader.Read(b)
610         assert.EqualError(t, err, "torrent closed")
611         assert.EqualValues(t, 0, n)
612 }
613
614 func TestDHTInheritBlocklist(t *testing.T) {
615         ipl := iplist.New(nil)
616         require.NotNil(t, ipl)
617         cfg := TestingConfig()
618         cfg.IPBlocklist = ipl
619         cfg.NoDHT = false
620         cl, err := NewClient(cfg)
621         require.NoError(t, err)
622         defer cl.Close()
623         numServers := 0
624         cl.eachDhtServer(func(s *dht.Server) {
625                 assert.Equal(t, ipl, s.IPBlocklist())
626                 numServers++
627         })
628         assert.EqualValues(t, 2, numServers)
629 }
630
631 // Check that stuff is merged in subsequent AddTorrentSpec for the same
632 // infohash.
633 func TestAddTorrentSpecMerging(t *testing.T) {
634         cl, err := NewClient(TestingConfig())
635         require.NoError(t, err)
636         defer cl.Close()
637         dir, mi := testutil.GreetingTestTorrent()
638         defer os.RemoveAll(dir)
639         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
640                 InfoHash: mi.HashInfoBytes(),
641         })
642         require.NoError(t, err)
643         require.True(t, new)
644         require.Nil(t, tt.Info())
645         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
646         require.NoError(t, err)
647         require.False(t, new)
648         require.NotNil(t, tt.Info())
649 }
650
651 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
652         dir, mi := testutil.GreetingTestTorrent()
653         os.RemoveAll(dir)
654         cl, _ := NewClient(TestingConfig())
655         defer cl.Close()
656         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
657                 InfoHash: mi.HashInfoBytes(),
658         })
659         tt.Drop()
660         assert.EqualValues(t, 0, len(cl.Torrents()))
661         select {
662         case <-tt.GotInfo():
663                 t.FailNow()
664         default:
665         }
666 }
667
668 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
669         for i := range iter.N(info.NumPieces()) {
670                 p := info.Piece(i)
671                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
672         }
673 }
674
675 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
676         fileCacheDir, err := ioutil.TempDir("", "")
677         require.NoError(t, err)
678         defer os.RemoveAll(fileCacheDir)
679         fileCache, err := filecache.NewCache(fileCacheDir)
680         require.NoError(t, err)
681         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
682         defer os.RemoveAll(greetingDataTempDir)
683         filePieceStore := csf(fileCache)
684         defer filePieceStore.Close()
685         info, err := greetingMetainfo.UnmarshalInfo()
686         require.NoError(t, err)
687         ih := greetingMetainfo.HashInfoBytes()
688         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
689         require.NoError(t, err)
690         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
691         // require.Equal(t, len(testutil.GreetingFileContents), written)
692         // require.NoError(t, err)
693         for i := 0; i < info.NumPieces(); i++ {
694                 p := info.Piece(i)
695                 if alreadyCompleted {
696                         require.NoError(t, greetingData.Piece(p).MarkComplete())
697                 }
698         }
699         cfg := TestingConfig()
700         // TODO: Disable network option?
701         cfg.DisableTCP = true
702         cfg.DisableUTP = true
703         cfg.DefaultStorage = filePieceStore
704         cl, err := NewClient(cfg)
705         require.NoError(t, err)
706         defer cl.Close()
707         tt, err := cl.AddTorrent(greetingMetainfo)
708         require.NoError(t, err)
709         psrs := tt.PieceStateRuns()
710         assert.Len(t, psrs, 1)
711         assert.EqualValues(t, 3, psrs[0].Length)
712         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
713         if alreadyCompleted {
714                 r := tt.NewReader()
715                 b, err := ioutil.ReadAll(r)
716                 assert.NoError(t, err)
717                 assert.EqualValues(t, testutil.GreetingFileContents, b)
718         }
719 }
720
721 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
722         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
723 }
724
725 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
726         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
727 }
728
729 func TestAddMetainfoWithNodes(t *testing.T) {
730         cfg := TestingConfig()
731         cfg.ListenHost = func(string) string { return "" }
732         cfg.NoDHT = false
733         cfg.DhtStartingNodes = func() ([]dht.Addr, error) { return nil, nil }
734         // For now, we want to just jam the nodes into the table, without
735         // verifying them first. Also the DHT code doesn't support mixing secure
736         // and insecure nodes if security is enabled (yet).
737         // cfg.DHTConfig.NoSecurity = true
738         cl, err := NewClient(cfg)
739         require.NoError(t, err)
740         defer cl.Close()
741         sum := func() (ret int64) {
742                 cl.eachDhtServer(func(s *dht.Server) {
743                         ret += s.Stats().OutboundQueriesAttempted
744                 })
745                 return
746         }
747         assert.EqualValues(t, 0, sum())
748         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
749         require.NoError(t, err)
750         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
751         // check if the announce-list is here instead. TODO: Add nodes.
752         assert.Len(t, tt.metainfo.AnnounceList, 5)
753         // There are 6 nodes in the torrent file.
754         assert.EqualValues(t, 6*len(cl.dhtServers), sum())
755 }
756
757 type testDownloadCancelParams struct {
758         SetLeecherStorageCapacity bool
759         LeecherStorageCapacity    int64
760         Cancel                    bool
761 }
762
763 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
764         greetingTempDir, mi := testutil.GreetingTestTorrent()
765         defer os.RemoveAll(greetingTempDir)
766         cfg := TestingConfig()
767         cfg.Seed = true
768         cfg.DataDir = greetingTempDir
769         seeder, err := NewClient(cfg)
770         require.NoError(t, err)
771         defer seeder.Close()
772         testutil.ExportStatusWriter(seeder, "s")
773         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
774         seederTorrent.VerifyData()
775         leecherDataDir, err := ioutil.TempDir("", "")
776         require.NoError(t, err)
777         defer os.RemoveAll(leecherDataDir)
778         fc, err := filecache.NewCache(leecherDataDir)
779         require.NoError(t, err)
780         if ps.SetLeecherStorageCapacity {
781                 fc.SetCapacity(ps.LeecherStorageCapacity)
782         }
783         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
784         cfg.DataDir = leecherDataDir
785         leecher, _ := NewClient(cfg)
786         defer leecher.Close()
787         testutil.ExportStatusWriter(leecher, "l")
788         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
789                 ret = TorrentSpecFromMetaInfo(mi)
790                 ret.ChunkSize = 2
791                 return
792         }())
793         require.NoError(t, err)
794         assert.True(t, new)
795         psc := leecherGreeting.SubscribePieceStateChanges()
796         defer psc.Close()
797
798         leecherGreeting.cl.mu.Lock()
799         leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
800         if ps.Cancel {
801                 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
802         }
803         leecherGreeting.cl.mu.Unlock()
804         done := make(chan struct{})
805         defer close(done)
806         go leecherGreeting.AddClientPeer(seeder)
807         completes := make(map[int]bool, 3)
808         expected := func() map[int]bool {
809                 if ps.Cancel {
810                         return map[int]bool{0: false, 1: false, 2: false}
811                 } else {
812                         return map[int]bool{0: true, 1: true, 2: true}
813                 }
814         }()
815         for !reflect.DeepEqual(completes, expected) {
816                 select {
817                 case _v := <-psc.Values:
818                         v := _v.(PieceStateChange)
819                         completes[v.Index] = v.Complete
820                 }
821         }
822 }
823
824 func TestTorrentDownloadAll(t *testing.T) {
825         testDownloadCancel(t, testDownloadCancelParams{})
826 }
827
828 func TestTorrentDownloadAllThenCancel(t *testing.T) {
829         testDownloadCancel(t, testDownloadCancelParams{
830                 Cancel: true,
831         })
832 }
833
834 // Ensure that it's an error for a peer to send an invalid have message.
835 func TestPeerInvalidHave(t *testing.T) {
836         cl, err := NewClient(TestingConfig())
837         require.NoError(t, err)
838         defer cl.Close()
839         info := metainfo.Info{
840                 PieceLength: 1,
841                 Pieces:      make([]byte, 20),
842                 Files:       []metainfo.FileInfo{{Length: 1}},
843         }
844         infoBytes, err := bencode.Marshal(info)
845         require.NoError(t, err)
846         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
847                 InfoBytes: infoBytes,
848                 InfoHash:  metainfo.HashBytes(infoBytes),
849                 Storage:   badStorage{},
850         })
851         require.NoError(t, err)
852         assert.True(t, _new)
853         defer tt.Drop()
854         cn := &connection{
855                 t: tt,
856         }
857         assert.NoError(t, cn.peerSentHave(0))
858         assert.Error(t, cn.peerSentHave(1))
859 }
860
861 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
862         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
863         defer os.RemoveAll(greetingTempDir)
864         cfg := TestingConfig()
865         cfg.DataDir = greetingTempDir
866         seeder, err := NewClient(TestingConfig())
867         require.NoError(t, err)
868         seeder.AddTorrentSpec(&TorrentSpec{
869                 InfoBytes: greetingMetainfo.InfoBytes,
870         })
871 }
872
873 // Check that when the listen port is 0, all the protocols listened on have
874 // the same port, and it isn't zero.
875 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
876         cl, err := NewClient(TestingConfig())
877         require.NoError(t, err)
878         defer cl.Close()
879         port := cl.LocalPort()
880         assert.NotEqual(t, 0, port)
881         cl.eachListener(func(s socket) bool {
882                 assert.Equal(t, port, missinggo.AddrPort(s.Addr()))
883                 return true
884         })
885 }
886
887 func TestClientDynamicListenTCPOnly(t *testing.T) {
888         cfg := TestingConfig()
889         cfg.DisableUTP = true
890         cl, err := NewClient(cfg)
891         require.NoError(t, err)
892         defer cl.Close()
893         assert.NotEqual(t, 0, cl.LocalPort())
894         cl.eachListener(func(s socket) bool {
895                 assert.True(t, isTcpNetwork(s.Addr().Network()))
896                 return true
897         })
898 }
899
900 func TestClientDynamicListenUTPOnly(t *testing.T) {
901         cfg := TestingConfig()
902         cfg.DisableTCP = true
903         cl, err := NewClient(cfg)
904         require.NoError(t, err)
905         defer cl.Close()
906         assert.NotEqual(t, 0, cl.LocalPort())
907         cl.eachListener(func(s socket) bool {
908                 assert.True(t, isUtpNetwork(s.Addr().Network()))
909                 return true
910         })
911 }
912
913 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
914         cfg := TestingConfig()
915         cfg.DisableTCP = true
916         cfg.DisableUTP = true
917         cl, err := NewClient(cfg)
918         require.NoError(t, err)
919         defer cl.Close()
920         assert.Equal(t, 0, cl.LocalPort())
921 }
922
923 func totalConns(tts []*Torrent) (ret int) {
924         for _, tt := range tts {
925                 tt.cl.mu.Lock()
926                 ret += len(tt.conns)
927                 tt.cl.mu.Unlock()
928         }
929         return
930 }
931
932 func TestSetMaxEstablishedConn(t *testing.T) {
933         ss := testutil.NewStatusServer(t)
934         defer ss.Close()
935         var tts []*Torrent
936         ih := testutil.GreetingMetaInfo().HashInfoBytes()
937         cfg := TestingConfig()
938         cfg.DisableAcceptRateLimiting = true
939         cfg.dropDuplicatePeerIds = true
940         for i := range iter.N(3) {
941                 cl, err := NewClient(cfg)
942                 require.NoError(t, err)
943                 defer cl.Close()
944                 tt, _ := cl.AddTorrentInfoHash(ih)
945                 tt.SetMaxEstablishedConns(2)
946                 ss.HandleStatusWriter(cl, fmt.Sprintf("/%d", i))
947                 tts = append(tts, tt)
948         }
949         addPeers := func() {
950                 for _, tt := range tts {
951                         for _, _tt := range tts {
952                                 // if tt != _tt {
953                                 tt.AddClientPeer(_tt.cl)
954                                 // }
955                         }
956                 }
957         }
958         waitTotalConns := func(num int) {
959                 for totalConns(tts) != num {
960                         addPeers()
961                         time.Sleep(time.Millisecond)
962                 }
963         }
964         addPeers()
965         waitTotalConns(6)
966         tts[0].SetMaxEstablishedConns(1)
967         waitTotalConns(4)
968         tts[0].SetMaxEstablishedConns(0)
969         waitTotalConns(2)
970         tts[0].SetMaxEstablishedConns(1)
971         addPeers()
972         waitTotalConns(4)
973         tts[0].SetMaxEstablishedConns(2)
974         addPeers()
975         waitTotalConns(6)
976 }
977
978 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
979         os.MkdirAll(dir, 0770)
980         file, err := os.Create(filepath.Join(dir, name))
981         require.NoError(t, err)
982         file.Write([]byte(name))
983         file.Close()
984         mi := metainfo.MetaInfo{}
985         mi.SetDefaults()
986         info := metainfo.Info{PieceLength: 256 * 1024}
987         err = info.BuildFromFilePath(filepath.Join(dir, name))
988         require.NoError(t, err)
989         mi.InfoBytes, err = bencode.Marshal(info)
990         require.NoError(t, err)
991         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
992         tr, err := cl.AddTorrent(&mi)
993         require.NoError(t, err)
994         require.True(t, tr.Seeding())
995         tr.VerifyData()
996         return magnet
997 }
998
999 // https://github.com/anacrolix/torrent/issues/114
1000 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1001         cfg := TestingConfig()
1002         cfg.DisableUTP = true
1003         cfg.Seed = true
1004         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1005         cfg.ForceEncryption = true
1006         os.Mkdir(cfg.DataDir, 0755)
1007         server, err := NewClient(cfg)
1008         require.NoError(t, err)
1009         defer server.Close()
1010         testutil.ExportStatusWriter(server, "s")
1011         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1012         makeMagnet(t, server, cfg.DataDir, "test2")
1013         cfg = TestingConfig()
1014         cfg.DisableUTP = true
1015         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1016         cfg.ForceEncryption = true
1017         client, err := NewClient(cfg)
1018         require.NoError(t, err)
1019         defer client.Close()
1020         testutil.ExportStatusWriter(client, "c")
1021         tr, err := client.AddMagnet(magnet1)
1022         require.NoError(t, err)
1023         tr.AddClientPeer(server)
1024         <-tr.GotInfo()
1025         tr.DownloadAll()
1026         client.WaitAll()
1027 }
1028
1029 func TestClientAddressInUse(t *testing.T) {
1030         s, _ := NewUtpSocket("udp", ":50007")
1031         if s != nil {
1032                 defer s.Close()
1033         }
1034         cfg := TestingConfig().SetListenAddr(":50007")
1035         cl, err := NewClient(cfg)
1036         require.Error(t, err)
1037         require.Nil(t, cl)
1038 }