]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Remove TestUTPRawConn
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "os"
9         "path/filepath"
10         "reflect"
11         "sync"
12         "testing"
13         "time"
14
15         "github.com/anacrolix/dht"
16         _ "github.com/anacrolix/envpprof"
17         "github.com/anacrolix/missinggo"
18         "github.com/anacrolix/missinggo/filecache"
19         "github.com/bradfitz/iter"
20         "github.com/stretchr/testify/assert"
21         "github.com/stretchr/testify/require"
22         "golang.org/x/time/rate"
23
24         "github.com/anacrolix/torrent/bencode"
25         "github.com/anacrolix/torrent/internal/testutil"
26         "github.com/anacrolix/torrent/iplist"
27         "github.com/anacrolix/torrent/metainfo"
28         "github.com/anacrolix/torrent/storage"
29 )
30
31 func TestingConfig() *ClientConfig {
32         cfg := NewDefaultClientConfig()
33         cfg.ListenHost = LoopbackListenHost
34         cfg.NoDHT = true
35         cfg.DataDir = tempDir()
36         cfg.DisableTrackers = true
37         cfg.NoDefaultPortForwarding = true
38         return cfg
39 }
40
41 func TestClientDefault(t *testing.T) {
42         cl, err := NewClient(TestingConfig())
43         require.NoError(t, err)
44         cl.Close()
45 }
46
47 func TestClientNilConfig(t *testing.T) {
48         cl, err := NewClient(nil)
49         require.NoError(t, err)
50         cl.Close()
51 }
52
53 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
54         cfg := TestingConfig()
55         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
56         require.NoError(t, err)
57         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
58         defer ci.Close()
59         cfg.DefaultStorage = ci
60         cl, err := NewClient(cfg)
61         require.NoError(t, err)
62         cl.Close()
63         // And again, https://github.com/anacrolix/torrent/issues/158
64         cl, err = NewClient(cfg)
65         require.NoError(t, err)
66         cl.Close()
67 }
68
69 func TestAddDropTorrent(t *testing.T) {
70         cl, err := NewClient(TestingConfig())
71         require.NoError(t, err)
72         defer cl.Close()
73         dir, mi := testutil.GreetingTestTorrent()
74         defer os.RemoveAll(dir)
75         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
76         require.NoError(t, err)
77         assert.True(t, new)
78         tt.SetMaxEstablishedConns(0)
79         tt.SetMaxEstablishedConns(1)
80         tt.Drop()
81 }
82
83 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
84         // TODO?
85         t.SkipNow()
86 }
87
88 func TestAddTorrentNoUsableURLs(t *testing.T) {
89         // TODO?
90         t.SkipNow()
91 }
92
93 func TestAddPeersToUnknownTorrent(t *testing.T) {
94         // TODO?
95         t.SkipNow()
96 }
97
98 func TestPieceHashSize(t *testing.T) {
99         assert.Equal(t, 20, pieceHash.Size())
100 }
101
102 func TestTorrentInitialState(t *testing.T) {
103         dir, mi := testutil.GreetingTestTorrent()
104         defer os.RemoveAll(dir)
105         cl := &Client{
106                 config: &ClientConfig{},
107         }
108         cl.initLogger()
109         tor := cl.newTorrent(
110                 mi.HashInfoBytes(),
111                 storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
112         )
113         tor.setChunkSize(2)
114         tor.cl.mu.Lock()
115         err := tor.setInfoBytes(mi.InfoBytes)
116         tor.cl.mu.Unlock()
117         require.NoError(t, err)
118         require.Len(t, tor.pieces, 3)
119         tor.pendAllChunkSpecs(0)
120         tor.cl.mu.Lock()
121         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
122         tor.cl.mu.Unlock()
123         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
124 }
125
126 func TestUnmarshalPEXMsg(t *testing.T) {
127         var m peerExchangeMessage
128         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
129                 t.Fatal(err)
130         }
131         if len(m.Added) != 2 {
132                 t.FailNow()
133         }
134         if m.Added[0].Port != 0x506 {
135                 t.FailNow()
136         }
137 }
138
139 func TestReducedDialTimeout(t *testing.T) {
140         cfg := NewDefaultClientConfig()
141         for _, _case := range []struct {
142                 Max             time.Duration
143                 HalfOpenLimit   int
144                 PendingPeers    int
145                 ExpectedReduced time.Duration
146         }{
147                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
148                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
149                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
150                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
151                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
152                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
153         } {
154                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
155                 expected := _case.ExpectedReduced
156                 if expected < cfg.MinDialTimeout {
157                         expected = cfg.MinDialTimeout
158                 }
159                 if reduced != expected {
160                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
161                 }
162         }
163 }
164
165 func TestAddDropManyTorrents(t *testing.T) {
166         cl, err := NewClient(TestingConfig())
167         require.NoError(t, err)
168         defer cl.Close()
169         for i := range iter.N(1000) {
170                 var spec TorrentSpec
171                 binary.PutVarint(spec.InfoHash[:], int64(i))
172                 tt, new, err := cl.AddTorrentSpec(&spec)
173                 assert.NoError(t, err)
174                 assert.True(t, new)
175                 defer tt.Drop()
176         }
177 }
178
179 type FileCacheClientStorageFactoryParams struct {
180         Capacity    int64
181         SetCapacity bool
182         Wrapper     func(*filecache.Cache) storage.ClientImpl
183 }
184
185 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
186         return func(dataDir string) storage.ClientImpl {
187                 fc, err := filecache.NewCache(dataDir)
188                 if err != nil {
189                         panic(err)
190                 }
191                 if ps.SetCapacity {
192                         fc.SetCapacity(ps.Capacity)
193                 }
194                 return ps.Wrapper(fc)
195         }
196 }
197
198 type storageFactory func(string) storage.ClientImpl
199
200 func TestClientTransferDefault(t *testing.T) {
201         testClientTransfer(t, testClientTransferParams{
202                 ExportClientStatus: true,
203                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
204                         Wrapper: fileCachePieceResourceStorage,
205                 }),
206         })
207 }
208
209 func TestClientTransferRateLimitedUpload(t *testing.T) {
210         started := time.Now()
211         testClientTransfer(t, testClientTransferParams{
212                 // We are uploading 13 bytes (the length of the greeting torrent). The
213                 // chunks are 2 bytes in length. Then the smallest burst we can run
214                 // with is 2. Time taken is (13-burst)/rate.
215                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
216                 ExportClientStatus:      true,
217         })
218         require.True(t, time.Since(started) > time.Second)
219 }
220
221 func TestClientTransferRateLimitedDownload(t *testing.T) {
222         testClientTransfer(t, testClientTransferParams{
223                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
224         })
225 }
226
227 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
228         return storage.NewResourcePieces(fc.AsResourceProvider())
229 }
230
231 func TestClientTransferSmallCache(t *testing.T) {
232         testClientTransfer(t, testClientTransferParams{
233                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
234                         SetCapacity: true,
235                         // Going below the piece length means it can't complete a piece so
236                         // that it can be hashed.
237                         Capacity: 5,
238                         Wrapper:  fileCachePieceResourceStorage,
239                 }),
240                 SetReadahead: true,
241                 // Can't readahead too far or the cache will thrash and drop data we
242                 // thought we had.
243                 Readahead:          0,
244                 ExportClientStatus: true,
245         })
246 }
247
248 func TestClientTransferVarious(t *testing.T) {
249         // Leecher storage
250         for _, ls := range []storageFactory{
251                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
252                         Wrapper: fileCachePieceResourceStorage,
253                 }),
254                 storage.NewBoltDB,
255         } {
256                 // Seeder storage
257                 for _, ss := range []func(string) storage.ClientImpl{
258                         storage.NewFile,
259                         storage.NewMMap,
260                 } {
261                         for _, responsive := range []bool{false, true} {
262                                 testClientTransfer(t, testClientTransferParams{
263                                         Responsive:     responsive,
264                                         SeederStorage:  ss,
265                                         LeecherStorage: ls,
266                                 })
267                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
268                                         testClientTransfer(t, testClientTransferParams{
269                                                 SeederStorage:  ss,
270                                                 Responsive:     responsive,
271                                                 SetReadahead:   true,
272                                                 Readahead:      readahead,
273                                                 LeecherStorage: ls,
274                                         })
275                                 }
276                         }
277                 }
278         }
279 }
280
281 type testClientTransferParams struct {
282         Responsive                 bool
283         Readahead                  int64
284         SetReadahead               bool
285         ExportClientStatus         bool
286         LeecherStorage             func(string) storage.ClientImpl
287         SeederStorage              func(string) storage.ClientImpl
288         SeederUploadRateLimiter    *rate.Limiter
289         LeecherDownloadRateLimiter *rate.Limiter
290 }
291
292 // Creates a seeder and a leecher, and ensures the data transfers when a read
293 // is attempted on the leecher.
294 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
295         greetingTempDir, mi := testutil.GreetingTestTorrent()
296         defer os.RemoveAll(greetingTempDir)
297         // Create seeder and a Torrent.
298         cfg := TestingConfig()
299         cfg.Seed = true
300         if ps.SeederUploadRateLimiter != nil {
301                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
302         }
303         // cfg.ListenAddr = "localhost:4000"
304         if ps.SeederStorage != nil {
305                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
306                 defer cfg.DefaultStorage.Close()
307         } else {
308                 cfg.DataDir = greetingTempDir
309         }
310         seeder, err := NewClient(cfg)
311         require.NoError(t, err)
312         if ps.ExportClientStatus {
313                 testutil.ExportStatusWriter(seeder, "s")
314         }
315         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
316         // Run a Stats right after Closing the Client. This will trigger the Stats
317         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
318         defer seederTorrent.Stats()
319         defer seeder.Close()
320         seederTorrent.VerifyData()
321         // Create leecher and a Torrent.
322         leecherDataDir, err := ioutil.TempDir("", "")
323         require.NoError(t, err)
324         defer os.RemoveAll(leecherDataDir)
325         cfg = TestingConfig()
326         if ps.LeecherStorage == nil {
327                 cfg.DataDir = leecherDataDir
328         } else {
329                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
330         }
331         if ps.LeecherDownloadRateLimiter != nil {
332                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
333         }
334         cfg.Seed = false
335         leecher, err := NewClient(cfg)
336         require.NoError(t, err)
337         defer leecher.Close()
338         if ps.ExportClientStatus {
339                 testutil.ExportStatusWriter(leecher, "l")
340         }
341         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
342                 ret = TorrentSpecFromMetaInfo(mi)
343                 ret.ChunkSize = 2
344                 return
345         }())
346         require.NoError(t, err)
347         assert.True(t, new)
348         // Now do some things with leecher and seeder.
349         leecherTorrent.AddClientPeer(seeder)
350         // The Torrent should not be interested in obtaining peers, so the one we
351         // just added should be the only one.
352         assert.False(t, leecherTorrent.Seeding())
353         assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
354         r := leecherTorrent.NewReader()
355         defer r.Close()
356         if ps.Responsive {
357                 r.SetResponsive()
358         }
359         if ps.SetReadahead {
360                 r.SetReadahead(ps.Readahead)
361         }
362         assertReadAllGreeting(t, r)
363
364         seederStats := seederTorrent.Stats()
365         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
366         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
367
368         leecherStats := leecherTorrent.Stats()
369         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
370         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
371
372         // Try reading through again for the cases where the torrent data size
373         // exceeds the size of the cache.
374         assertReadAllGreeting(t, r)
375 }
376
377 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
378         pos, err := r.Seek(0, io.SeekStart)
379         assert.NoError(t, err)
380         assert.EqualValues(t, 0, pos)
381         _greeting, err := ioutil.ReadAll(r)
382         assert.NoError(t, err)
383         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
384 }
385
386 // Check that after completing leeching, a leecher transitions to a seeding
387 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
388 func TestSeedAfterDownloading(t *testing.T) {
389         greetingTempDir, mi := testutil.GreetingTestTorrent()
390         defer os.RemoveAll(greetingTempDir)
391
392         cfg := TestingConfig()
393         cfg.Seed = true
394         cfg.DataDir = greetingTempDir
395         seeder, err := NewClient(cfg)
396         require.NoError(t, err)
397         defer seeder.Close()
398         testutil.ExportStatusWriter(seeder, "s")
399         seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
400         require.NoError(t, err)
401         assert.True(t, ok)
402         seederTorrent.VerifyData()
403
404         cfg = TestingConfig()
405         cfg.Seed = true
406         cfg.DataDir, err = ioutil.TempDir("", "")
407         require.NoError(t, err)
408         defer os.RemoveAll(cfg.DataDir)
409         leecher, err := NewClient(cfg)
410         require.NoError(t, err)
411         defer leecher.Close()
412         testutil.ExportStatusWriter(leecher, "l")
413
414         cfg = TestingConfig()
415         cfg.Seed = false
416         cfg.DataDir, err = ioutil.TempDir("", "")
417         require.NoError(t, err)
418         defer os.RemoveAll(cfg.DataDir)
419         leecherLeecher, _ := NewClient(cfg)
420         require.NoError(t, err)
421         defer leecherLeecher.Close()
422         testutil.ExportStatusWriter(leecherLeecher, "ll")
423         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
424                 ret = TorrentSpecFromMetaInfo(mi)
425                 ret.ChunkSize = 2
426                 return
427         }())
428         require.NoError(t, err)
429         assert.True(t, ok)
430         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
431                 ret = TorrentSpecFromMetaInfo(mi)
432                 ret.ChunkSize = 3
433                 return
434         }())
435         require.NoError(t, err)
436         assert.True(t, ok)
437         // Simultaneously DownloadAll in Leecher, and read the contents
438         // consecutively in LeecherLeecher. This non-deterministically triggered a
439         // case where the leecher wouldn't unchoke the LeecherLeecher.
440         var wg sync.WaitGroup
441         wg.Add(1)
442         go func() {
443                 defer wg.Done()
444                 r := llg.NewReader()
445                 defer r.Close()
446                 b, err := ioutil.ReadAll(r)
447                 require.NoError(t, err)
448                 assert.EqualValues(t, testutil.GreetingFileContents, b)
449         }()
450         done := make(chan struct{})
451         defer close(done)
452         go leecherGreeting.AddClientPeer(seeder)
453         go leecherGreeting.AddClientPeer(leecherLeecher)
454         wg.Add(1)
455         go func() {
456                 defer wg.Done()
457                 leecherGreeting.DownloadAll()
458                 leecher.WaitAll()
459         }()
460         wg.Wait()
461 }
462
463 func TestMergingTrackersByAddingSpecs(t *testing.T) {
464         cl, err := NewClient(TestingConfig())
465         require.NoError(t, err)
466         defer cl.Close()
467         spec := TorrentSpec{}
468         T, new, _ := cl.AddTorrentSpec(&spec)
469         if !new {
470                 t.FailNow()
471         }
472         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
473         _, new, _ = cl.AddTorrentSpec(&spec)
474         assert.False(t, new)
475         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
476         // Because trackers are disabled in TestingConfig.
477         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
478 }
479
480 // We read from a piece which is marked completed, but is missing data.
481 func TestCompletedPieceWrongSize(t *testing.T) {
482         cfg := TestingConfig()
483         cfg.DefaultStorage = badStorage{}
484         cl, err := NewClient(cfg)
485         require.NoError(t, err)
486         defer cl.Close()
487         info := metainfo.Info{
488                 PieceLength: 15,
489                 Pieces:      make([]byte, 20),
490                 Files: []metainfo.FileInfo{
491                         {Path: []string{"greeting"}, Length: 13},
492                 },
493         }
494         b, err := bencode.Marshal(info)
495         require.NoError(t, err)
496         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
497                 InfoBytes: b,
498                 InfoHash:  metainfo.HashBytes(b),
499         })
500         require.NoError(t, err)
501         defer tt.Drop()
502         assert.True(t, new)
503         r := tt.NewReader()
504         defer r.Close()
505         b, err = ioutil.ReadAll(r)
506         assert.Len(t, b, 13)
507         assert.NoError(t, err)
508 }
509
510 func BenchmarkAddLargeTorrent(b *testing.B) {
511         cfg := TestingConfig()
512         cfg.DisableTCP = true
513         cfg.DisableUTP = true
514         cfg.ListenHost = func(string) string { return "redonk" }
515         cl, err := NewClient(cfg)
516         require.NoError(b, err)
517         defer cl.Close()
518         for range iter.N(b.N) {
519                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
520                 if err != nil {
521                         b.Fatal(err)
522                 }
523                 t.Drop()
524         }
525 }
526
527 func TestResponsive(t *testing.T) {
528         seederDataDir, mi := testutil.GreetingTestTorrent()
529         defer os.RemoveAll(seederDataDir)
530         cfg := TestingConfig()
531         cfg.Seed = true
532         cfg.DataDir = seederDataDir
533         seeder, err := NewClient(cfg)
534         require.Nil(t, err)
535         defer seeder.Close()
536         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
537         seederTorrent.VerifyData()
538         leecherDataDir, err := ioutil.TempDir("", "")
539         require.Nil(t, err)
540         defer os.RemoveAll(leecherDataDir)
541         cfg = TestingConfig()
542         cfg.DataDir = leecherDataDir
543         leecher, err := NewClient(cfg)
544         require.Nil(t, err)
545         defer leecher.Close()
546         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
547                 ret = TorrentSpecFromMetaInfo(mi)
548                 ret.ChunkSize = 2
549                 return
550         }())
551         leecherTorrent.AddClientPeer(seeder)
552         reader := leecherTorrent.NewReader()
553         defer reader.Close()
554         reader.SetReadahead(0)
555         reader.SetResponsive()
556         b := make([]byte, 2)
557         _, err = reader.Seek(3, io.SeekStart)
558         require.NoError(t, err)
559         _, err = io.ReadFull(reader, b)
560         assert.Nil(t, err)
561         assert.EqualValues(t, "lo", string(b))
562         _, err = reader.Seek(11, io.SeekStart)
563         require.NoError(t, err)
564         n, err := io.ReadFull(reader, b)
565         assert.Nil(t, err)
566         assert.EqualValues(t, 2, n)
567         assert.EqualValues(t, "d\n", string(b))
568 }
569
570 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
571         seederDataDir, mi := testutil.GreetingTestTorrent()
572         defer os.RemoveAll(seederDataDir)
573         cfg := TestingConfig()
574         cfg.Seed = true
575         cfg.DataDir = seederDataDir
576         seeder, err := NewClient(cfg)
577         require.Nil(t, err)
578         defer seeder.Close()
579         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
580         seederTorrent.VerifyData()
581         leecherDataDir, err := ioutil.TempDir("", "")
582         require.Nil(t, err)
583         defer os.RemoveAll(leecherDataDir)
584         cfg = TestingConfig()
585         cfg.DataDir = leecherDataDir
586         leecher, err := NewClient(cfg)
587         require.Nil(t, err)
588         defer leecher.Close()
589         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
590                 ret = TorrentSpecFromMetaInfo(mi)
591                 ret.ChunkSize = 2
592                 return
593         }())
594         leecherTorrent.AddClientPeer(seeder)
595         reader := leecherTorrent.NewReader()
596         defer reader.Close()
597         reader.SetReadahead(0)
598         reader.SetResponsive()
599         b := make([]byte, 2)
600         _, err = reader.Seek(3, io.SeekStart)
601         require.NoError(t, err)
602         _, err = io.ReadFull(reader, b)
603         assert.Nil(t, err)
604         assert.EqualValues(t, "lo", string(b))
605         go leecherTorrent.Drop()
606         _, err = reader.Seek(11, io.SeekStart)
607         require.NoError(t, err)
608         n, err := reader.Read(b)
609         assert.EqualError(t, err, "torrent closed")
610         assert.EqualValues(t, 0, n)
611 }
612
613 func TestDHTInheritBlocklist(t *testing.T) {
614         ipl := iplist.New(nil)
615         require.NotNil(t, ipl)
616         cfg := TestingConfig()
617         cfg.IPBlocklist = ipl
618         cfg.NoDHT = false
619         cl, err := NewClient(cfg)
620         require.NoError(t, err)
621         defer cl.Close()
622         numServers := 0
623         cl.eachDhtServer(func(s *dht.Server) {
624                 assert.Equal(t, ipl, s.IPBlocklist())
625                 numServers++
626         })
627         assert.EqualValues(t, 2, numServers)
628 }
629
630 // Check that stuff is merged in subsequent AddTorrentSpec for the same
631 // infohash.
632 func TestAddTorrentSpecMerging(t *testing.T) {
633         cl, err := NewClient(TestingConfig())
634         require.NoError(t, err)
635         defer cl.Close()
636         dir, mi := testutil.GreetingTestTorrent()
637         defer os.RemoveAll(dir)
638         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
639                 InfoHash: mi.HashInfoBytes(),
640         })
641         require.NoError(t, err)
642         require.True(t, new)
643         require.Nil(t, tt.Info())
644         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
645         require.NoError(t, err)
646         require.False(t, new)
647         require.NotNil(t, tt.Info())
648 }
649
650 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
651         dir, mi := testutil.GreetingTestTorrent()
652         os.RemoveAll(dir)
653         cl, _ := NewClient(TestingConfig())
654         defer cl.Close()
655         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
656                 InfoHash: mi.HashInfoBytes(),
657         })
658         tt.Drop()
659         assert.EqualValues(t, 0, len(cl.Torrents()))
660         select {
661         case <-tt.GotInfo():
662                 t.FailNow()
663         default:
664         }
665 }
666
667 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
668         for i := range iter.N(info.NumPieces()) {
669                 p := info.Piece(i)
670                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
671         }
672 }
673
674 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
675         fileCacheDir, err := ioutil.TempDir("", "")
676         require.NoError(t, err)
677         defer os.RemoveAll(fileCacheDir)
678         fileCache, err := filecache.NewCache(fileCacheDir)
679         require.NoError(t, err)
680         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
681         defer os.RemoveAll(greetingDataTempDir)
682         filePieceStore := csf(fileCache)
683         defer filePieceStore.Close()
684         info, err := greetingMetainfo.UnmarshalInfo()
685         require.NoError(t, err)
686         ih := greetingMetainfo.HashInfoBytes()
687         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
688         require.NoError(t, err)
689         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
690         // require.Equal(t, len(testutil.GreetingFileContents), written)
691         // require.NoError(t, err)
692         for i := 0; i < info.NumPieces(); i++ {
693                 p := info.Piece(i)
694                 if alreadyCompleted {
695                         require.NoError(t, greetingData.Piece(p).MarkComplete())
696                 }
697         }
698         cfg := TestingConfig()
699         // TODO: Disable network option?
700         cfg.DisableTCP = true
701         cfg.DisableUTP = true
702         cfg.DefaultStorage = filePieceStore
703         cl, err := NewClient(cfg)
704         require.NoError(t, err)
705         defer cl.Close()
706         tt, err := cl.AddTorrent(greetingMetainfo)
707         require.NoError(t, err)
708         psrs := tt.PieceStateRuns()
709         assert.Len(t, psrs, 1)
710         assert.EqualValues(t, 3, psrs[0].Length)
711         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
712         if alreadyCompleted {
713                 r := tt.NewReader()
714                 b, err := ioutil.ReadAll(r)
715                 assert.NoError(t, err)
716                 assert.EqualValues(t, testutil.GreetingFileContents, b)
717         }
718 }
719
720 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
721         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
722 }
723
724 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
725         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
726 }
727
728 func TestAddMetainfoWithNodes(t *testing.T) {
729         cfg := TestingConfig()
730         cfg.ListenHost = func(string) string { return "" }
731         cfg.NoDHT = false
732         cfg.DhtStartingNodes = func() ([]dht.Addr, error) { return nil, nil }
733         // For now, we want to just jam the nodes into the table, without
734         // verifying them first. Also the DHT code doesn't support mixing secure
735         // and insecure nodes if security is enabled (yet).
736         // cfg.DHTConfig.NoSecurity = true
737         cl, err := NewClient(cfg)
738         require.NoError(t, err)
739         defer cl.Close()
740         sum := func() (ret int64) {
741                 cl.eachDhtServer(func(s *dht.Server) {
742                         ret += s.Stats().OutboundQueriesAttempted
743                 })
744                 return
745         }
746         assert.EqualValues(t, 0, sum())
747         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
748         require.NoError(t, err)
749         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
750         // check if the announce-list is here instead. TODO: Add nodes.
751         assert.Len(t, tt.metainfo.AnnounceList, 5)
752         // There are 6 nodes in the torrent file.
753         assert.EqualValues(t, 6*len(cl.dhtServers), sum())
754 }
755
756 type testDownloadCancelParams struct {
757         SetLeecherStorageCapacity bool
758         LeecherStorageCapacity    int64
759         Cancel                    bool
760 }
761
762 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
763         greetingTempDir, mi := testutil.GreetingTestTorrent()
764         defer os.RemoveAll(greetingTempDir)
765         cfg := TestingConfig()
766         cfg.Seed = true
767         cfg.DataDir = greetingTempDir
768         seeder, err := NewClient(cfg)
769         require.NoError(t, err)
770         defer seeder.Close()
771         testutil.ExportStatusWriter(seeder, "s")
772         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
773         seederTorrent.VerifyData()
774         leecherDataDir, err := ioutil.TempDir("", "")
775         require.NoError(t, err)
776         defer os.RemoveAll(leecherDataDir)
777         fc, err := filecache.NewCache(leecherDataDir)
778         require.NoError(t, err)
779         if ps.SetLeecherStorageCapacity {
780                 fc.SetCapacity(ps.LeecherStorageCapacity)
781         }
782         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
783         cfg.DataDir = leecherDataDir
784         leecher, _ := NewClient(cfg)
785         defer leecher.Close()
786         testutil.ExportStatusWriter(leecher, "l")
787         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
788                 ret = TorrentSpecFromMetaInfo(mi)
789                 ret.ChunkSize = 2
790                 return
791         }())
792         require.NoError(t, err)
793         assert.True(t, new)
794         psc := leecherGreeting.SubscribePieceStateChanges()
795         defer psc.Close()
796
797         leecherGreeting.cl.mu.Lock()
798         leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
799         if ps.Cancel {
800                 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
801         }
802         leecherGreeting.cl.mu.Unlock()
803         done := make(chan struct{})
804         defer close(done)
805         go leecherGreeting.AddClientPeer(seeder)
806         completes := make(map[int]bool, 3)
807         expected := func() map[int]bool {
808                 if ps.Cancel {
809                         return map[int]bool{0: false, 1: false, 2: false}
810                 } else {
811                         return map[int]bool{0: true, 1: true, 2: true}
812                 }
813         }()
814         for !reflect.DeepEqual(completes, expected) {
815                 select {
816                 case _v := <-psc.Values:
817                         v := _v.(PieceStateChange)
818                         completes[v.Index] = v.Complete
819                 }
820         }
821 }
822
823 func TestTorrentDownloadAll(t *testing.T) {
824         testDownloadCancel(t, testDownloadCancelParams{})
825 }
826
827 func TestTorrentDownloadAllThenCancel(t *testing.T) {
828         testDownloadCancel(t, testDownloadCancelParams{
829                 Cancel: true,
830         })
831 }
832
833 // Ensure that it's an error for a peer to send an invalid have message.
834 func TestPeerInvalidHave(t *testing.T) {
835         cl, err := NewClient(TestingConfig())
836         require.NoError(t, err)
837         defer cl.Close()
838         info := metainfo.Info{
839                 PieceLength: 1,
840                 Pieces:      make([]byte, 20),
841                 Files:       []metainfo.FileInfo{{Length: 1}},
842         }
843         infoBytes, err := bencode.Marshal(info)
844         require.NoError(t, err)
845         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
846                 InfoBytes: infoBytes,
847                 InfoHash:  metainfo.HashBytes(infoBytes),
848                 Storage:   badStorage{},
849         })
850         require.NoError(t, err)
851         assert.True(t, _new)
852         defer tt.Drop()
853         cn := &connection{
854                 t: tt,
855         }
856         assert.NoError(t, cn.peerSentHave(0))
857         assert.Error(t, cn.peerSentHave(1))
858 }
859
860 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
861         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
862         defer os.RemoveAll(greetingTempDir)
863         cfg := TestingConfig()
864         cfg.DataDir = greetingTempDir
865         seeder, err := NewClient(TestingConfig())
866         require.NoError(t, err)
867         seeder.AddTorrentSpec(&TorrentSpec{
868                 InfoBytes: greetingMetainfo.InfoBytes,
869         })
870 }
871
872 // Check that when the listen port is 0, all the protocols listened on have
873 // the same port, and it isn't zero.
874 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
875         cl, err := NewClient(TestingConfig())
876         require.NoError(t, err)
877         defer cl.Close()
878         port := cl.LocalPort()
879         assert.NotEqual(t, 0, port)
880         cl.eachListener(func(s socket) bool {
881                 assert.Equal(t, port, missinggo.AddrPort(s.Addr()))
882                 return true
883         })
884 }
885
886 func TestClientDynamicListenTCPOnly(t *testing.T) {
887         cfg := TestingConfig()
888         cfg.DisableUTP = true
889         cl, err := NewClient(cfg)
890         require.NoError(t, err)
891         defer cl.Close()
892         assert.NotEqual(t, 0, cl.LocalPort())
893         cl.eachListener(func(s socket) bool {
894                 assert.True(t, isTcpNetwork(s.Addr().Network()))
895                 return true
896         })
897 }
898
899 func TestClientDynamicListenUTPOnly(t *testing.T) {
900         cfg := TestingConfig()
901         cfg.DisableTCP = true
902         cl, err := NewClient(cfg)
903         require.NoError(t, err)
904         defer cl.Close()
905         assert.NotEqual(t, 0, cl.LocalPort())
906         cl.eachListener(func(s socket) bool {
907                 assert.True(t, isUtpNetwork(s.Addr().Network()))
908                 return true
909         })
910 }
911
912 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
913         cfg := TestingConfig()
914         cfg.DisableTCP = true
915         cfg.DisableUTP = true
916         cl, err := NewClient(cfg)
917         require.NoError(t, err)
918         defer cl.Close()
919         assert.Equal(t, 0, cl.LocalPort())
920 }
921
922 func totalConns(tts []*Torrent) (ret int) {
923         for _, tt := range tts {
924                 tt.cl.mu.Lock()
925                 ret += len(tt.conns)
926                 tt.cl.mu.Unlock()
927         }
928         return
929 }
930
931 func TestSetMaxEstablishedConn(t *testing.T) {
932         ss := testutil.NewStatusServer(t)
933         defer ss.Close()
934         var tts []*Torrent
935         ih := testutil.GreetingMetaInfo().HashInfoBytes()
936         cfg := TestingConfig()
937         cfg.DisableAcceptRateLimiting = true
938         cfg.dropDuplicatePeerIds = true
939         for i := range iter.N(3) {
940                 cl, err := NewClient(cfg)
941                 require.NoError(t, err)
942                 defer cl.Close()
943                 tt, _ := cl.AddTorrentInfoHash(ih)
944                 tt.SetMaxEstablishedConns(2)
945                 ss.HandleStatusWriter(cl, fmt.Sprintf("/%d", i))
946                 tts = append(tts, tt)
947         }
948         addPeers := func() {
949                 for _, tt := range tts {
950                         for _, _tt := range tts {
951                                 // if tt != _tt {
952                                 tt.AddClientPeer(_tt.cl)
953                                 // }
954                         }
955                 }
956         }
957         waitTotalConns := func(num int) {
958                 for totalConns(tts) != num {
959                         addPeers()
960                         time.Sleep(time.Millisecond)
961                 }
962         }
963         addPeers()
964         waitTotalConns(6)
965         tts[0].SetMaxEstablishedConns(1)
966         waitTotalConns(4)
967         tts[0].SetMaxEstablishedConns(0)
968         waitTotalConns(2)
969         tts[0].SetMaxEstablishedConns(1)
970         addPeers()
971         waitTotalConns(4)
972         tts[0].SetMaxEstablishedConns(2)
973         addPeers()
974         waitTotalConns(6)
975 }
976
977 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
978         os.MkdirAll(dir, 0770)
979         file, err := os.Create(filepath.Join(dir, name))
980         require.NoError(t, err)
981         file.Write([]byte(name))
982         file.Close()
983         mi := metainfo.MetaInfo{}
984         mi.SetDefaults()
985         info := metainfo.Info{PieceLength: 256 * 1024}
986         err = info.BuildFromFilePath(filepath.Join(dir, name))
987         require.NoError(t, err)
988         mi.InfoBytes, err = bencode.Marshal(info)
989         require.NoError(t, err)
990         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
991         tr, err := cl.AddTorrent(&mi)
992         require.NoError(t, err)
993         require.True(t, tr.Seeding())
994         tr.VerifyData()
995         return magnet
996 }
997
998 // https://github.com/anacrolix/torrent/issues/114
999 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1000         cfg := TestingConfig()
1001         cfg.DisableUTP = true
1002         cfg.Seed = true
1003         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1004         cfg.ForceEncryption = true
1005         os.Mkdir(cfg.DataDir, 0755)
1006         server, err := NewClient(cfg)
1007         require.NoError(t, err)
1008         defer server.Close()
1009         testutil.ExportStatusWriter(server, "s")
1010         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1011         makeMagnet(t, server, cfg.DataDir, "test2")
1012         cfg = TestingConfig()
1013         cfg.DisableUTP = true
1014         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1015         cfg.ForceEncryption = true
1016         client, err := NewClient(cfg)
1017         require.NoError(t, err)
1018         defer client.Close()
1019         testutil.ExportStatusWriter(client, "c")
1020         tr, err := client.AddMagnet(magnet1)
1021         require.NoError(t, err)
1022         tr.AddClientPeer(server)
1023         <-tr.GotInfo()
1024         tr.DownloadAll()
1025         client.WaitAll()
1026 }
1027
1028 func TestClientAddressInUse(t *testing.T) {
1029         s, _ := NewUtpSocket("udp", ":50007")
1030         if s != nil {
1031                 defer s.Close()
1032         }
1033         cfg := TestingConfig().SetListenAddr(":50007")
1034         cl, err := NewClient(cfg)
1035         require.Error(t, err)
1036         require.Nil(t, cl)
1037 }