]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Law of Demeter Client.mu
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "os"
9         "path/filepath"
10         "reflect"
11         "sync"
12         "testing"
13         "time"
14
15         "github.com/anacrolix/dht"
16         _ "github.com/anacrolix/envpprof"
17         "github.com/anacrolix/missinggo"
18         "github.com/anacrolix/missinggo/filecache"
19         "github.com/bradfitz/iter"
20         "github.com/stretchr/testify/assert"
21         "github.com/stretchr/testify/require"
22         "golang.org/x/time/rate"
23
24         "github.com/anacrolix/torrent/bencode"
25         "github.com/anacrolix/torrent/internal/testutil"
26         "github.com/anacrolix/torrent/iplist"
27         "github.com/anacrolix/torrent/metainfo"
28         "github.com/anacrolix/torrent/storage"
29 )
30
31 func TestingConfig() *ClientConfig {
32         cfg := NewDefaultClientConfig()
33         cfg.ListenHost = LoopbackListenHost
34         cfg.NoDHT = true
35         cfg.DataDir = tempDir()
36         cfg.DisableTrackers = true
37         cfg.NoDefaultPortForwarding = true
38         cfg.DisableAcceptRateLimiting = true
39         return cfg
40 }
41
42 func TestClientDefault(t *testing.T) {
43         cl, err := NewClient(TestingConfig())
44         require.NoError(t, err)
45         cl.Close()
46 }
47
48 func TestClientNilConfig(t *testing.T) {
49         cl, err := NewClient(nil)
50         require.NoError(t, err)
51         cl.Close()
52 }
53
54 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
55         cfg := TestingConfig()
56         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
57         require.NoError(t, err)
58         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
59         defer ci.Close()
60         cfg.DefaultStorage = ci
61         cl, err := NewClient(cfg)
62         require.NoError(t, err)
63         cl.Close()
64         // And again, https://github.com/anacrolix/torrent/issues/158
65         cl, err = NewClient(cfg)
66         require.NoError(t, err)
67         cl.Close()
68 }
69
70 func TestAddDropTorrent(t *testing.T) {
71         cl, err := NewClient(TestingConfig())
72         require.NoError(t, err)
73         defer cl.Close()
74         dir, mi := testutil.GreetingTestTorrent()
75         defer os.RemoveAll(dir)
76         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
77         require.NoError(t, err)
78         assert.True(t, new)
79         tt.SetMaxEstablishedConns(0)
80         tt.SetMaxEstablishedConns(1)
81         tt.Drop()
82 }
83
84 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
85         // TODO?
86         t.SkipNow()
87 }
88
89 func TestAddTorrentNoUsableURLs(t *testing.T) {
90         // TODO?
91         t.SkipNow()
92 }
93
94 func TestAddPeersToUnknownTorrent(t *testing.T) {
95         // TODO?
96         t.SkipNow()
97 }
98
99 func TestPieceHashSize(t *testing.T) {
100         assert.Equal(t, 20, pieceHash.Size())
101 }
102
103 func TestTorrentInitialState(t *testing.T) {
104         dir, mi := testutil.GreetingTestTorrent()
105         defer os.RemoveAll(dir)
106         cl := &Client{
107                 config: &ClientConfig{},
108         }
109         cl.initLogger()
110         tor := cl.newTorrent(
111                 mi.HashInfoBytes(),
112                 storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
113         )
114         tor.setChunkSize(2)
115         tor.cl.lock()
116         err := tor.setInfoBytes(mi.InfoBytes)
117         tor.cl.unlock()
118         require.NoError(t, err)
119         require.Len(t, tor.pieces, 3)
120         tor.pendAllChunkSpecs(0)
121         tor.cl.lock()
122         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
123         tor.cl.unlock()
124         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
125 }
126
127 func TestReducedDialTimeout(t *testing.T) {
128         cfg := NewDefaultClientConfig()
129         for _, _case := range []struct {
130                 Max             time.Duration
131                 HalfOpenLimit   int
132                 PendingPeers    int
133                 ExpectedReduced time.Duration
134         }{
135                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
136                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
137                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
138                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
139                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
140                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
141         } {
142                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
143                 expected := _case.ExpectedReduced
144                 if expected < cfg.MinDialTimeout {
145                         expected = cfg.MinDialTimeout
146                 }
147                 if reduced != expected {
148                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
149                 }
150         }
151 }
152
153 func TestAddDropManyTorrents(t *testing.T) {
154         cl, err := NewClient(TestingConfig())
155         require.NoError(t, err)
156         defer cl.Close()
157         for i := range iter.N(1000) {
158                 var spec TorrentSpec
159                 binary.PutVarint(spec.InfoHash[:], int64(i))
160                 tt, new, err := cl.AddTorrentSpec(&spec)
161                 assert.NoError(t, err)
162                 assert.True(t, new)
163                 defer tt.Drop()
164         }
165 }
166
167 type FileCacheClientStorageFactoryParams struct {
168         Capacity    int64
169         SetCapacity bool
170         Wrapper     func(*filecache.Cache) storage.ClientImpl
171 }
172
173 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
174         return func(dataDir string) storage.ClientImpl {
175                 fc, err := filecache.NewCache(dataDir)
176                 if err != nil {
177                         panic(err)
178                 }
179                 if ps.SetCapacity {
180                         fc.SetCapacity(ps.Capacity)
181                 }
182                 return ps.Wrapper(fc)
183         }
184 }
185
186 type storageFactory func(string) storage.ClientImpl
187
188 func TestClientTransferDefault(t *testing.T) {
189         testClientTransfer(t, testClientTransferParams{
190                 ExportClientStatus: true,
191                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
192                         Wrapper: fileCachePieceResourceStorage,
193                 }),
194         })
195 }
196
197 func TestClientTransferRateLimitedUpload(t *testing.T) {
198         started := time.Now()
199         testClientTransfer(t, testClientTransferParams{
200                 // We are uploading 13 bytes (the length of the greeting torrent). The
201                 // chunks are 2 bytes in length. Then the smallest burst we can run
202                 // with is 2. Time taken is (13-burst)/rate.
203                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
204                 ExportClientStatus:      true,
205         })
206         require.True(t, time.Since(started) > time.Second)
207 }
208
209 func TestClientTransferRateLimitedDownload(t *testing.T) {
210         testClientTransfer(t, testClientTransferParams{
211                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
212         })
213 }
214
215 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
216         return storage.NewResourcePieces(fc.AsResourceProvider())
217 }
218
219 func TestClientTransferSmallCache(t *testing.T) {
220         testClientTransfer(t, testClientTransferParams{
221                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
222                         SetCapacity: true,
223                         // Going below the piece length means it can't complete a piece so
224                         // that it can be hashed.
225                         Capacity: 5,
226                         Wrapper:  fileCachePieceResourceStorage,
227                 }),
228                 SetReadahead: true,
229                 // Can't readahead too far or the cache will thrash and drop data we
230                 // thought we had.
231                 Readahead:          0,
232                 ExportClientStatus: true,
233         })
234 }
235
236 func TestClientTransferVarious(t *testing.T) {
237         // Leecher storage
238         for _, ls := range []storageFactory{
239                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
240                         Wrapper: fileCachePieceResourceStorage,
241                 }),
242                 storage.NewBoltDB,
243         } {
244                 // Seeder storage
245                 for _, ss := range []func(string) storage.ClientImpl{
246                         storage.NewFile,
247                         storage.NewMMap,
248                 } {
249                         for _, responsive := range []bool{false, true} {
250                                 testClientTransfer(t, testClientTransferParams{
251                                         Responsive:     responsive,
252                                         SeederStorage:  ss,
253                                         LeecherStorage: ls,
254                                 })
255                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
256                                         testClientTransfer(t, testClientTransferParams{
257                                                 SeederStorage:  ss,
258                                                 Responsive:     responsive,
259                                                 SetReadahead:   true,
260                                                 Readahead:      readahead,
261                                                 LeecherStorage: ls,
262                                         })
263                                 }
264                         }
265                 }
266         }
267 }
268
269 type testClientTransferParams struct {
270         Responsive                 bool
271         Readahead                  int64
272         SetReadahead               bool
273         ExportClientStatus         bool
274         LeecherStorage             func(string) storage.ClientImpl
275         SeederStorage              func(string) storage.ClientImpl
276         SeederUploadRateLimiter    *rate.Limiter
277         LeecherDownloadRateLimiter *rate.Limiter
278 }
279
280 // Creates a seeder and a leecher, and ensures the data transfers when a read
281 // is attempted on the leecher.
282 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
283         greetingTempDir, mi := testutil.GreetingTestTorrent()
284         defer os.RemoveAll(greetingTempDir)
285         // Create seeder and a Torrent.
286         cfg := TestingConfig()
287         cfg.Seed = true
288         if ps.SeederUploadRateLimiter != nil {
289                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
290         }
291         // cfg.ListenAddr = "localhost:4000"
292         if ps.SeederStorage != nil {
293                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
294                 defer cfg.DefaultStorage.Close()
295         } else {
296                 cfg.DataDir = greetingTempDir
297         }
298         seeder, err := NewClient(cfg)
299         require.NoError(t, err)
300         if ps.ExportClientStatus {
301                 defer testutil.ExportStatusWriter(seeder, "s")()
302         }
303         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
304         // Run a Stats right after Closing the Client. This will trigger the Stats
305         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
306         defer seederTorrent.Stats()
307         defer seeder.Close()
308         seederTorrent.VerifyData()
309         // Create leecher and a Torrent.
310         leecherDataDir, err := ioutil.TempDir("", "")
311         require.NoError(t, err)
312         defer os.RemoveAll(leecherDataDir)
313         cfg = TestingConfig()
314         if ps.LeecherStorage == nil {
315                 cfg.DataDir = leecherDataDir
316         } else {
317                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
318         }
319         if ps.LeecherDownloadRateLimiter != nil {
320                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
321         }
322         cfg.Seed = false
323         leecher, err := NewClient(cfg)
324         require.NoError(t, err)
325         defer leecher.Close()
326         if ps.ExportClientStatus {
327                 defer testutil.ExportStatusWriter(leecher, "l")()
328         }
329         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
330                 ret = TorrentSpecFromMetaInfo(mi)
331                 ret.ChunkSize = 2
332                 return
333         }())
334         require.NoError(t, err)
335         assert.True(t, new)
336         // Now do some things with leecher and seeder.
337         leecherTorrent.AddClientPeer(seeder)
338         // The Torrent should not be interested in obtaining peers, so the one we
339         // just added should be the only one.
340         assert.False(t, leecherTorrent.Seeding())
341         assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
342         r := leecherTorrent.NewReader()
343         defer r.Close()
344         if ps.Responsive {
345                 r.SetResponsive()
346         }
347         if ps.SetReadahead {
348                 r.SetReadahead(ps.Readahead)
349         }
350         assertReadAllGreeting(t, r)
351
352         seederStats := seederTorrent.Stats()
353         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
354         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
355
356         leecherStats := leecherTorrent.Stats()
357         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
358         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
359
360         // Try reading through again for the cases where the torrent data size
361         // exceeds the size of the cache.
362         assertReadAllGreeting(t, r)
363 }
364
365 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
366         pos, err := r.Seek(0, io.SeekStart)
367         assert.NoError(t, err)
368         assert.EqualValues(t, 0, pos)
369         _greeting, err := ioutil.ReadAll(r)
370         assert.NoError(t, err)
371         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
372 }
373
374 // Check that after completing leeching, a leecher transitions to a seeding
375 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
376 func TestSeedAfterDownloading(t *testing.T) {
377         greetingTempDir, mi := testutil.GreetingTestTorrent()
378         defer os.RemoveAll(greetingTempDir)
379
380         cfg := TestingConfig()
381         cfg.Seed = true
382         cfg.DataDir = greetingTempDir
383         seeder, err := NewClient(cfg)
384         require.NoError(t, err)
385         defer seeder.Close()
386         defer testutil.ExportStatusWriter(seeder, "s")()
387         seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
388         require.NoError(t, err)
389         assert.True(t, ok)
390         seederTorrent.VerifyData()
391
392         cfg = TestingConfig()
393         cfg.Seed = true
394         cfg.DataDir, err = ioutil.TempDir("", "")
395         require.NoError(t, err)
396         defer os.RemoveAll(cfg.DataDir)
397         leecher, err := NewClient(cfg)
398         require.NoError(t, err)
399         defer leecher.Close()
400         defer testutil.ExportStatusWriter(leecher, "l")()
401
402         cfg = TestingConfig()
403         cfg.Seed = false
404         cfg.DataDir, err = ioutil.TempDir("", "")
405         require.NoError(t, err)
406         defer os.RemoveAll(cfg.DataDir)
407         leecherLeecher, _ := NewClient(cfg)
408         require.NoError(t, err)
409         defer leecherLeecher.Close()
410         defer testutil.ExportStatusWriter(leecherLeecher, "ll")()
411         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
412                 ret = TorrentSpecFromMetaInfo(mi)
413                 ret.ChunkSize = 2
414                 return
415         }())
416         require.NoError(t, err)
417         assert.True(t, ok)
418         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
419                 ret = TorrentSpecFromMetaInfo(mi)
420                 ret.ChunkSize = 3
421                 return
422         }())
423         require.NoError(t, err)
424         assert.True(t, ok)
425         // Simultaneously DownloadAll in Leecher, and read the contents
426         // consecutively in LeecherLeecher. This non-deterministically triggered a
427         // case where the leecher wouldn't unchoke the LeecherLeecher.
428         var wg sync.WaitGroup
429         wg.Add(1)
430         go func() {
431                 defer wg.Done()
432                 r := llg.NewReader()
433                 defer r.Close()
434                 b, err := ioutil.ReadAll(r)
435                 require.NoError(t, err)
436                 assert.EqualValues(t, testutil.GreetingFileContents, b)
437         }()
438         done := make(chan struct{})
439         defer close(done)
440         go leecherGreeting.AddClientPeer(seeder)
441         go leecherGreeting.AddClientPeer(leecherLeecher)
442         wg.Add(1)
443         go func() {
444                 defer wg.Done()
445                 leecherGreeting.DownloadAll()
446                 leecher.WaitAll()
447         }()
448         wg.Wait()
449 }
450
451 func TestMergingTrackersByAddingSpecs(t *testing.T) {
452         cl, err := NewClient(TestingConfig())
453         require.NoError(t, err)
454         defer cl.Close()
455         spec := TorrentSpec{}
456         T, new, _ := cl.AddTorrentSpec(&spec)
457         if !new {
458                 t.FailNow()
459         }
460         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
461         _, new, _ = cl.AddTorrentSpec(&spec)
462         assert.False(t, new)
463         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
464         // Because trackers are disabled in TestingConfig.
465         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
466 }
467
468 // We read from a piece which is marked completed, but is missing data.
469 func TestCompletedPieceWrongSize(t *testing.T) {
470         cfg := TestingConfig()
471         cfg.DefaultStorage = badStorage{}
472         cl, err := NewClient(cfg)
473         require.NoError(t, err)
474         defer cl.Close()
475         info := metainfo.Info{
476                 PieceLength: 15,
477                 Pieces:      make([]byte, 20),
478                 Files: []metainfo.FileInfo{
479                         {Path: []string{"greeting"}, Length: 13},
480                 },
481         }
482         b, err := bencode.Marshal(info)
483         require.NoError(t, err)
484         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
485                 InfoBytes: b,
486                 InfoHash:  metainfo.HashBytes(b),
487         })
488         require.NoError(t, err)
489         defer tt.Drop()
490         assert.True(t, new)
491         r := tt.NewReader()
492         defer r.Close()
493         b, err = ioutil.ReadAll(r)
494         assert.Len(t, b, 13)
495         assert.NoError(t, err)
496 }
497
498 func BenchmarkAddLargeTorrent(b *testing.B) {
499         cfg := TestingConfig()
500         cfg.DisableTCP = true
501         cfg.DisableUTP = true
502         cfg.ListenHost = func(string) string { return "redonk" }
503         cl, err := NewClient(cfg)
504         require.NoError(b, err)
505         defer cl.Close()
506         for range iter.N(b.N) {
507                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
508                 if err != nil {
509                         b.Fatal(err)
510                 }
511                 t.Drop()
512         }
513 }
514
515 func TestResponsive(t *testing.T) {
516         seederDataDir, mi := testutil.GreetingTestTorrent()
517         defer os.RemoveAll(seederDataDir)
518         cfg := TestingConfig()
519         cfg.Seed = true
520         cfg.DataDir = seederDataDir
521         seeder, err := NewClient(cfg)
522         require.Nil(t, err)
523         defer seeder.Close()
524         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
525         seederTorrent.VerifyData()
526         leecherDataDir, err := ioutil.TempDir("", "")
527         require.Nil(t, err)
528         defer os.RemoveAll(leecherDataDir)
529         cfg = TestingConfig()
530         cfg.DataDir = leecherDataDir
531         leecher, err := NewClient(cfg)
532         require.Nil(t, err)
533         defer leecher.Close()
534         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
535                 ret = TorrentSpecFromMetaInfo(mi)
536                 ret.ChunkSize = 2
537                 return
538         }())
539         leecherTorrent.AddClientPeer(seeder)
540         reader := leecherTorrent.NewReader()
541         defer reader.Close()
542         reader.SetReadahead(0)
543         reader.SetResponsive()
544         b := make([]byte, 2)
545         _, err = reader.Seek(3, io.SeekStart)
546         require.NoError(t, err)
547         _, err = io.ReadFull(reader, b)
548         assert.Nil(t, err)
549         assert.EqualValues(t, "lo", string(b))
550         _, err = reader.Seek(11, io.SeekStart)
551         require.NoError(t, err)
552         n, err := io.ReadFull(reader, b)
553         assert.Nil(t, err)
554         assert.EqualValues(t, 2, n)
555         assert.EqualValues(t, "d\n", string(b))
556 }
557
558 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
559         seederDataDir, mi := testutil.GreetingTestTorrent()
560         defer os.RemoveAll(seederDataDir)
561         cfg := TestingConfig()
562         cfg.Seed = true
563         cfg.DataDir = seederDataDir
564         seeder, err := NewClient(cfg)
565         require.Nil(t, err)
566         defer seeder.Close()
567         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
568         seederTorrent.VerifyData()
569         leecherDataDir, err := ioutil.TempDir("", "")
570         require.Nil(t, err)
571         defer os.RemoveAll(leecherDataDir)
572         cfg = TestingConfig()
573         cfg.DataDir = leecherDataDir
574         leecher, err := NewClient(cfg)
575         require.Nil(t, err)
576         defer leecher.Close()
577         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
578                 ret = TorrentSpecFromMetaInfo(mi)
579                 ret.ChunkSize = 2
580                 return
581         }())
582         leecherTorrent.AddClientPeer(seeder)
583         reader := leecherTorrent.NewReader()
584         defer reader.Close()
585         reader.SetReadahead(0)
586         reader.SetResponsive()
587         b := make([]byte, 2)
588         _, err = reader.Seek(3, io.SeekStart)
589         require.NoError(t, err)
590         _, err = io.ReadFull(reader, b)
591         assert.Nil(t, err)
592         assert.EqualValues(t, "lo", string(b))
593         go leecherTorrent.Drop()
594         _, err = reader.Seek(11, io.SeekStart)
595         require.NoError(t, err)
596         n, err := reader.Read(b)
597         assert.EqualError(t, err, "torrent closed")
598         assert.EqualValues(t, 0, n)
599 }
600
601 func TestDHTInheritBlocklist(t *testing.T) {
602         ipl := iplist.New(nil)
603         require.NotNil(t, ipl)
604         cfg := TestingConfig()
605         cfg.IPBlocklist = ipl
606         cfg.NoDHT = false
607         cl, err := NewClient(cfg)
608         require.NoError(t, err)
609         defer cl.Close()
610         numServers := 0
611         cl.eachDhtServer(func(s *dht.Server) {
612                 assert.Equal(t, ipl, s.IPBlocklist())
613                 numServers++
614         })
615         assert.EqualValues(t, 2, numServers)
616 }
617
618 // Check that stuff is merged in subsequent AddTorrentSpec for the same
619 // infohash.
620 func TestAddTorrentSpecMerging(t *testing.T) {
621         cl, err := NewClient(TestingConfig())
622         require.NoError(t, err)
623         defer cl.Close()
624         dir, mi := testutil.GreetingTestTorrent()
625         defer os.RemoveAll(dir)
626         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
627                 InfoHash: mi.HashInfoBytes(),
628         })
629         require.NoError(t, err)
630         require.True(t, new)
631         require.Nil(t, tt.Info())
632         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
633         require.NoError(t, err)
634         require.False(t, new)
635         require.NotNil(t, tt.Info())
636 }
637
638 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
639         dir, mi := testutil.GreetingTestTorrent()
640         os.RemoveAll(dir)
641         cl, _ := NewClient(TestingConfig())
642         defer cl.Close()
643         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
644                 InfoHash: mi.HashInfoBytes(),
645         })
646         tt.Drop()
647         assert.EqualValues(t, 0, len(cl.Torrents()))
648         select {
649         case <-tt.GotInfo():
650                 t.FailNow()
651         default:
652         }
653 }
654
655 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
656         for i := range iter.N(info.NumPieces()) {
657                 p := info.Piece(i)
658                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
659         }
660 }
661
662 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
663         fileCacheDir, err := ioutil.TempDir("", "")
664         require.NoError(t, err)
665         defer os.RemoveAll(fileCacheDir)
666         fileCache, err := filecache.NewCache(fileCacheDir)
667         require.NoError(t, err)
668         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
669         defer os.RemoveAll(greetingDataTempDir)
670         filePieceStore := csf(fileCache)
671         defer filePieceStore.Close()
672         info, err := greetingMetainfo.UnmarshalInfo()
673         require.NoError(t, err)
674         ih := greetingMetainfo.HashInfoBytes()
675         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
676         require.NoError(t, err)
677         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
678         // require.Equal(t, len(testutil.GreetingFileContents), written)
679         // require.NoError(t, err)
680         for i := 0; i < info.NumPieces(); i++ {
681                 p := info.Piece(i)
682                 if alreadyCompleted {
683                         require.NoError(t, greetingData.Piece(p).MarkComplete())
684                 }
685         }
686         cfg := TestingConfig()
687         // TODO: Disable network option?
688         cfg.DisableTCP = true
689         cfg.DisableUTP = true
690         cfg.DefaultStorage = filePieceStore
691         cl, err := NewClient(cfg)
692         require.NoError(t, err)
693         defer cl.Close()
694         tt, err := cl.AddTorrent(greetingMetainfo)
695         require.NoError(t, err)
696         psrs := tt.PieceStateRuns()
697         assert.Len(t, psrs, 1)
698         assert.EqualValues(t, 3, psrs[0].Length)
699         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
700         if alreadyCompleted {
701                 r := tt.NewReader()
702                 b, err := ioutil.ReadAll(r)
703                 assert.NoError(t, err)
704                 assert.EqualValues(t, testutil.GreetingFileContents, b)
705         }
706 }
707
708 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
709         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
710 }
711
712 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
713         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
714 }
715
716 func TestAddMetainfoWithNodes(t *testing.T) {
717         cfg := TestingConfig()
718         cfg.ListenHost = func(string) string { return "" }
719         cfg.NoDHT = false
720         cfg.DhtStartingNodes = func() ([]dht.Addr, error) { return nil, nil }
721         // For now, we want to just jam the nodes into the table, without
722         // verifying them first. Also the DHT code doesn't support mixing secure
723         // and insecure nodes if security is enabled (yet).
724         // cfg.DHTConfig.NoSecurity = true
725         cl, err := NewClient(cfg)
726         require.NoError(t, err)
727         defer cl.Close()
728         sum := func() (ret int64) {
729                 cl.eachDhtServer(func(s *dht.Server) {
730                         ret += s.Stats().OutboundQueriesAttempted
731                 })
732                 return
733         }
734         assert.EqualValues(t, 0, sum())
735         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
736         require.NoError(t, err)
737         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
738         // check if the announce-list is here instead. TODO: Add nodes.
739         assert.Len(t, tt.metainfo.AnnounceList, 5)
740         // There are 6 nodes in the torrent file.
741         assert.EqualValues(t, 6*len(cl.dhtServers), sum())
742 }
743
744 type testDownloadCancelParams struct {
745         SetLeecherStorageCapacity bool
746         LeecherStorageCapacity    int64
747         Cancel                    bool
748 }
749
750 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
751         greetingTempDir, mi := testutil.GreetingTestTorrent()
752         defer os.RemoveAll(greetingTempDir)
753         cfg := TestingConfig()
754         cfg.Seed = true
755         cfg.DataDir = greetingTempDir
756         seeder, err := NewClient(cfg)
757         require.NoError(t, err)
758         defer seeder.Close()
759         defer testutil.ExportStatusWriter(seeder, "s")()
760         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
761         seederTorrent.VerifyData()
762         leecherDataDir, err := ioutil.TempDir("", "")
763         require.NoError(t, err)
764         defer os.RemoveAll(leecherDataDir)
765         fc, err := filecache.NewCache(leecherDataDir)
766         require.NoError(t, err)
767         if ps.SetLeecherStorageCapacity {
768                 fc.SetCapacity(ps.LeecherStorageCapacity)
769         }
770         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
771         cfg.DataDir = leecherDataDir
772         leecher, _ := NewClient(cfg)
773         defer leecher.Close()
774         defer testutil.ExportStatusWriter(leecher, "l")()
775         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
776                 ret = TorrentSpecFromMetaInfo(mi)
777                 ret.ChunkSize = 2
778                 return
779         }())
780         require.NoError(t, err)
781         assert.True(t, new)
782         psc := leecherGreeting.SubscribePieceStateChanges()
783         defer psc.Close()
784
785         leecherGreeting.cl.lock()
786         leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
787         if ps.Cancel {
788                 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
789         }
790         leecherGreeting.cl.unlock()
791         done := make(chan struct{})
792         defer close(done)
793         go leecherGreeting.AddClientPeer(seeder)
794         completes := make(map[int]bool, 3)
795         expected := func() map[int]bool {
796                 if ps.Cancel {
797                         return map[int]bool{0: false, 1: false, 2: false}
798                 } else {
799                         return map[int]bool{0: true, 1: true, 2: true}
800                 }
801         }()
802         for !reflect.DeepEqual(completes, expected) {
803                 select {
804                 case _v := <-psc.Values:
805                         v := _v.(PieceStateChange)
806                         completes[v.Index] = v.Complete
807                 }
808         }
809 }
810
811 func TestTorrentDownloadAll(t *testing.T) {
812         testDownloadCancel(t, testDownloadCancelParams{})
813 }
814
815 func TestTorrentDownloadAllThenCancel(t *testing.T) {
816         testDownloadCancel(t, testDownloadCancelParams{
817                 Cancel: true,
818         })
819 }
820
821 // Ensure that it's an error for a peer to send an invalid have message.
822 func TestPeerInvalidHave(t *testing.T) {
823         cl, err := NewClient(TestingConfig())
824         require.NoError(t, err)
825         defer cl.Close()
826         info := metainfo.Info{
827                 PieceLength: 1,
828                 Pieces:      make([]byte, 20),
829                 Files:       []metainfo.FileInfo{{Length: 1}},
830         }
831         infoBytes, err := bencode.Marshal(info)
832         require.NoError(t, err)
833         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
834                 InfoBytes: infoBytes,
835                 InfoHash:  metainfo.HashBytes(infoBytes),
836                 Storage:   badStorage{},
837         })
838         require.NoError(t, err)
839         assert.True(t, _new)
840         defer tt.Drop()
841         cn := &connection{
842                 t: tt,
843         }
844         assert.NoError(t, cn.peerSentHave(0))
845         assert.Error(t, cn.peerSentHave(1))
846 }
847
848 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
849         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
850         defer os.RemoveAll(greetingTempDir)
851         cfg := TestingConfig()
852         cfg.DataDir = greetingTempDir
853         seeder, err := NewClient(TestingConfig())
854         require.NoError(t, err)
855         seeder.AddTorrentSpec(&TorrentSpec{
856                 InfoBytes: greetingMetainfo.InfoBytes,
857         })
858 }
859
860 // Check that when the listen port is 0, all the protocols listened on have
861 // the same port, and it isn't zero.
862 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
863         cl, err := NewClient(TestingConfig())
864         require.NoError(t, err)
865         defer cl.Close()
866         port := cl.LocalPort()
867         assert.NotEqual(t, 0, port)
868         cl.eachListener(func(s socket) bool {
869                 assert.Equal(t, port, missinggo.AddrPort(s.Addr()))
870                 return true
871         })
872 }
873
874 func TestClientDynamicListenTCPOnly(t *testing.T) {
875         cfg := TestingConfig()
876         cfg.DisableUTP = true
877         cl, err := NewClient(cfg)
878         require.NoError(t, err)
879         defer cl.Close()
880         assert.NotEqual(t, 0, cl.LocalPort())
881         cl.eachListener(func(s socket) bool {
882                 assert.True(t, isTcpNetwork(s.Addr().Network()))
883                 return true
884         })
885 }
886
887 func TestClientDynamicListenUTPOnly(t *testing.T) {
888         cfg := TestingConfig()
889         cfg.DisableTCP = true
890         cl, err := NewClient(cfg)
891         require.NoError(t, err)
892         defer cl.Close()
893         assert.NotEqual(t, 0, cl.LocalPort())
894         cl.eachListener(func(s socket) bool {
895                 assert.True(t, isUtpNetwork(s.Addr().Network()))
896                 return true
897         })
898 }
899
900 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
901         cfg := TestingConfig()
902         cfg.DisableTCP = true
903         cfg.DisableUTP = true
904         cl, err := NewClient(cfg)
905         require.NoError(t, err)
906         defer cl.Close()
907         assert.Equal(t, 0, cl.LocalPort())
908 }
909
910 func totalConns(tts []*Torrent) (ret int) {
911         for _, tt := range tts {
912                 tt.cl.lock()
913                 ret += len(tt.conns)
914                 tt.cl.unlock()
915         }
916         return
917 }
918
919 func TestSetMaxEstablishedConn(t *testing.T) {
920         var tts []*Torrent
921         ih := testutil.GreetingMetaInfo().HashInfoBytes()
922         cfg := TestingConfig()
923         cfg.DisableAcceptRateLimiting = true
924         cfg.dropDuplicatePeerIds = true
925         for i := range iter.N(3) {
926                 cl, err := NewClient(cfg)
927                 require.NoError(t, err)
928                 defer cl.Close()
929                 tt, _ := cl.AddTorrentInfoHash(ih)
930                 tt.SetMaxEstablishedConns(2)
931                 defer testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))()
932                 tts = append(tts, tt)
933         }
934         addPeers := func() {
935                 for _, tt := range tts {
936                         for _, _tt := range tts {
937                                 // if tt != _tt {
938                                 tt.AddClientPeer(_tt.cl)
939                                 // }
940                         }
941                 }
942         }
943         waitTotalConns := func(num int) {
944                 for totalConns(tts) != num {
945                         addPeers()
946                         time.Sleep(time.Millisecond)
947                 }
948         }
949         addPeers()
950         waitTotalConns(6)
951         tts[0].SetMaxEstablishedConns(1)
952         waitTotalConns(4)
953         tts[0].SetMaxEstablishedConns(0)
954         waitTotalConns(2)
955         tts[0].SetMaxEstablishedConns(1)
956         addPeers()
957         waitTotalConns(4)
958         tts[0].SetMaxEstablishedConns(2)
959         addPeers()
960         waitTotalConns(6)
961 }
962
963 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
964         os.MkdirAll(dir, 0770)
965         file, err := os.Create(filepath.Join(dir, name))
966         require.NoError(t, err)
967         file.Write([]byte(name))
968         file.Close()
969         mi := metainfo.MetaInfo{}
970         mi.SetDefaults()
971         info := metainfo.Info{PieceLength: 256 * 1024}
972         err = info.BuildFromFilePath(filepath.Join(dir, name))
973         require.NoError(t, err)
974         mi.InfoBytes, err = bencode.Marshal(info)
975         require.NoError(t, err)
976         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
977         tr, err := cl.AddTorrent(&mi)
978         require.NoError(t, err)
979         require.True(t, tr.Seeding())
980         tr.VerifyData()
981         return magnet
982 }
983
984 // https://github.com/anacrolix/torrent/issues/114
985 func TestMultipleTorrentsWithEncryption(t *testing.T) {
986         cfg := TestingConfig()
987         cfg.DisableUTP = true
988         cfg.Seed = true
989         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
990         cfg.ForceEncryption = true
991         os.Mkdir(cfg.DataDir, 0755)
992         server, err := NewClient(cfg)
993         require.NoError(t, err)
994         defer server.Close()
995         defer testutil.ExportStatusWriter(server, "s")()
996         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
997         makeMagnet(t, server, cfg.DataDir, "test2")
998         cfg = TestingConfig()
999         cfg.DisableUTP = true
1000         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1001         cfg.ForceEncryption = true
1002         client, err := NewClient(cfg)
1003         require.NoError(t, err)
1004         defer client.Close()
1005         defer testutil.ExportStatusWriter(client, "c")()
1006         tr, err := client.AddMagnet(magnet1)
1007         require.NoError(t, err)
1008         tr.AddClientPeer(server)
1009         <-tr.GotInfo()
1010         tr.DownloadAll()
1011         client.WaitAll()
1012 }
1013
1014 func TestClientAddressInUse(t *testing.T) {
1015         s, _ := NewUtpSocket("udp", ":50007")
1016         if s != nil {
1017                 defer s.Close()
1018         }
1019         cfg := TestingConfig().SetListenAddr(":50007")
1020         cl, err := NewClient(cfg)
1021         require.Error(t, err)
1022         require.Nil(t, cl)
1023 }