]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Add code for dumping log Msg values to TestingConfig
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "os"
9         "path/filepath"
10         "reflect"
11         "sync"
12         "testing"
13         "time"
14
15         "github.com/bradfitz/iter"
16         "github.com/stretchr/testify/assert"
17         "github.com/stretchr/testify/require"
18         "golang.org/x/time/rate"
19
20         "github.com/anacrolix/dht/v2"
21         _ "github.com/anacrolix/envpprof"
22         "github.com/anacrolix/missinggo"
23         "github.com/anacrolix/missinggo/filecache"
24
25         "github.com/anacrolix/torrent/bencode"
26         "github.com/anacrolix/torrent/internal/testutil"
27         "github.com/anacrolix/torrent/iplist"
28         "github.com/anacrolix/torrent/metainfo"
29         "github.com/anacrolix/torrent/storage"
30 )
31
32 func TestingConfig() *ClientConfig {
33         cfg := NewDefaultClientConfig()
34         cfg.ListenHost = LoopbackListenHost
35         cfg.NoDHT = true
36         cfg.DataDir = tempDir()
37         cfg.DisableTrackers = true
38         cfg.NoDefaultPortForwarding = true
39         cfg.DisableAcceptRateLimiting = true
40         cfg.ListenPort = 0
41         //cfg.Debug = true
42         //cfg.Logger = cfg.Logger.WithText(func(m log.Msg) string {
43         //      t := m.Text()
44         //      m.Values(func(i interface{}) bool {
45         //              t += fmt.Sprintf("\n%[1]T: %[1]v", i)
46         //              return true
47         //      })
48         //      return t
49         //})
50         return cfg
51 }
52
53 func TestClientDefault(t *testing.T) {
54         cl, err := NewClient(TestingConfig())
55         require.NoError(t, err)
56         cl.Close()
57 }
58
59 func TestClientNilConfig(t *testing.T) {
60         cl, err := NewClient(nil)
61         require.NoError(t, err)
62         cl.Close()
63 }
64
65 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
66         cfg := TestingConfig()
67         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
68         require.NoError(t, err)
69         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
70         defer ci.Close()
71         cfg.DefaultStorage = ci
72         cl, err := NewClient(cfg)
73         require.NoError(t, err)
74         cl.Close()
75         // And again, https://github.com/anacrolix/torrent/issues/158
76         cl, err = NewClient(cfg)
77         require.NoError(t, err)
78         cl.Close()
79 }
80
81 func TestAddDropTorrent(t *testing.T) {
82         cl, err := NewClient(TestingConfig())
83         require.NoError(t, err)
84         defer cl.Close()
85         dir, mi := testutil.GreetingTestTorrent()
86         defer os.RemoveAll(dir)
87         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
88         require.NoError(t, err)
89         assert.True(t, new)
90         tt.SetMaxEstablishedConns(0)
91         tt.SetMaxEstablishedConns(1)
92         tt.Drop()
93 }
94
95 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
96         // TODO?
97         t.SkipNow()
98 }
99
100 func TestAddTorrentNoUsableURLs(t *testing.T) {
101         // TODO?
102         t.SkipNow()
103 }
104
105 func TestAddPeersToUnknownTorrent(t *testing.T) {
106         // TODO?
107         t.SkipNow()
108 }
109
110 func TestPieceHashSize(t *testing.T) {
111         assert.Equal(t, 20, pieceHash.Size())
112 }
113
114 func TestTorrentInitialState(t *testing.T) {
115         dir, mi := testutil.GreetingTestTorrent()
116         defer os.RemoveAll(dir)
117         cl := &Client{
118                 config: TestingConfig(),
119         }
120         cl.initLogger()
121         tor := cl.newTorrent(
122                 mi.HashInfoBytes(),
123                 storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
124         )
125         tor.setChunkSize(2)
126         tor.cl.lock()
127         err := tor.setInfoBytes(mi.InfoBytes)
128         tor.cl.unlock()
129         require.NoError(t, err)
130         require.Len(t, tor.pieces, 3)
131         tor.pendAllChunkSpecs(0)
132         tor.cl.lock()
133         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
134         tor.cl.unlock()
135         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
136 }
137
138 func TestReducedDialTimeout(t *testing.T) {
139         cfg := NewDefaultClientConfig()
140         for _, _case := range []struct {
141                 Max             time.Duration
142                 HalfOpenLimit   int
143                 PendingPeers    int
144                 ExpectedReduced time.Duration
145         }{
146                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
147                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
148                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
149                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
150                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
151                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
152         } {
153                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
154                 expected := _case.ExpectedReduced
155                 if expected < cfg.MinDialTimeout {
156                         expected = cfg.MinDialTimeout
157                 }
158                 if reduced != expected {
159                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
160                 }
161         }
162 }
163
164 func TestAddDropManyTorrents(t *testing.T) {
165         cl, err := NewClient(TestingConfig())
166         require.NoError(t, err)
167         defer cl.Close()
168         for i := range iter.N(1000) {
169                 var spec TorrentSpec
170                 binary.PutVarint(spec.InfoHash[:], int64(i))
171                 tt, new, err := cl.AddTorrentSpec(&spec)
172                 assert.NoError(t, err)
173                 assert.True(t, new)
174                 defer tt.Drop()
175         }
176 }
177
178 type FileCacheClientStorageFactoryParams struct {
179         Capacity    int64
180         SetCapacity bool
181         Wrapper     func(*filecache.Cache) storage.ClientImpl
182 }
183
184 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
185         return func(dataDir string) storage.ClientImpl {
186                 fc, err := filecache.NewCache(dataDir)
187                 if err != nil {
188                         panic(err)
189                 }
190                 if ps.SetCapacity {
191                         fc.SetCapacity(ps.Capacity)
192                 }
193                 return ps.Wrapper(fc)
194         }
195 }
196
197 type storageFactory func(string) storage.ClientImpl
198
199 func TestClientTransferDefault(t *testing.T) {
200         testClientTransfer(t, testClientTransferParams{
201                 ExportClientStatus: true,
202                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
203                         Wrapper: fileCachePieceResourceStorage,
204                 }),
205         })
206 }
207
208 func TestClientTransferRateLimitedUpload(t *testing.T) {
209         started := time.Now()
210         testClientTransfer(t, testClientTransferParams{
211                 // We are uploading 13 bytes (the length of the greeting torrent). The
212                 // chunks are 2 bytes in length. Then the smallest burst we can run
213                 // with is 2. Time taken is (13-burst)/rate.
214                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
215                 ExportClientStatus:      true,
216         })
217         require.True(t, time.Since(started) > time.Second)
218 }
219
220 func TestClientTransferRateLimitedDownload(t *testing.T) {
221         testClientTransfer(t, testClientTransferParams{
222                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
223         })
224 }
225
226 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
227         return storage.NewResourcePieces(fc.AsResourceProvider())
228 }
229
230 func TestClientTransferSmallCache(t *testing.T) {
231         testClientTransfer(t, testClientTransferParams{
232                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
233                         SetCapacity: true,
234                         // Going below the piece length means it can't complete a piece so
235                         // that it can be hashed.
236                         Capacity: 5,
237                         Wrapper:  fileCachePieceResourceStorage,
238                 }),
239                 SetReadahead: true,
240                 // Can't readahead too far or the cache will thrash and drop data we
241                 // thought we had.
242                 Readahead:          0,
243                 ExportClientStatus: true,
244         })
245 }
246
247 func TestClientTransferVarious(t *testing.T) {
248         // Leecher storage
249         for _, ls := range []storageFactory{
250                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
251                         Wrapper: fileCachePieceResourceStorage,
252                 }),
253                 storage.NewBoltDB,
254         } {
255                 // Seeder storage
256                 for _, ss := range []func(string) storage.ClientImpl{
257                         storage.NewFile,
258                         storage.NewMMap,
259                 } {
260                         for _, responsive := range []bool{false, true} {
261                                 testClientTransfer(t, testClientTransferParams{
262                                         Responsive:     responsive,
263                                         SeederStorage:  ss,
264                                         LeecherStorage: ls,
265                                 })
266                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
267                                         testClientTransfer(t, testClientTransferParams{
268                                                 SeederStorage:  ss,
269                                                 Responsive:     responsive,
270                                                 SetReadahead:   true,
271                                                 Readahead:      readahead,
272                                                 LeecherStorage: ls,
273                                         })
274                                 }
275                         }
276                 }
277         }
278 }
279
280 type testClientTransferParams struct {
281         Responsive                 bool
282         Readahead                  int64
283         SetReadahead               bool
284         ExportClientStatus         bool
285         LeecherStorage             func(string) storage.ClientImpl
286         SeederStorage              func(string) storage.ClientImpl
287         SeederUploadRateLimiter    *rate.Limiter
288         LeecherDownloadRateLimiter *rate.Limiter
289 }
290
291 // Creates a seeder and a leecher, and ensures the data transfers when a read
292 // is attempted on the leecher.
293 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
294         greetingTempDir, mi := testutil.GreetingTestTorrent()
295         defer os.RemoveAll(greetingTempDir)
296         // Create seeder and a Torrent.
297         cfg := TestingConfig()
298         cfg.Seed = true
299         if ps.SeederUploadRateLimiter != nil {
300                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
301         }
302         // cfg.ListenAddr = "localhost:4000"
303         if ps.SeederStorage != nil {
304                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
305                 defer cfg.DefaultStorage.Close()
306         } else {
307                 cfg.DataDir = greetingTempDir
308         }
309         seeder, err := NewClient(cfg)
310         require.NoError(t, err)
311         if ps.ExportClientStatus {
312                 defer testutil.ExportStatusWriter(seeder, "s")()
313         }
314         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
315         // Run a Stats right after Closing the Client. This will trigger the Stats
316         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
317         defer seederTorrent.Stats()
318         defer seeder.Close()
319         seederTorrent.VerifyData()
320         // Create leecher and a Torrent.
321         leecherDataDir, err := ioutil.TempDir("", "")
322         require.NoError(t, err)
323         defer os.RemoveAll(leecherDataDir)
324         cfg = TestingConfig()
325         if ps.LeecherStorage == nil {
326                 cfg.DataDir = leecherDataDir
327         } else {
328                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
329         }
330         if ps.LeecherDownloadRateLimiter != nil {
331                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
332         }
333         cfg.Seed = false
334         leecher, err := NewClient(cfg)
335         require.NoError(t, err)
336         defer leecher.Close()
337         if ps.ExportClientStatus {
338                 defer testutil.ExportStatusWriter(leecher, "l")()
339         }
340         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
341                 ret = TorrentSpecFromMetaInfo(mi)
342                 ret.ChunkSize = 2
343                 return
344         }())
345         require.NoError(t, err)
346         assert.True(t, new)
347         // Now do some things with leecher and seeder.
348         leecherTorrent.AddClientPeer(seeder)
349         // The Torrent should not be interested in obtaining peers, so the one we
350         // just added should be the only one.
351         assert.False(t, leecherTorrent.Seeding())
352         assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
353         r := leecherTorrent.NewReader()
354         defer r.Close()
355         if ps.Responsive {
356                 r.SetResponsive()
357         }
358         if ps.SetReadahead {
359                 r.SetReadahead(ps.Readahead)
360         }
361         assertReadAllGreeting(t, r)
362
363         seederStats := seederTorrent.Stats()
364         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
365         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
366
367         leecherStats := leecherTorrent.Stats()
368         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
369         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
370
371         // Try reading through again for the cases where the torrent data size
372         // exceeds the size of the cache.
373         assertReadAllGreeting(t, r)
374 }
375
376 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
377         pos, err := r.Seek(0, io.SeekStart)
378         assert.NoError(t, err)
379         assert.EqualValues(t, 0, pos)
380         _greeting, err := ioutil.ReadAll(r)
381         assert.NoError(t, err)
382         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
383 }
384
385 // Check that after completing leeching, a leecher transitions to a seeding
386 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
387 func TestSeedAfterDownloading(t *testing.T) {
388         greetingTempDir, mi := testutil.GreetingTestTorrent()
389         defer os.RemoveAll(greetingTempDir)
390
391         cfg := TestingConfig()
392         cfg.Seed = true
393         cfg.DataDir = greetingTempDir
394         seeder, err := NewClient(cfg)
395         require.NoError(t, err)
396         defer seeder.Close()
397         defer testutil.ExportStatusWriter(seeder, "s")()
398         seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
399         require.NoError(t, err)
400         assert.True(t, ok)
401         seederTorrent.VerifyData()
402
403         cfg = TestingConfig()
404         cfg.Seed = true
405         cfg.DataDir, err = ioutil.TempDir("", "")
406         require.NoError(t, err)
407         defer os.RemoveAll(cfg.DataDir)
408         leecher, err := NewClient(cfg)
409         require.NoError(t, err)
410         defer leecher.Close()
411         defer testutil.ExportStatusWriter(leecher, "l")()
412
413         cfg = TestingConfig()
414         cfg.Seed = false
415         cfg.DataDir, err = ioutil.TempDir("", "")
416         require.NoError(t, err)
417         defer os.RemoveAll(cfg.DataDir)
418         leecherLeecher, _ := NewClient(cfg)
419         require.NoError(t, err)
420         defer leecherLeecher.Close()
421         defer testutil.ExportStatusWriter(leecherLeecher, "ll")()
422         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
423                 ret = TorrentSpecFromMetaInfo(mi)
424                 ret.ChunkSize = 2
425                 return
426         }())
427         require.NoError(t, err)
428         assert.True(t, ok)
429         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
430                 ret = TorrentSpecFromMetaInfo(mi)
431                 ret.ChunkSize = 3
432                 return
433         }())
434         require.NoError(t, err)
435         assert.True(t, ok)
436         // Simultaneously DownloadAll in Leecher, and read the contents
437         // consecutively in LeecherLeecher. This non-deterministically triggered a
438         // case where the leecher wouldn't unchoke the LeecherLeecher.
439         var wg sync.WaitGroup
440         wg.Add(1)
441         go func() {
442                 defer wg.Done()
443                 r := llg.NewReader()
444                 defer r.Close()
445                 b, err := ioutil.ReadAll(r)
446                 require.NoError(t, err)
447                 assert.EqualValues(t, testutil.GreetingFileContents, b)
448         }()
449         done := make(chan struct{})
450         defer close(done)
451         go leecherGreeting.AddClientPeer(seeder)
452         go leecherGreeting.AddClientPeer(leecherLeecher)
453         wg.Add(1)
454         go func() {
455                 defer wg.Done()
456                 leecherGreeting.DownloadAll()
457                 leecher.WaitAll()
458         }()
459         wg.Wait()
460 }
461
462 func TestMergingTrackersByAddingSpecs(t *testing.T) {
463         cl, err := NewClient(TestingConfig())
464         require.NoError(t, err)
465         defer cl.Close()
466         spec := TorrentSpec{}
467         T, new, _ := cl.AddTorrentSpec(&spec)
468         if !new {
469                 t.FailNow()
470         }
471         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
472         _, new, _ = cl.AddTorrentSpec(&spec)
473         assert.False(t, new)
474         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
475         // Because trackers are disabled in TestingConfig.
476         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
477 }
478
479 // We read from a piece which is marked completed, but is missing data.
480 func TestCompletedPieceWrongSize(t *testing.T) {
481         cfg := TestingConfig()
482         cfg.DefaultStorage = badStorage{}
483         cl, err := NewClient(cfg)
484         require.NoError(t, err)
485         defer cl.Close()
486         info := metainfo.Info{
487                 PieceLength: 15,
488                 Pieces:      make([]byte, 20),
489                 Files: []metainfo.FileInfo{
490                         {Path: []string{"greeting"}, Length: 13},
491                 },
492         }
493         b, err := bencode.Marshal(info)
494         require.NoError(t, err)
495         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
496                 InfoBytes: b,
497                 InfoHash:  metainfo.HashBytes(b),
498         })
499         require.NoError(t, err)
500         defer tt.Drop()
501         assert.True(t, new)
502         r := tt.NewReader()
503         defer r.Close()
504         b, err = ioutil.ReadAll(r)
505         assert.Len(t, b, 13)
506         assert.NoError(t, err)
507 }
508
509 func BenchmarkAddLargeTorrent(b *testing.B) {
510         cfg := TestingConfig()
511         cfg.DisableTCP = true
512         cfg.DisableUTP = true
513         cl, err := NewClient(cfg)
514         require.NoError(b, err)
515         defer cl.Close()
516         b.ReportAllocs()
517         for range iter.N(b.N) {
518                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
519                 if err != nil {
520                         b.Fatal(err)
521                 }
522                 t.Drop()
523         }
524 }
525
526 func TestResponsive(t *testing.T) {
527         seederDataDir, mi := testutil.GreetingTestTorrent()
528         defer os.RemoveAll(seederDataDir)
529         cfg := TestingConfig()
530         cfg.Seed = true
531         cfg.DataDir = seederDataDir
532         seeder, err := NewClient(cfg)
533         require.Nil(t, err)
534         defer seeder.Close()
535         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
536         seederTorrent.VerifyData()
537         leecherDataDir, err := ioutil.TempDir("", "")
538         require.Nil(t, err)
539         defer os.RemoveAll(leecherDataDir)
540         cfg = TestingConfig()
541         cfg.DataDir = leecherDataDir
542         leecher, err := NewClient(cfg)
543         require.Nil(t, err)
544         defer leecher.Close()
545         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
546                 ret = TorrentSpecFromMetaInfo(mi)
547                 ret.ChunkSize = 2
548                 return
549         }())
550         leecherTorrent.AddClientPeer(seeder)
551         reader := leecherTorrent.NewReader()
552         defer reader.Close()
553         reader.SetReadahead(0)
554         reader.SetResponsive()
555         b := make([]byte, 2)
556         _, err = reader.Seek(3, io.SeekStart)
557         require.NoError(t, err)
558         _, err = io.ReadFull(reader, b)
559         assert.Nil(t, err)
560         assert.EqualValues(t, "lo", string(b))
561         _, err = reader.Seek(11, io.SeekStart)
562         require.NoError(t, err)
563         n, err := io.ReadFull(reader, b)
564         assert.Nil(t, err)
565         assert.EqualValues(t, 2, n)
566         assert.EqualValues(t, "d\n", string(b))
567 }
568
569 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
570         seederDataDir, mi := testutil.GreetingTestTorrent()
571         defer os.RemoveAll(seederDataDir)
572         cfg := TestingConfig()
573         cfg.Seed = true
574         cfg.DataDir = seederDataDir
575         seeder, err := NewClient(cfg)
576         require.Nil(t, err)
577         defer seeder.Close()
578         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
579         seederTorrent.VerifyData()
580         leecherDataDir, err := ioutil.TempDir("", "")
581         require.Nil(t, err)
582         defer os.RemoveAll(leecherDataDir)
583         cfg = TestingConfig()
584         cfg.DataDir = leecherDataDir
585         leecher, err := NewClient(cfg)
586         require.Nil(t, err)
587         defer leecher.Close()
588         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
589                 ret = TorrentSpecFromMetaInfo(mi)
590                 ret.ChunkSize = 2
591                 return
592         }())
593         leecherTorrent.AddClientPeer(seeder)
594         reader := leecherTorrent.NewReader()
595         defer reader.Close()
596         reader.SetReadahead(0)
597         reader.SetResponsive()
598         b := make([]byte, 2)
599         _, err = reader.Seek(3, io.SeekStart)
600         require.NoError(t, err)
601         _, err = io.ReadFull(reader, b)
602         assert.Nil(t, err)
603         assert.EqualValues(t, "lo", string(b))
604         go leecherTorrent.Drop()
605         _, err = reader.Seek(11, io.SeekStart)
606         require.NoError(t, err)
607         n, err := reader.Read(b)
608         assert.EqualError(t, err, "torrent closed")
609         assert.EqualValues(t, 0, n)
610 }
611
612 func TestDHTInheritBlocklist(t *testing.T) {
613         ipl := iplist.New(nil)
614         require.NotNil(t, ipl)
615         cfg := TestingConfig()
616         cfg.IPBlocklist = ipl
617         cfg.NoDHT = false
618         cl, err := NewClient(cfg)
619         require.NoError(t, err)
620         defer cl.Close()
621         numServers := 0
622         cl.eachDhtServer(func(s *dht.Server) {
623                 assert.Equal(t, ipl, s.IPBlocklist())
624                 numServers++
625         })
626         assert.EqualValues(t, 2, numServers)
627 }
628
629 // Check that stuff is merged in subsequent AddTorrentSpec for the same
630 // infohash.
631 func TestAddTorrentSpecMerging(t *testing.T) {
632         cl, err := NewClient(TestingConfig())
633         require.NoError(t, err)
634         defer cl.Close()
635         dir, mi := testutil.GreetingTestTorrent()
636         defer os.RemoveAll(dir)
637         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
638                 InfoHash: mi.HashInfoBytes(),
639         })
640         require.NoError(t, err)
641         require.True(t, new)
642         require.Nil(t, tt.Info())
643         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
644         require.NoError(t, err)
645         require.False(t, new)
646         require.NotNil(t, tt.Info())
647 }
648
649 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
650         dir, mi := testutil.GreetingTestTorrent()
651         os.RemoveAll(dir)
652         cl, _ := NewClient(TestingConfig())
653         defer cl.Close()
654         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
655                 InfoHash: mi.HashInfoBytes(),
656         })
657         tt.Drop()
658         assert.EqualValues(t, 0, len(cl.Torrents()))
659         select {
660         case <-tt.GotInfo():
661                 t.FailNow()
662         default:
663         }
664 }
665
666 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
667         for i := range iter.N(info.NumPieces()) {
668                 p := info.Piece(i)
669                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
670         }
671 }
672
673 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
674         fileCacheDir, err := ioutil.TempDir("", "")
675         require.NoError(t, err)
676         defer os.RemoveAll(fileCacheDir)
677         fileCache, err := filecache.NewCache(fileCacheDir)
678         require.NoError(t, err)
679         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
680         defer os.RemoveAll(greetingDataTempDir)
681         filePieceStore := csf(fileCache)
682         defer filePieceStore.Close()
683         info, err := greetingMetainfo.UnmarshalInfo()
684         require.NoError(t, err)
685         ih := greetingMetainfo.HashInfoBytes()
686         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
687         require.NoError(t, err)
688         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
689         // require.Equal(t, len(testutil.GreetingFileContents), written)
690         // require.NoError(t, err)
691         for i := 0; i < info.NumPieces(); i++ {
692                 p := info.Piece(i)
693                 if alreadyCompleted {
694                         require.NoError(t, greetingData.Piece(p).MarkComplete())
695                 }
696         }
697         cfg := TestingConfig()
698         // TODO: Disable network option?
699         cfg.DisableTCP = true
700         cfg.DisableUTP = true
701         cfg.DefaultStorage = filePieceStore
702         cl, err := NewClient(cfg)
703         require.NoError(t, err)
704         defer cl.Close()
705         tt, err := cl.AddTorrent(greetingMetainfo)
706         require.NoError(t, err)
707         psrs := tt.PieceStateRuns()
708         assert.Len(t, psrs, 1)
709         assert.EqualValues(t, 3, psrs[0].Length)
710         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
711         if alreadyCompleted {
712                 r := tt.NewReader()
713                 b, err := ioutil.ReadAll(r)
714                 assert.NoError(t, err)
715                 assert.EqualValues(t, testutil.GreetingFileContents, b)
716         }
717 }
718
719 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
720         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
721 }
722
723 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
724         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
725 }
726
727 func TestAddMetainfoWithNodes(t *testing.T) {
728         cfg := TestingConfig()
729         cfg.ListenHost = func(string) string { return "" }
730         cfg.NoDHT = false
731         cfg.DhtStartingNodes = func() ([]dht.Addr, error) { return nil, nil }
732         // For now, we want to just jam the nodes into the table, without
733         // verifying them first. Also the DHT code doesn't support mixing secure
734         // and insecure nodes if security is enabled (yet).
735         // cfg.DHTConfig.NoSecurity = true
736         cl, err := NewClient(cfg)
737         require.NoError(t, err)
738         defer cl.Close()
739         sum := func() (ret int64) {
740                 cl.eachDhtServer(func(s *dht.Server) {
741                         ret += s.Stats().OutboundQueriesAttempted
742                 })
743                 return
744         }
745         assert.EqualValues(t, 0, sum())
746         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
747         require.NoError(t, err)
748         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
749         // check if the announce-list is here instead. TODO: Add nodes.
750         assert.Len(t, tt.metainfo.AnnounceList, 5)
751         // There are 6 nodes in the torrent file.
752         for sum() != int64(6*len(cl.dhtServers)) {
753                 time.Sleep(time.Millisecond)
754         }
755 }
756
757 type testDownloadCancelParams struct {
758         SetLeecherStorageCapacity bool
759         LeecherStorageCapacity    int64
760         Cancel                    bool
761 }
762
763 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
764         greetingTempDir, mi := testutil.GreetingTestTorrent()
765         defer os.RemoveAll(greetingTempDir)
766         cfg := TestingConfig()
767         cfg.Seed = true
768         cfg.DataDir = greetingTempDir
769         seeder, err := NewClient(cfg)
770         require.NoError(t, err)
771         defer seeder.Close()
772         defer testutil.ExportStatusWriter(seeder, "s")()
773         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
774         seederTorrent.VerifyData()
775         leecherDataDir, err := ioutil.TempDir("", "")
776         require.NoError(t, err)
777         defer os.RemoveAll(leecherDataDir)
778         fc, err := filecache.NewCache(leecherDataDir)
779         require.NoError(t, err)
780         if ps.SetLeecherStorageCapacity {
781                 fc.SetCapacity(ps.LeecherStorageCapacity)
782         }
783         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
784         cfg.DataDir = leecherDataDir
785         leecher, err := NewClient(cfg)
786         require.NoError(t, err)
787         defer leecher.Close()
788         defer testutil.ExportStatusWriter(leecher, "l")()
789         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
790                 ret = TorrentSpecFromMetaInfo(mi)
791                 ret.ChunkSize = 2
792                 return
793         }())
794         require.NoError(t, err)
795         assert.True(t, new)
796         psc := leecherGreeting.SubscribePieceStateChanges()
797         defer psc.Close()
798
799         leecherGreeting.cl.lock()
800         leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
801         if ps.Cancel {
802                 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
803         }
804         leecherGreeting.cl.unlock()
805         done := make(chan struct{})
806         defer close(done)
807         go leecherGreeting.AddClientPeer(seeder)
808         completes := make(map[int]bool, 3)
809         expected := func() map[int]bool {
810                 if ps.Cancel {
811                         return map[int]bool{0: false, 1: false, 2: false}
812                 } else {
813                         return map[int]bool{0: true, 1: true, 2: true}
814                 }
815         }()
816         for !reflect.DeepEqual(completes, expected) {
817                 _v := <-psc.Values
818                 v := _v.(PieceStateChange)
819                 completes[v.Index] = v.Complete
820         }
821 }
822
823 func TestTorrentDownloadAll(t *testing.T) {
824         testDownloadCancel(t, testDownloadCancelParams{})
825 }
826
827 func TestTorrentDownloadAllThenCancel(t *testing.T) {
828         testDownloadCancel(t, testDownloadCancelParams{
829                 Cancel: true,
830         })
831 }
832
833 // Ensure that it's an error for a peer to send an invalid have message.
834 func TestPeerInvalidHave(t *testing.T) {
835         cl, err := NewClient(TestingConfig())
836         require.NoError(t, err)
837         defer cl.Close()
838         info := metainfo.Info{
839                 PieceLength: 1,
840                 Pieces:      make([]byte, 20),
841                 Files:       []metainfo.FileInfo{{Length: 1}},
842         }
843         infoBytes, err := bencode.Marshal(info)
844         require.NoError(t, err)
845         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
846                 InfoBytes: infoBytes,
847                 InfoHash:  metainfo.HashBytes(infoBytes),
848                 Storage:   badStorage{},
849         })
850         require.NoError(t, err)
851         assert.True(t, _new)
852         defer tt.Drop()
853         cn := &connection{
854                 t: tt,
855         }
856         assert.NoError(t, cn.peerSentHave(0))
857         assert.Error(t, cn.peerSentHave(1))
858 }
859
860 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
861         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
862         defer os.RemoveAll(greetingTempDir)
863         cfg := TestingConfig()
864         cfg.DataDir = greetingTempDir
865         seeder, err := NewClient(TestingConfig())
866         require.NoError(t, err)
867         seeder.AddTorrentSpec(&TorrentSpec{
868                 InfoBytes: greetingMetainfo.InfoBytes,
869         })
870 }
871
872 // Check that when the listen port is 0, all the protocols listened on have
873 // the same port, and it isn't zero.
874 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
875         cl, err := NewClient(TestingConfig())
876         require.NoError(t, err)
877         defer cl.Close()
878         port := cl.LocalPort()
879         assert.NotEqual(t, 0, port)
880         cl.eachListener(func(s socket) bool {
881                 assert.Equal(t, port, missinggo.AddrPort(s.Addr()))
882                 return true
883         })
884 }
885
886 func TestClientDynamicListenTCPOnly(t *testing.T) {
887         cfg := TestingConfig()
888         cfg.DisableUTP = true
889         cfg.DisableTCP = false
890         cl, err := NewClient(cfg)
891         require.NoError(t, err)
892         defer cl.Close()
893         assert.NotEqual(t, 0, cl.LocalPort())
894 }
895
896 func TestClientDynamicListenUTPOnly(t *testing.T) {
897         cfg := TestingConfig()
898         cfg.DisableTCP = true
899         cfg.DisableUTP = false
900         cl, err := NewClient(cfg)
901         require.NoError(t, err)
902         defer cl.Close()
903         assert.NotEqual(t, 0, cl.LocalPort())
904 }
905
906 func totalConns(tts []*Torrent) (ret int) {
907         for _, tt := range tts {
908                 tt.cl.lock()
909                 ret += len(tt.conns)
910                 tt.cl.unlock()
911         }
912         return
913 }
914
915 func TestSetMaxEstablishedConn(t *testing.T) {
916         var tts []*Torrent
917         ih := testutil.GreetingMetaInfo().HashInfoBytes()
918         cfg := TestingConfig()
919         cfg.DisableAcceptRateLimiting = true
920         cfg.dropDuplicatePeerIds = true
921         for i := range iter.N(3) {
922                 cl, err := NewClient(cfg)
923                 require.NoError(t, err)
924                 defer cl.Close()
925                 tt, _ := cl.AddTorrentInfoHash(ih)
926                 tt.SetMaxEstablishedConns(2)
927                 defer testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))()
928                 tts = append(tts, tt)
929         }
930         addPeers := func() {
931                 for _, tt := range tts {
932                         for _, _tt := range tts {
933                                 // if tt != _tt {
934                                 tt.AddClientPeer(_tt.cl)
935                                 // }
936                         }
937                 }
938         }
939         waitTotalConns := func(num int) {
940                 for totalConns(tts) != num {
941                         addPeers()
942                         time.Sleep(time.Millisecond)
943                 }
944         }
945         addPeers()
946         waitTotalConns(6)
947         tts[0].SetMaxEstablishedConns(1)
948         waitTotalConns(4)
949         tts[0].SetMaxEstablishedConns(0)
950         waitTotalConns(2)
951         tts[0].SetMaxEstablishedConns(1)
952         addPeers()
953         waitTotalConns(4)
954         tts[0].SetMaxEstablishedConns(2)
955         addPeers()
956         waitTotalConns(6)
957 }
958
959 // Creates a file containing its own name as data. Make a metainfo from that, adds it to the given
960 // client, and returns a magnet link.
961 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
962         os.MkdirAll(dir, 0770)
963         file, err := os.Create(filepath.Join(dir, name))
964         require.NoError(t, err)
965         file.Write([]byte(name))
966         file.Close()
967         mi := metainfo.MetaInfo{}
968         mi.SetDefaults()
969         info := metainfo.Info{PieceLength: 256 * 1024}
970         err = info.BuildFromFilePath(filepath.Join(dir, name))
971         require.NoError(t, err)
972         mi.InfoBytes, err = bencode.Marshal(info)
973         require.NoError(t, err)
974         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
975         tr, err := cl.AddTorrent(&mi)
976         require.NoError(t, err)
977         require.True(t, tr.Seeding())
978         tr.VerifyData()
979         return magnet
980 }
981
982 // https://github.com/anacrolix/torrent/issues/114
983 func TestMultipleTorrentsWithEncryption(t *testing.T) {
984         testSeederLeecherPair(
985                 t,
986                 func(cfg *ClientConfig) {
987                         cfg.HeaderObfuscationPolicy.Preferred = true
988                         cfg.HeaderObfuscationPolicy.RequirePreferred = true
989                 },
990                 func(cfg *ClientConfig) {
991                         cfg.HeaderObfuscationPolicy.RequirePreferred = false
992                 },
993         )
994 }
995
996 // Test that the leecher can download a torrent in its entirety from the seeder. Note that the
997 // seeder config is done first.
998 func testSeederLeecherPair(t *testing.T, seeder func(*ClientConfig), leecher func(*ClientConfig)) {
999         cfg := TestingConfig()
1000         cfg.Seed = true
1001         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1002         os.Mkdir(cfg.DataDir, 0755)
1003         seeder(cfg)
1004         server, err := NewClient(cfg)
1005         require.NoError(t, err)
1006         defer server.Close()
1007         defer testutil.ExportStatusWriter(server, "s")()
1008         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1009         // Extra torrents are added to test the seeder having to match incoming obfuscated headers
1010         // against more than one torrent. See issue #114
1011         makeMagnet(t, server, cfg.DataDir, "test2")
1012         for i := 0; i < 100; i++ {
1013                 makeMagnet(t, server, cfg.DataDir, fmt.Sprintf("test%d", i+2))
1014         }
1015         cfg = TestingConfig()
1016         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1017         leecher(cfg)
1018         client, err := NewClient(cfg)
1019         require.NoError(t, err)
1020         defer client.Close()
1021         defer testutil.ExportStatusWriter(client, "c")()
1022         tr, err := client.AddMagnet(magnet1)
1023         require.NoError(t, err)
1024         tr.AddClientPeer(server)
1025         <-tr.GotInfo()
1026         tr.DownloadAll()
1027         client.WaitAll()
1028 }
1029
1030 // This appears to be the situation with the S3 BitTorrent client.
1031 func TestObfuscatedHeaderFallbackSeederDisallowsLeecherPrefers(t *testing.T) {
1032         // Leecher prefers obfuscation, but the seeder does not allow it.
1033         testSeederLeecherPair(
1034                 t,
1035                 func(cfg *ClientConfig) {
1036                         cfg.HeaderObfuscationPolicy.Preferred = false
1037                         cfg.HeaderObfuscationPolicy.RequirePreferred = true
1038                 },
1039                 func(cfg *ClientConfig) {
1040                         cfg.HeaderObfuscationPolicy.Preferred = true
1041                         cfg.HeaderObfuscationPolicy.RequirePreferred = false
1042                 },
1043         )
1044 }
1045
1046 func TestObfuscatedHeaderFallbackSeederRequiresLeecherPrefersNot(t *testing.T) {
1047         // Leecher prefers no obfuscation, but the seeder enforces it.
1048         testSeederLeecherPair(
1049                 t,
1050                 func(cfg *ClientConfig) {
1051                         cfg.HeaderObfuscationPolicy.Preferred = true
1052                         cfg.HeaderObfuscationPolicy.RequirePreferred = true
1053                 },
1054                 func(cfg *ClientConfig) {
1055                         cfg.HeaderObfuscationPolicy.Preferred = false
1056                         cfg.HeaderObfuscationPolicy.RequirePreferred = false
1057                 },
1058         )
1059 }
1060
1061 func TestClientAddressInUse(t *testing.T) {
1062         s, _ := NewUtpSocket("udp", ":50007", nil)
1063         if s != nil {
1064                 defer s.Close()
1065         }
1066         cfg := TestingConfig().SetListenAddr(":50007")
1067         cl, err := NewClient(cfg)
1068         require.Error(t, err)
1069         require.Nil(t, cl)
1070 }
1071
1072 func TestClientHasDhtServersWhenUtpDisabled(t *testing.T) {
1073         cc := TestingConfig()
1074         cc.DisableUTP = true
1075         cc.NoDHT = false
1076         cl, err := NewClient(cc)
1077         require.NoError(t, err)
1078         defer cl.Close()
1079         assert.NotEmpty(t, cl.DhtServers())
1080 }