]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Add default DHT servers when UTP disabled
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "os"
9         "path/filepath"
10         "reflect"
11         "sync"
12         "testing"
13         "time"
14
15         "github.com/anacrolix/dht"
16         _ "github.com/anacrolix/envpprof"
17         "github.com/anacrolix/missinggo"
18         "github.com/anacrolix/missinggo/filecache"
19         "github.com/anacrolix/torrent/bencode"
20         "github.com/anacrolix/torrent/internal/testutil"
21         "github.com/anacrolix/torrent/iplist"
22         "github.com/anacrolix/torrent/metainfo"
23         "github.com/anacrolix/torrent/storage"
24         "github.com/bradfitz/iter"
25         "github.com/stretchr/testify/assert"
26         "github.com/stretchr/testify/require"
27         "golang.org/x/time/rate"
28 )
29
30 func TestingConfig() *ClientConfig {
31         cfg := NewDefaultClientConfig()
32         cfg.ListenHost = LoopbackListenHost
33         cfg.NoDHT = true
34         cfg.DataDir = tempDir()
35         cfg.DisableTrackers = true
36         cfg.NoDefaultPortForwarding = true
37         cfg.DisableAcceptRateLimiting = true
38         return cfg
39 }
40
41 func TestClientDefault(t *testing.T) {
42         cl, err := NewClient(TestingConfig())
43         require.NoError(t, err)
44         cl.Close()
45 }
46
47 func TestClientNilConfig(t *testing.T) {
48         cl, err := NewClient(nil)
49         require.NoError(t, err)
50         cl.Close()
51 }
52
53 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
54         cfg := TestingConfig()
55         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
56         require.NoError(t, err)
57         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
58         defer ci.Close()
59         cfg.DefaultStorage = ci
60         cl, err := NewClient(cfg)
61         require.NoError(t, err)
62         cl.Close()
63         // And again, https://github.com/anacrolix/torrent/issues/158
64         cl, err = NewClient(cfg)
65         require.NoError(t, err)
66         cl.Close()
67 }
68
69 func TestAddDropTorrent(t *testing.T) {
70         cl, err := NewClient(TestingConfig())
71         require.NoError(t, err)
72         defer cl.Close()
73         dir, mi := testutil.GreetingTestTorrent()
74         defer os.RemoveAll(dir)
75         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
76         require.NoError(t, err)
77         assert.True(t, new)
78         tt.SetMaxEstablishedConns(0)
79         tt.SetMaxEstablishedConns(1)
80         tt.Drop()
81 }
82
83 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
84         // TODO?
85         t.SkipNow()
86 }
87
88 func TestAddTorrentNoUsableURLs(t *testing.T) {
89         // TODO?
90         t.SkipNow()
91 }
92
93 func TestAddPeersToUnknownTorrent(t *testing.T) {
94         // TODO?
95         t.SkipNow()
96 }
97
98 func TestPieceHashSize(t *testing.T) {
99         assert.Equal(t, 20, pieceHash.Size())
100 }
101
102 func TestTorrentInitialState(t *testing.T) {
103         dir, mi := testutil.GreetingTestTorrent()
104         defer os.RemoveAll(dir)
105         cl := &Client{
106                 config: &ClientConfig{},
107         }
108         cl.initLogger()
109         tor := cl.newTorrent(
110                 mi.HashInfoBytes(),
111                 storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
112         )
113         tor.setChunkSize(2)
114         tor.cl.lock()
115         err := tor.setInfoBytes(mi.InfoBytes)
116         tor.cl.unlock()
117         require.NoError(t, err)
118         require.Len(t, tor.pieces, 3)
119         tor.pendAllChunkSpecs(0)
120         tor.cl.lock()
121         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
122         tor.cl.unlock()
123         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
124 }
125
126 func TestReducedDialTimeout(t *testing.T) {
127         cfg := NewDefaultClientConfig()
128         for _, _case := range []struct {
129                 Max             time.Duration
130                 HalfOpenLimit   int
131                 PendingPeers    int
132                 ExpectedReduced time.Duration
133         }{
134                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
135                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
136                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
137                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
138                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
139                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
140         } {
141                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
142                 expected := _case.ExpectedReduced
143                 if expected < cfg.MinDialTimeout {
144                         expected = cfg.MinDialTimeout
145                 }
146                 if reduced != expected {
147                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
148                 }
149         }
150 }
151
152 func TestAddDropManyTorrents(t *testing.T) {
153         cl, err := NewClient(TestingConfig())
154         require.NoError(t, err)
155         defer cl.Close()
156         for i := range iter.N(1000) {
157                 var spec TorrentSpec
158                 binary.PutVarint(spec.InfoHash[:], int64(i))
159                 tt, new, err := cl.AddTorrentSpec(&spec)
160                 assert.NoError(t, err)
161                 assert.True(t, new)
162                 defer tt.Drop()
163         }
164 }
165
166 type FileCacheClientStorageFactoryParams struct {
167         Capacity    int64
168         SetCapacity bool
169         Wrapper     func(*filecache.Cache) storage.ClientImpl
170 }
171
172 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
173         return func(dataDir string) storage.ClientImpl {
174                 fc, err := filecache.NewCache(dataDir)
175                 if err != nil {
176                         panic(err)
177                 }
178                 if ps.SetCapacity {
179                         fc.SetCapacity(ps.Capacity)
180                 }
181                 return ps.Wrapper(fc)
182         }
183 }
184
185 type storageFactory func(string) storage.ClientImpl
186
187 func TestClientTransferDefault(t *testing.T) {
188         testClientTransfer(t, testClientTransferParams{
189                 ExportClientStatus: true,
190                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
191                         Wrapper: fileCachePieceResourceStorage,
192                 }),
193         })
194 }
195
196 func TestClientTransferRateLimitedUpload(t *testing.T) {
197         started := time.Now()
198         testClientTransfer(t, testClientTransferParams{
199                 // We are uploading 13 bytes (the length of the greeting torrent). The
200                 // chunks are 2 bytes in length. Then the smallest burst we can run
201                 // with is 2. Time taken is (13-burst)/rate.
202                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
203                 ExportClientStatus:      true,
204         })
205         require.True(t, time.Since(started) > time.Second)
206 }
207
208 func TestClientTransferRateLimitedDownload(t *testing.T) {
209         testClientTransfer(t, testClientTransferParams{
210                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
211         })
212 }
213
214 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
215         return storage.NewResourcePieces(fc.AsResourceProvider())
216 }
217
218 func TestClientTransferSmallCache(t *testing.T) {
219         testClientTransfer(t, testClientTransferParams{
220                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
221                         SetCapacity: true,
222                         // Going below the piece length means it can't complete a piece so
223                         // that it can be hashed.
224                         Capacity: 5,
225                         Wrapper:  fileCachePieceResourceStorage,
226                 }),
227                 SetReadahead: true,
228                 // Can't readahead too far or the cache will thrash and drop data we
229                 // thought we had.
230                 Readahead:          0,
231                 ExportClientStatus: true,
232         })
233 }
234
235 func TestClientTransferVarious(t *testing.T) {
236         // Leecher storage
237         for _, ls := range []storageFactory{
238                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
239                         Wrapper: fileCachePieceResourceStorage,
240                 }),
241                 storage.NewBoltDB,
242         } {
243                 // Seeder storage
244                 for _, ss := range []func(string) storage.ClientImpl{
245                         storage.NewFile,
246                         storage.NewMMap,
247                 } {
248                         for _, responsive := range []bool{false, true} {
249                                 testClientTransfer(t, testClientTransferParams{
250                                         Responsive:     responsive,
251                                         SeederStorage:  ss,
252                                         LeecherStorage: ls,
253                                 })
254                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
255                                         testClientTransfer(t, testClientTransferParams{
256                                                 SeederStorage:  ss,
257                                                 Responsive:     responsive,
258                                                 SetReadahead:   true,
259                                                 Readahead:      readahead,
260                                                 LeecherStorage: ls,
261                                         })
262                                 }
263                         }
264                 }
265         }
266 }
267
268 type testClientTransferParams struct {
269         Responsive                 bool
270         Readahead                  int64
271         SetReadahead               bool
272         ExportClientStatus         bool
273         LeecherStorage             func(string) storage.ClientImpl
274         SeederStorage              func(string) storage.ClientImpl
275         SeederUploadRateLimiter    *rate.Limiter
276         LeecherDownloadRateLimiter *rate.Limiter
277 }
278
279 // Creates a seeder and a leecher, and ensures the data transfers when a read
280 // is attempted on the leecher.
281 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
282         greetingTempDir, mi := testutil.GreetingTestTorrent()
283         defer os.RemoveAll(greetingTempDir)
284         // Create seeder and a Torrent.
285         cfg := TestingConfig()
286         cfg.Seed = true
287         if ps.SeederUploadRateLimiter != nil {
288                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
289         }
290         // cfg.ListenAddr = "localhost:4000"
291         if ps.SeederStorage != nil {
292                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
293                 defer cfg.DefaultStorage.Close()
294         } else {
295                 cfg.DataDir = greetingTempDir
296         }
297         seeder, err := NewClient(cfg)
298         require.NoError(t, err)
299         if ps.ExportClientStatus {
300                 defer testutil.ExportStatusWriter(seeder, "s")()
301         }
302         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
303         // Run a Stats right after Closing the Client. This will trigger the Stats
304         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
305         defer seederTorrent.Stats()
306         defer seeder.Close()
307         seederTorrent.VerifyData()
308         // Create leecher and a Torrent.
309         leecherDataDir, err := ioutil.TempDir("", "")
310         require.NoError(t, err)
311         defer os.RemoveAll(leecherDataDir)
312         cfg = TestingConfig()
313         if ps.LeecherStorage == nil {
314                 cfg.DataDir = leecherDataDir
315         } else {
316                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
317         }
318         if ps.LeecherDownloadRateLimiter != nil {
319                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
320         }
321         cfg.Seed = false
322         leecher, err := NewClient(cfg)
323         require.NoError(t, err)
324         defer leecher.Close()
325         if ps.ExportClientStatus {
326                 defer testutil.ExportStatusWriter(leecher, "l")()
327         }
328         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
329                 ret = TorrentSpecFromMetaInfo(mi)
330                 ret.ChunkSize = 2
331                 return
332         }())
333         require.NoError(t, err)
334         assert.True(t, new)
335         // Now do some things with leecher and seeder.
336         leecherTorrent.AddClientPeer(seeder)
337         // The Torrent should not be interested in obtaining peers, so the one we
338         // just added should be the only one.
339         assert.False(t, leecherTorrent.Seeding())
340         assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
341         r := leecherTorrent.NewReader()
342         defer r.Close()
343         if ps.Responsive {
344                 r.SetResponsive()
345         }
346         if ps.SetReadahead {
347                 r.SetReadahead(ps.Readahead)
348         }
349         assertReadAllGreeting(t, r)
350
351         seederStats := seederTorrent.Stats()
352         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
353         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
354
355         leecherStats := leecherTorrent.Stats()
356         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
357         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
358
359         // Try reading through again for the cases where the torrent data size
360         // exceeds the size of the cache.
361         assertReadAllGreeting(t, r)
362 }
363
364 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
365         pos, err := r.Seek(0, io.SeekStart)
366         assert.NoError(t, err)
367         assert.EqualValues(t, 0, pos)
368         _greeting, err := ioutil.ReadAll(r)
369         assert.NoError(t, err)
370         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
371 }
372
373 // Check that after completing leeching, a leecher transitions to a seeding
374 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
375 func TestSeedAfterDownloading(t *testing.T) {
376         greetingTempDir, mi := testutil.GreetingTestTorrent()
377         defer os.RemoveAll(greetingTempDir)
378
379         cfg := TestingConfig()
380         cfg.Seed = true
381         cfg.DataDir = greetingTempDir
382         seeder, err := NewClient(cfg)
383         require.NoError(t, err)
384         defer seeder.Close()
385         defer testutil.ExportStatusWriter(seeder, "s")()
386         seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
387         require.NoError(t, err)
388         assert.True(t, ok)
389         seederTorrent.VerifyData()
390
391         cfg = TestingConfig()
392         cfg.Seed = true
393         cfg.DataDir, err = ioutil.TempDir("", "")
394         require.NoError(t, err)
395         defer os.RemoveAll(cfg.DataDir)
396         leecher, err := NewClient(cfg)
397         require.NoError(t, err)
398         defer leecher.Close()
399         defer testutil.ExportStatusWriter(leecher, "l")()
400
401         cfg = TestingConfig()
402         cfg.Seed = false
403         cfg.DataDir, err = ioutil.TempDir("", "")
404         require.NoError(t, err)
405         defer os.RemoveAll(cfg.DataDir)
406         leecherLeecher, _ := NewClient(cfg)
407         require.NoError(t, err)
408         defer leecherLeecher.Close()
409         defer testutil.ExportStatusWriter(leecherLeecher, "ll")()
410         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
411                 ret = TorrentSpecFromMetaInfo(mi)
412                 ret.ChunkSize = 2
413                 return
414         }())
415         require.NoError(t, err)
416         assert.True(t, ok)
417         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
418                 ret = TorrentSpecFromMetaInfo(mi)
419                 ret.ChunkSize = 3
420                 return
421         }())
422         require.NoError(t, err)
423         assert.True(t, ok)
424         // Simultaneously DownloadAll in Leecher, and read the contents
425         // consecutively in LeecherLeecher. This non-deterministically triggered a
426         // case where the leecher wouldn't unchoke the LeecherLeecher.
427         var wg sync.WaitGroup
428         wg.Add(1)
429         go func() {
430                 defer wg.Done()
431                 r := llg.NewReader()
432                 defer r.Close()
433                 b, err := ioutil.ReadAll(r)
434                 require.NoError(t, err)
435                 assert.EqualValues(t, testutil.GreetingFileContents, b)
436         }()
437         done := make(chan struct{})
438         defer close(done)
439         go leecherGreeting.AddClientPeer(seeder)
440         go leecherGreeting.AddClientPeer(leecherLeecher)
441         wg.Add(1)
442         go func() {
443                 defer wg.Done()
444                 leecherGreeting.DownloadAll()
445                 leecher.WaitAll()
446         }()
447         wg.Wait()
448 }
449
450 func TestMergingTrackersByAddingSpecs(t *testing.T) {
451         cl, err := NewClient(TestingConfig())
452         require.NoError(t, err)
453         defer cl.Close()
454         spec := TorrentSpec{}
455         T, new, _ := cl.AddTorrentSpec(&spec)
456         if !new {
457                 t.FailNow()
458         }
459         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
460         _, new, _ = cl.AddTorrentSpec(&spec)
461         assert.False(t, new)
462         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
463         // Because trackers are disabled in TestingConfig.
464         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
465 }
466
467 // We read from a piece which is marked completed, but is missing data.
468 func TestCompletedPieceWrongSize(t *testing.T) {
469         cfg := TestingConfig()
470         cfg.DefaultStorage = badStorage{}
471         cl, err := NewClient(cfg)
472         require.NoError(t, err)
473         defer cl.Close()
474         info := metainfo.Info{
475                 PieceLength: 15,
476                 Pieces:      make([]byte, 20),
477                 Files: []metainfo.FileInfo{
478                         {Path: []string{"greeting"}, Length: 13},
479                 },
480         }
481         b, err := bencode.Marshal(info)
482         require.NoError(t, err)
483         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
484                 InfoBytes: b,
485                 InfoHash:  metainfo.HashBytes(b),
486         })
487         require.NoError(t, err)
488         defer tt.Drop()
489         assert.True(t, new)
490         r := tt.NewReader()
491         defer r.Close()
492         b, err = ioutil.ReadAll(r)
493         assert.Len(t, b, 13)
494         assert.NoError(t, err)
495 }
496
497 func BenchmarkAddLargeTorrent(b *testing.B) {
498         cfg := TestingConfig()
499         cfg.DisableTCP = true
500         cfg.DisableUTP = true
501         cfg.ListenHost = func(string) string { return "redonk" }
502         cl, err := NewClient(cfg)
503         require.NoError(b, err)
504         defer cl.Close()
505         for range iter.N(b.N) {
506                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
507                 if err != nil {
508                         b.Fatal(err)
509                 }
510                 t.Drop()
511         }
512 }
513
514 func TestResponsive(t *testing.T) {
515         seederDataDir, mi := testutil.GreetingTestTorrent()
516         defer os.RemoveAll(seederDataDir)
517         cfg := TestingConfig()
518         cfg.Seed = true
519         cfg.DataDir = seederDataDir
520         seeder, err := NewClient(cfg)
521         require.Nil(t, err)
522         defer seeder.Close()
523         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
524         seederTorrent.VerifyData()
525         leecherDataDir, err := ioutil.TempDir("", "")
526         require.Nil(t, err)
527         defer os.RemoveAll(leecherDataDir)
528         cfg = TestingConfig()
529         cfg.DataDir = leecherDataDir
530         leecher, err := NewClient(cfg)
531         require.Nil(t, err)
532         defer leecher.Close()
533         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
534                 ret = TorrentSpecFromMetaInfo(mi)
535                 ret.ChunkSize = 2
536                 return
537         }())
538         leecherTorrent.AddClientPeer(seeder)
539         reader := leecherTorrent.NewReader()
540         defer reader.Close()
541         reader.SetReadahead(0)
542         reader.SetResponsive()
543         b := make([]byte, 2)
544         _, err = reader.Seek(3, io.SeekStart)
545         require.NoError(t, err)
546         _, err = io.ReadFull(reader, b)
547         assert.Nil(t, err)
548         assert.EqualValues(t, "lo", string(b))
549         _, err = reader.Seek(11, io.SeekStart)
550         require.NoError(t, err)
551         n, err := io.ReadFull(reader, b)
552         assert.Nil(t, err)
553         assert.EqualValues(t, 2, n)
554         assert.EqualValues(t, "d\n", string(b))
555 }
556
557 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
558         seederDataDir, mi := testutil.GreetingTestTorrent()
559         defer os.RemoveAll(seederDataDir)
560         cfg := TestingConfig()
561         cfg.Seed = true
562         cfg.DataDir = seederDataDir
563         seeder, err := NewClient(cfg)
564         require.Nil(t, err)
565         defer seeder.Close()
566         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
567         seederTorrent.VerifyData()
568         leecherDataDir, err := ioutil.TempDir("", "")
569         require.Nil(t, err)
570         defer os.RemoveAll(leecherDataDir)
571         cfg = TestingConfig()
572         cfg.DataDir = leecherDataDir
573         leecher, err := NewClient(cfg)
574         require.Nil(t, err)
575         defer leecher.Close()
576         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
577                 ret = TorrentSpecFromMetaInfo(mi)
578                 ret.ChunkSize = 2
579                 return
580         }())
581         leecherTorrent.AddClientPeer(seeder)
582         reader := leecherTorrent.NewReader()
583         defer reader.Close()
584         reader.SetReadahead(0)
585         reader.SetResponsive()
586         b := make([]byte, 2)
587         _, err = reader.Seek(3, io.SeekStart)
588         require.NoError(t, err)
589         _, err = io.ReadFull(reader, b)
590         assert.Nil(t, err)
591         assert.EqualValues(t, "lo", string(b))
592         go leecherTorrent.Drop()
593         _, err = reader.Seek(11, io.SeekStart)
594         require.NoError(t, err)
595         n, err := reader.Read(b)
596         assert.EqualError(t, err, "torrent closed")
597         assert.EqualValues(t, 0, n)
598 }
599
600 func TestDHTInheritBlocklist(t *testing.T) {
601         ipl := iplist.New(nil)
602         require.NotNil(t, ipl)
603         cfg := TestingConfig()
604         cfg.IPBlocklist = ipl
605         cfg.NoDHT = false
606         cl, err := NewClient(cfg)
607         require.NoError(t, err)
608         defer cl.Close()
609         numServers := 0
610         cl.eachDhtServer(func(s *dht.Server) {
611                 assert.Equal(t, ipl, s.IPBlocklist())
612                 numServers++
613         })
614         assert.EqualValues(t, 2, numServers)
615 }
616
617 // Check that stuff is merged in subsequent AddTorrentSpec for the same
618 // infohash.
619 func TestAddTorrentSpecMerging(t *testing.T) {
620         cl, err := NewClient(TestingConfig())
621         require.NoError(t, err)
622         defer cl.Close()
623         dir, mi := testutil.GreetingTestTorrent()
624         defer os.RemoveAll(dir)
625         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
626                 InfoHash: mi.HashInfoBytes(),
627         })
628         require.NoError(t, err)
629         require.True(t, new)
630         require.Nil(t, tt.Info())
631         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
632         require.NoError(t, err)
633         require.False(t, new)
634         require.NotNil(t, tt.Info())
635 }
636
637 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
638         dir, mi := testutil.GreetingTestTorrent()
639         os.RemoveAll(dir)
640         cl, _ := NewClient(TestingConfig())
641         defer cl.Close()
642         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
643                 InfoHash: mi.HashInfoBytes(),
644         })
645         tt.Drop()
646         assert.EqualValues(t, 0, len(cl.Torrents()))
647         select {
648         case <-tt.GotInfo():
649                 t.FailNow()
650         default:
651         }
652 }
653
654 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
655         for i := range iter.N(info.NumPieces()) {
656                 p := info.Piece(i)
657                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
658         }
659 }
660
661 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
662         fileCacheDir, err := ioutil.TempDir("", "")
663         require.NoError(t, err)
664         defer os.RemoveAll(fileCacheDir)
665         fileCache, err := filecache.NewCache(fileCacheDir)
666         require.NoError(t, err)
667         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
668         defer os.RemoveAll(greetingDataTempDir)
669         filePieceStore := csf(fileCache)
670         defer filePieceStore.Close()
671         info, err := greetingMetainfo.UnmarshalInfo()
672         require.NoError(t, err)
673         ih := greetingMetainfo.HashInfoBytes()
674         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
675         require.NoError(t, err)
676         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
677         // require.Equal(t, len(testutil.GreetingFileContents), written)
678         // require.NoError(t, err)
679         for i := 0; i < info.NumPieces(); i++ {
680                 p := info.Piece(i)
681                 if alreadyCompleted {
682                         require.NoError(t, greetingData.Piece(p).MarkComplete())
683                 }
684         }
685         cfg := TestingConfig()
686         // TODO: Disable network option?
687         cfg.DisableTCP = true
688         cfg.DisableUTP = true
689         cfg.DefaultStorage = filePieceStore
690         cl, err := NewClient(cfg)
691         require.NoError(t, err)
692         defer cl.Close()
693         tt, err := cl.AddTorrent(greetingMetainfo)
694         require.NoError(t, err)
695         psrs := tt.PieceStateRuns()
696         assert.Len(t, psrs, 1)
697         assert.EqualValues(t, 3, psrs[0].Length)
698         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
699         if alreadyCompleted {
700                 r := tt.NewReader()
701                 b, err := ioutil.ReadAll(r)
702                 assert.NoError(t, err)
703                 assert.EqualValues(t, testutil.GreetingFileContents, b)
704         }
705 }
706
707 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
708         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
709 }
710
711 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
712         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
713 }
714
715 func TestAddMetainfoWithNodes(t *testing.T) {
716         cfg := TestingConfig()
717         cfg.ListenHost = func(string) string { return "" }
718         cfg.NoDHT = false
719         cfg.DhtStartingNodes = func() ([]dht.Addr, error) { return nil, nil }
720         // For now, we want to just jam the nodes into the table, without
721         // verifying them first. Also the DHT code doesn't support mixing secure
722         // and insecure nodes if security is enabled (yet).
723         // cfg.DHTConfig.NoSecurity = true
724         cl, err := NewClient(cfg)
725         require.NoError(t, err)
726         defer cl.Close()
727         sum := func() (ret int64) {
728                 cl.eachDhtServer(func(s *dht.Server) {
729                         ret += s.Stats().OutboundQueriesAttempted
730                 })
731                 return
732         }
733         assert.EqualValues(t, 0, sum())
734         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
735         require.NoError(t, err)
736         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
737         // check if the announce-list is here instead. TODO: Add nodes.
738         assert.Len(t, tt.metainfo.AnnounceList, 5)
739         // There are 6 nodes in the torrent file.
740         assert.EqualValues(t, 6*len(cl.dhtServers), sum())
741 }
742
743 type testDownloadCancelParams struct {
744         SetLeecherStorageCapacity bool
745         LeecherStorageCapacity    int64
746         Cancel                    bool
747 }
748
749 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
750         greetingTempDir, mi := testutil.GreetingTestTorrent()
751         defer os.RemoveAll(greetingTempDir)
752         cfg := TestingConfig()
753         cfg.Seed = true
754         cfg.DataDir = greetingTempDir
755         seeder, err := NewClient(cfg)
756         require.NoError(t, err)
757         defer seeder.Close()
758         defer testutil.ExportStatusWriter(seeder, "s")()
759         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
760         seederTorrent.VerifyData()
761         leecherDataDir, err := ioutil.TempDir("", "")
762         require.NoError(t, err)
763         defer os.RemoveAll(leecherDataDir)
764         fc, err := filecache.NewCache(leecherDataDir)
765         require.NoError(t, err)
766         if ps.SetLeecherStorageCapacity {
767                 fc.SetCapacity(ps.LeecherStorageCapacity)
768         }
769         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
770         cfg.DataDir = leecherDataDir
771         leecher, _ := NewClient(cfg)
772         defer leecher.Close()
773         defer testutil.ExportStatusWriter(leecher, "l")()
774         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
775                 ret = TorrentSpecFromMetaInfo(mi)
776                 ret.ChunkSize = 2
777                 return
778         }())
779         require.NoError(t, err)
780         assert.True(t, new)
781         psc := leecherGreeting.SubscribePieceStateChanges()
782         defer psc.Close()
783
784         leecherGreeting.cl.lock()
785         leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
786         if ps.Cancel {
787                 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
788         }
789         leecherGreeting.cl.unlock()
790         done := make(chan struct{})
791         defer close(done)
792         go leecherGreeting.AddClientPeer(seeder)
793         completes := make(map[int]bool, 3)
794         expected := func() map[int]bool {
795                 if ps.Cancel {
796                         return map[int]bool{0: false, 1: false, 2: false}
797                 } else {
798                         return map[int]bool{0: true, 1: true, 2: true}
799                 }
800         }()
801         for !reflect.DeepEqual(completes, expected) {
802                 select {
803                 case _v := <-psc.Values:
804                         v := _v.(PieceStateChange)
805                         completes[v.Index] = v.Complete
806                 }
807         }
808 }
809
810 func TestTorrentDownloadAll(t *testing.T) {
811         testDownloadCancel(t, testDownloadCancelParams{})
812 }
813
814 func TestTorrentDownloadAllThenCancel(t *testing.T) {
815         testDownloadCancel(t, testDownloadCancelParams{
816                 Cancel: true,
817         })
818 }
819
820 // Ensure that it's an error for a peer to send an invalid have message.
821 func TestPeerInvalidHave(t *testing.T) {
822         cl, err := NewClient(TestingConfig())
823         require.NoError(t, err)
824         defer cl.Close()
825         info := metainfo.Info{
826                 PieceLength: 1,
827                 Pieces:      make([]byte, 20),
828                 Files:       []metainfo.FileInfo{{Length: 1}},
829         }
830         infoBytes, err := bencode.Marshal(info)
831         require.NoError(t, err)
832         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
833                 InfoBytes: infoBytes,
834                 InfoHash:  metainfo.HashBytes(infoBytes),
835                 Storage:   badStorage{},
836         })
837         require.NoError(t, err)
838         assert.True(t, _new)
839         defer tt.Drop()
840         cn := &connection{
841                 t: tt,
842         }
843         assert.NoError(t, cn.peerSentHave(0))
844         assert.Error(t, cn.peerSentHave(1))
845 }
846
847 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
848         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
849         defer os.RemoveAll(greetingTempDir)
850         cfg := TestingConfig()
851         cfg.DataDir = greetingTempDir
852         seeder, err := NewClient(TestingConfig())
853         require.NoError(t, err)
854         seeder.AddTorrentSpec(&TorrentSpec{
855                 InfoBytes: greetingMetainfo.InfoBytes,
856         })
857 }
858
859 // Check that when the listen port is 0, all the protocols listened on have
860 // the same port, and it isn't zero.
861 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
862         cl, err := NewClient(TestingConfig())
863         require.NoError(t, err)
864         defer cl.Close()
865         port := cl.LocalPort()
866         assert.NotEqual(t, 0, port)
867         cl.eachListener(func(s socket) bool {
868                 assert.Equal(t, port, missinggo.AddrPort(s.Addr()))
869                 return true
870         })
871 }
872
873 func TestClientDynamicListenTCPOnly(t *testing.T) {
874         cfg := TestingConfig()
875         cfg.DisableUTP = true
876         cfg.DisableTCP = false
877         cl, err := NewClient(cfg)
878         require.NoError(t, err)
879         defer cl.Close()
880         assert.NotEqual(t, 0, cl.LocalPort())
881 }
882
883 func TestClientDynamicListenUTPOnly(t *testing.T) {
884         cfg := TestingConfig()
885         cfg.DisableTCP = true
886         cfg.DisableUTP = false
887         cl, err := NewClient(cfg)
888         require.NoError(t, err)
889         defer cl.Close()
890         assert.NotEqual(t, 0, cl.LocalPort())
891 }
892
893 func totalConns(tts []*Torrent) (ret int) {
894         for _, tt := range tts {
895                 tt.cl.lock()
896                 ret += len(tt.conns)
897                 tt.cl.unlock()
898         }
899         return
900 }
901
902 func TestSetMaxEstablishedConn(t *testing.T) {
903         var tts []*Torrent
904         ih := testutil.GreetingMetaInfo().HashInfoBytes()
905         cfg := TestingConfig()
906         cfg.DisableAcceptRateLimiting = true
907         cfg.dropDuplicatePeerIds = true
908         for i := range iter.N(3) {
909                 cl, err := NewClient(cfg)
910                 require.NoError(t, err)
911                 defer cl.Close()
912                 tt, _ := cl.AddTorrentInfoHash(ih)
913                 tt.SetMaxEstablishedConns(2)
914                 defer testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))()
915                 tts = append(tts, tt)
916         }
917         addPeers := func() {
918                 for _, tt := range tts {
919                         for _, _tt := range tts {
920                                 // if tt != _tt {
921                                 tt.AddClientPeer(_tt.cl)
922                                 // }
923                         }
924                 }
925         }
926         waitTotalConns := func(num int) {
927                 for totalConns(tts) != num {
928                         addPeers()
929                         time.Sleep(time.Millisecond)
930                 }
931         }
932         addPeers()
933         waitTotalConns(6)
934         tts[0].SetMaxEstablishedConns(1)
935         waitTotalConns(4)
936         tts[0].SetMaxEstablishedConns(0)
937         waitTotalConns(2)
938         tts[0].SetMaxEstablishedConns(1)
939         addPeers()
940         waitTotalConns(4)
941         tts[0].SetMaxEstablishedConns(2)
942         addPeers()
943         waitTotalConns(6)
944 }
945
946 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
947         os.MkdirAll(dir, 0770)
948         file, err := os.Create(filepath.Join(dir, name))
949         require.NoError(t, err)
950         file.Write([]byte(name))
951         file.Close()
952         mi := metainfo.MetaInfo{}
953         mi.SetDefaults()
954         info := metainfo.Info{PieceLength: 256 * 1024}
955         err = info.BuildFromFilePath(filepath.Join(dir, name))
956         require.NoError(t, err)
957         mi.InfoBytes, err = bencode.Marshal(info)
958         require.NoError(t, err)
959         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
960         tr, err := cl.AddTorrent(&mi)
961         require.NoError(t, err)
962         require.True(t, tr.Seeding())
963         tr.VerifyData()
964         return magnet
965 }
966
967 // https://github.com/anacrolix/torrent/issues/114
968 func TestMultipleTorrentsWithEncryption(t *testing.T) {
969         cfg := TestingConfig()
970         cfg.DisableUTP = true
971         cfg.Seed = true
972         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
973         cfg.ForceEncryption = true
974         os.Mkdir(cfg.DataDir, 0755)
975         server, err := NewClient(cfg)
976         require.NoError(t, err)
977         defer server.Close()
978         defer testutil.ExportStatusWriter(server, "s")()
979         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
980         makeMagnet(t, server, cfg.DataDir, "test2")
981         cfg = TestingConfig()
982         cfg.DisableUTP = true
983         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
984         cfg.ForceEncryption = true
985         client, err := NewClient(cfg)
986         require.NoError(t, err)
987         defer client.Close()
988         defer testutil.ExportStatusWriter(client, "c")()
989         tr, err := client.AddMagnet(magnet1)
990         require.NoError(t, err)
991         tr.AddClientPeer(server)
992         <-tr.GotInfo()
993         tr.DownloadAll()
994         client.WaitAll()
995 }
996
997 func TestClientAddressInUse(t *testing.T) {
998         s, _ := NewUtpSocket("udp", ":50007", nil)
999         if s != nil {
1000                 defer s.Close()
1001         }
1002         cfg := TestingConfig().SetListenAddr(":50007")
1003         cl, err := NewClient(cfg)
1004         require.Error(t, err)
1005         require.Nil(t, cl)
1006 }
1007
1008 func TestClientHasDhtServersWhenUtpDisabled(t *testing.T) {
1009         cc := TestingConfig()
1010         cc.DisableUTP = true
1011         cc.NoDHT = false
1012         cl, err := NewClient(cc)
1013         require.NoError(t, err)
1014         defer cl.Close()
1015         assert.NotEmpty(t, cl.DhtServers())
1016 }