]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
goimports -local
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "os"
9         "path/filepath"
10         "reflect"
11         "sync"
12         "testing"
13         "time"
14
15         "github.com/bradfitz/iter"
16         "github.com/stretchr/testify/assert"
17         "github.com/stretchr/testify/require"
18         "golang.org/x/time/rate"
19
20         "github.com/anacrolix/dht/v2"
21         _ "github.com/anacrolix/envpprof"
22         "github.com/anacrolix/missinggo"
23         "github.com/anacrolix/missinggo/filecache"
24
25         "github.com/anacrolix/torrent/bencode"
26         "github.com/anacrolix/torrent/internal/testutil"
27         "github.com/anacrolix/torrent/iplist"
28         "github.com/anacrolix/torrent/metainfo"
29         "github.com/anacrolix/torrent/storage"
30 )
31
32 func TestingConfig() *ClientConfig {
33         cfg := NewDefaultClientConfig()
34         cfg.ListenHost = LoopbackListenHost
35         cfg.NoDHT = true
36         cfg.DataDir = tempDir()
37         cfg.DisableTrackers = true
38         cfg.NoDefaultPortForwarding = true
39         cfg.DisableAcceptRateLimiting = true
40         cfg.ListenPort = 0
41         return cfg
42 }
43
44 func TestClientDefault(t *testing.T) {
45         cl, err := NewClient(TestingConfig())
46         require.NoError(t, err)
47         cl.Close()
48 }
49
50 func TestClientNilConfig(t *testing.T) {
51         cl, err := NewClient(nil)
52         require.NoError(t, err)
53         cl.Close()
54 }
55
56 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
57         cfg := TestingConfig()
58         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
59         require.NoError(t, err)
60         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
61         defer ci.Close()
62         cfg.DefaultStorage = ci
63         cl, err := NewClient(cfg)
64         require.NoError(t, err)
65         cl.Close()
66         // And again, https://github.com/anacrolix/torrent/issues/158
67         cl, err = NewClient(cfg)
68         require.NoError(t, err)
69         cl.Close()
70 }
71
72 func TestAddDropTorrent(t *testing.T) {
73         cl, err := NewClient(TestingConfig())
74         require.NoError(t, err)
75         defer cl.Close()
76         dir, mi := testutil.GreetingTestTorrent()
77         defer os.RemoveAll(dir)
78         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
79         require.NoError(t, err)
80         assert.True(t, new)
81         tt.SetMaxEstablishedConns(0)
82         tt.SetMaxEstablishedConns(1)
83         tt.Drop()
84 }
85
86 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
87         // TODO?
88         t.SkipNow()
89 }
90
91 func TestAddTorrentNoUsableURLs(t *testing.T) {
92         // TODO?
93         t.SkipNow()
94 }
95
96 func TestAddPeersToUnknownTorrent(t *testing.T) {
97         // TODO?
98         t.SkipNow()
99 }
100
101 func TestPieceHashSize(t *testing.T) {
102         assert.Equal(t, 20, pieceHash.Size())
103 }
104
105 func TestTorrentInitialState(t *testing.T) {
106         dir, mi := testutil.GreetingTestTorrent()
107         defer os.RemoveAll(dir)
108         cl := &Client{
109                 config: &ClientConfig{},
110         }
111         cl.initLogger()
112         tor := cl.newTorrent(
113                 mi.HashInfoBytes(),
114                 storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
115         )
116         tor.setChunkSize(2)
117         tor.cl.lock()
118         err := tor.setInfoBytes(mi.InfoBytes)
119         tor.cl.unlock()
120         require.NoError(t, err)
121         require.Len(t, tor.pieces, 3)
122         tor.pendAllChunkSpecs(0)
123         tor.cl.lock()
124         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
125         tor.cl.unlock()
126         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
127 }
128
129 func TestReducedDialTimeout(t *testing.T) {
130         cfg := NewDefaultClientConfig()
131         for _, _case := range []struct {
132                 Max             time.Duration
133                 HalfOpenLimit   int
134                 PendingPeers    int
135                 ExpectedReduced time.Duration
136         }{
137                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
138                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
139                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
140                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
141                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
142                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
143         } {
144                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
145                 expected := _case.ExpectedReduced
146                 if expected < cfg.MinDialTimeout {
147                         expected = cfg.MinDialTimeout
148                 }
149                 if reduced != expected {
150                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
151                 }
152         }
153 }
154
155 func TestAddDropManyTorrents(t *testing.T) {
156         cl, err := NewClient(TestingConfig())
157         require.NoError(t, err)
158         defer cl.Close()
159         for i := range iter.N(1000) {
160                 var spec TorrentSpec
161                 binary.PutVarint(spec.InfoHash[:], int64(i))
162                 tt, new, err := cl.AddTorrentSpec(&spec)
163                 assert.NoError(t, err)
164                 assert.True(t, new)
165                 defer tt.Drop()
166         }
167 }
168
169 type FileCacheClientStorageFactoryParams struct {
170         Capacity    int64
171         SetCapacity bool
172         Wrapper     func(*filecache.Cache) storage.ClientImpl
173 }
174
175 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
176         return func(dataDir string) storage.ClientImpl {
177                 fc, err := filecache.NewCache(dataDir)
178                 if err != nil {
179                         panic(err)
180                 }
181                 if ps.SetCapacity {
182                         fc.SetCapacity(ps.Capacity)
183                 }
184                 return ps.Wrapper(fc)
185         }
186 }
187
188 type storageFactory func(string) storage.ClientImpl
189
190 func TestClientTransferDefault(t *testing.T) {
191         testClientTransfer(t, testClientTransferParams{
192                 ExportClientStatus: true,
193                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
194                         Wrapper: fileCachePieceResourceStorage,
195                 }),
196         })
197 }
198
199 func TestClientTransferRateLimitedUpload(t *testing.T) {
200         started := time.Now()
201         testClientTransfer(t, testClientTransferParams{
202                 // We are uploading 13 bytes (the length of the greeting torrent). The
203                 // chunks are 2 bytes in length. Then the smallest burst we can run
204                 // with is 2. Time taken is (13-burst)/rate.
205                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
206                 ExportClientStatus:      true,
207         })
208         require.True(t, time.Since(started) > time.Second)
209 }
210
211 func TestClientTransferRateLimitedDownload(t *testing.T) {
212         testClientTransfer(t, testClientTransferParams{
213                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
214         })
215 }
216
217 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
218         return storage.NewResourcePieces(fc.AsResourceProvider())
219 }
220
221 func TestClientTransferSmallCache(t *testing.T) {
222         testClientTransfer(t, testClientTransferParams{
223                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
224                         SetCapacity: true,
225                         // Going below the piece length means it can't complete a piece so
226                         // that it can be hashed.
227                         Capacity: 5,
228                         Wrapper:  fileCachePieceResourceStorage,
229                 }),
230                 SetReadahead: true,
231                 // Can't readahead too far or the cache will thrash and drop data we
232                 // thought we had.
233                 Readahead:          0,
234                 ExportClientStatus: true,
235         })
236 }
237
238 func TestClientTransferVarious(t *testing.T) {
239         // Leecher storage
240         for _, ls := range []storageFactory{
241                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
242                         Wrapper: fileCachePieceResourceStorage,
243                 }),
244                 storage.NewBoltDB,
245         } {
246                 // Seeder storage
247                 for _, ss := range []func(string) storage.ClientImpl{
248                         storage.NewFile,
249                         storage.NewMMap,
250                 } {
251                         for _, responsive := range []bool{false, true} {
252                                 testClientTransfer(t, testClientTransferParams{
253                                         Responsive:     responsive,
254                                         SeederStorage:  ss,
255                                         LeecherStorage: ls,
256                                 })
257                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
258                                         testClientTransfer(t, testClientTransferParams{
259                                                 SeederStorage:  ss,
260                                                 Responsive:     responsive,
261                                                 SetReadahead:   true,
262                                                 Readahead:      readahead,
263                                                 LeecherStorage: ls,
264                                         })
265                                 }
266                         }
267                 }
268         }
269 }
270
271 type testClientTransferParams struct {
272         Responsive                 bool
273         Readahead                  int64
274         SetReadahead               bool
275         ExportClientStatus         bool
276         LeecherStorage             func(string) storage.ClientImpl
277         SeederStorage              func(string) storage.ClientImpl
278         SeederUploadRateLimiter    *rate.Limiter
279         LeecherDownloadRateLimiter *rate.Limiter
280 }
281
282 // Creates a seeder and a leecher, and ensures the data transfers when a read
283 // is attempted on the leecher.
284 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
285         greetingTempDir, mi := testutil.GreetingTestTorrent()
286         defer os.RemoveAll(greetingTempDir)
287         // Create seeder and a Torrent.
288         cfg := TestingConfig()
289         cfg.Seed = true
290         if ps.SeederUploadRateLimiter != nil {
291                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
292         }
293         // cfg.ListenAddr = "localhost:4000"
294         if ps.SeederStorage != nil {
295                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
296                 defer cfg.DefaultStorage.Close()
297         } else {
298                 cfg.DataDir = greetingTempDir
299         }
300         seeder, err := NewClient(cfg)
301         require.NoError(t, err)
302         if ps.ExportClientStatus {
303                 defer testutil.ExportStatusWriter(seeder, "s")()
304         }
305         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
306         // Run a Stats right after Closing the Client. This will trigger the Stats
307         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
308         defer seederTorrent.Stats()
309         defer seeder.Close()
310         seederTorrent.VerifyData()
311         // Create leecher and a Torrent.
312         leecherDataDir, err := ioutil.TempDir("", "")
313         require.NoError(t, err)
314         defer os.RemoveAll(leecherDataDir)
315         cfg = TestingConfig()
316         if ps.LeecherStorage == nil {
317                 cfg.DataDir = leecherDataDir
318         } else {
319                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
320         }
321         if ps.LeecherDownloadRateLimiter != nil {
322                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
323         }
324         cfg.Seed = false
325         leecher, err := NewClient(cfg)
326         require.NoError(t, err)
327         defer leecher.Close()
328         if ps.ExportClientStatus {
329                 defer testutil.ExportStatusWriter(leecher, "l")()
330         }
331         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
332                 ret = TorrentSpecFromMetaInfo(mi)
333                 ret.ChunkSize = 2
334                 return
335         }())
336         require.NoError(t, err)
337         assert.True(t, new)
338         // Now do some things with leecher and seeder.
339         leecherTorrent.AddClientPeer(seeder)
340         // The Torrent should not be interested in obtaining peers, so the one we
341         // just added should be the only one.
342         assert.False(t, leecherTorrent.Seeding())
343         assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
344         r := leecherTorrent.NewReader()
345         defer r.Close()
346         if ps.Responsive {
347                 r.SetResponsive()
348         }
349         if ps.SetReadahead {
350                 r.SetReadahead(ps.Readahead)
351         }
352         assertReadAllGreeting(t, r)
353
354         seederStats := seederTorrent.Stats()
355         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
356         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
357
358         leecherStats := leecherTorrent.Stats()
359         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
360         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
361
362         // Try reading through again for the cases where the torrent data size
363         // exceeds the size of the cache.
364         assertReadAllGreeting(t, r)
365 }
366
367 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
368         pos, err := r.Seek(0, io.SeekStart)
369         assert.NoError(t, err)
370         assert.EqualValues(t, 0, pos)
371         _greeting, err := ioutil.ReadAll(r)
372         assert.NoError(t, err)
373         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
374 }
375
376 // Check that after completing leeching, a leecher transitions to a seeding
377 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
378 func TestSeedAfterDownloading(t *testing.T) {
379         greetingTempDir, mi := testutil.GreetingTestTorrent()
380         defer os.RemoveAll(greetingTempDir)
381
382         cfg := TestingConfig()
383         cfg.Seed = true
384         cfg.DataDir = greetingTempDir
385         seeder, err := NewClient(cfg)
386         require.NoError(t, err)
387         defer seeder.Close()
388         defer testutil.ExportStatusWriter(seeder, "s")()
389         seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
390         require.NoError(t, err)
391         assert.True(t, ok)
392         seederTorrent.VerifyData()
393
394         cfg = TestingConfig()
395         cfg.Seed = true
396         cfg.DataDir, err = ioutil.TempDir("", "")
397         require.NoError(t, err)
398         defer os.RemoveAll(cfg.DataDir)
399         leecher, err := NewClient(cfg)
400         require.NoError(t, err)
401         defer leecher.Close()
402         defer testutil.ExportStatusWriter(leecher, "l")()
403
404         cfg = TestingConfig()
405         cfg.Seed = false
406         cfg.DataDir, err = ioutil.TempDir("", "")
407         require.NoError(t, err)
408         defer os.RemoveAll(cfg.DataDir)
409         leecherLeecher, _ := NewClient(cfg)
410         require.NoError(t, err)
411         defer leecherLeecher.Close()
412         defer testutil.ExportStatusWriter(leecherLeecher, "ll")()
413         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
414                 ret = TorrentSpecFromMetaInfo(mi)
415                 ret.ChunkSize = 2
416                 return
417         }())
418         require.NoError(t, err)
419         assert.True(t, ok)
420         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
421                 ret = TorrentSpecFromMetaInfo(mi)
422                 ret.ChunkSize = 3
423                 return
424         }())
425         require.NoError(t, err)
426         assert.True(t, ok)
427         // Simultaneously DownloadAll in Leecher, and read the contents
428         // consecutively in LeecherLeecher. This non-deterministically triggered a
429         // case where the leecher wouldn't unchoke the LeecherLeecher.
430         var wg sync.WaitGroup
431         wg.Add(1)
432         go func() {
433                 defer wg.Done()
434                 r := llg.NewReader()
435                 defer r.Close()
436                 b, err := ioutil.ReadAll(r)
437                 require.NoError(t, err)
438                 assert.EqualValues(t, testutil.GreetingFileContents, b)
439         }()
440         done := make(chan struct{})
441         defer close(done)
442         go leecherGreeting.AddClientPeer(seeder)
443         go leecherGreeting.AddClientPeer(leecherLeecher)
444         wg.Add(1)
445         go func() {
446                 defer wg.Done()
447                 leecherGreeting.DownloadAll()
448                 leecher.WaitAll()
449         }()
450         wg.Wait()
451 }
452
453 func TestMergingTrackersByAddingSpecs(t *testing.T) {
454         cl, err := NewClient(TestingConfig())
455         require.NoError(t, err)
456         defer cl.Close()
457         spec := TorrentSpec{}
458         T, new, _ := cl.AddTorrentSpec(&spec)
459         if !new {
460                 t.FailNow()
461         }
462         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
463         _, new, _ = cl.AddTorrentSpec(&spec)
464         assert.False(t, new)
465         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
466         // Because trackers are disabled in TestingConfig.
467         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
468 }
469
470 // We read from a piece which is marked completed, but is missing data.
471 func TestCompletedPieceWrongSize(t *testing.T) {
472         cfg := TestingConfig()
473         cfg.DefaultStorage = badStorage{}
474         cl, err := NewClient(cfg)
475         require.NoError(t, err)
476         defer cl.Close()
477         info := metainfo.Info{
478                 PieceLength: 15,
479                 Pieces:      make([]byte, 20),
480                 Files: []metainfo.FileInfo{
481                         {Path: []string{"greeting"}, Length: 13},
482                 },
483         }
484         b, err := bencode.Marshal(info)
485         require.NoError(t, err)
486         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
487                 InfoBytes: b,
488                 InfoHash:  metainfo.HashBytes(b),
489         })
490         require.NoError(t, err)
491         defer tt.Drop()
492         assert.True(t, new)
493         r := tt.NewReader()
494         defer r.Close()
495         b, err = ioutil.ReadAll(r)
496         assert.Len(t, b, 13)
497         assert.NoError(t, err)
498 }
499
500 func BenchmarkAddLargeTorrent(b *testing.B) {
501         cfg := TestingConfig()
502         cfg.DisableTCP = true
503         cfg.DisableUTP = true
504         cl, err := NewClient(cfg)
505         require.NoError(b, err)
506         defer cl.Close()
507         b.ReportAllocs()
508         for range iter.N(b.N) {
509                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
510                 if err != nil {
511                         b.Fatal(err)
512                 }
513                 t.Drop()
514         }
515 }
516
517 func TestResponsive(t *testing.T) {
518         seederDataDir, mi := testutil.GreetingTestTorrent()
519         defer os.RemoveAll(seederDataDir)
520         cfg := TestingConfig()
521         cfg.Seed = true
522         cfg.DataDir = seederDataDir
523         seeder, err := NewClient(cfg)
524         require.Nil(t, err)
525         defer seeder.Close()
526         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
527         seederTorrent.VerifyData()
528         leecherDataDir, err := ioutil.TempDir("", "")
529         require.Nil(t, err)
530         defer os.RemoveAll(leecherDataDir)
531         cfg = TestingConfig()
532         cfg.DataDir = leecherDataDir
533         leecher, err := NewClient(cfg)
534         require.Nil(t, err)
535         defer leecher.Close()
536         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
537                 ret = TorrentSpecFromMetaInfo(mi)
538                 ret.ChunkSize = 2
539                 return
540         }())
541         leecherTorrent.AddClientPeer(seeder)
542         reader := leecherTorrent.NewReader()
543         defer reader.Close()
544         reader.SetReadahead(0)
545         reader.SetResponsive()
546         b := make([]byte, 2)
547         _, err = reader.Seek(3, io.SeekStart)
548         require.NoError(t, err)
549         _, err = io.ReadFull(reader, b)
550         assert.Nil(t, err)
551         assert.EqualValues(t, "lo", string(b))
552         _, err = reader.Seek(11, io.SeekStart)
553         require.NoError(t, err)
554         n, err := io.ReadFull(reader, b)
555         assert.Nil(t, err)
556         assert.EqualValues(t, 2, n)
557         assert.EqualValues(t, "d\n", string(b))
558 }
559
560 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
561         seederDataDir, mi := testutil.GreetingTestTorrent()
562         defer os.RemoveAll(seederDataDir)
563         cfg := TestingConfig()
564         cfg.Seed = true
565         cfg.DataDir = seederDataDir
566         seeder, err := NewClient(cfg)
567         require.Nil(t, err)
568         defer seeder.Close()
569         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
570         seederTorrent.VerifyData()
571         leecherDataDir, err := ioutil.TempDir("", "")
572         require.Nil(t, err)
573         defer os.RemoveAll(leecherDataDir)
574         cfg = TestingConfig()
575         cfg.DataDir = leecherDataDir
576         leecher, err := NewClient(cfg)
577         require.Nil(t, err)
578         defer leecher.Close()
579         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
580                 ret = TorrentSpecFromMetaInfo(mi)
581                 ret.ChunkSize = 2
582                 return
583         }())
584         leecherTorrent.AddClientPeer(seeder)
585         reader := leecherTorrent.NewReader()
586         defer reader.Close()
587         reader.SetReadahead(0)
588         reader.SetResponsive()
589         b := make([]byte, 2)
590         _, err = reader.Seek(3, io.SeekStart)
591         require.NoError(t, err)
592         _, err = io.ReadFull(reader, b)
593         assert.Nil(t, err)
594         assert.EqualValues(t, "lo", string(b))
595         go leecherTorrent.Drop()
596         _, err = reader.Seek(11, io.SeekStart)
597         require.NoError(t, err)
598         n, err := reader.Read(b)
599         assert.EqualError(t, err, "torrent closed")
600         assert.EqualValues(t, 0, n)
601 }
602
603 func TestDHTInheritBlocklist(t *testing.T) {
604         ipl := iplist.New(nil)
605         require.NotNil(t, ipl)
606         cfg := TestingConfig()
607         cfg.IPBlocklist = ipl
608         cfg.NoDHT = false
609         cl, err := NewClient(cfg)
610         require.NoError(t, err)
611         defer cl.Close()
612         numServers := 0
613         cl.eachDhtServer(func(s *dht.Server) {
614                 assert.Equal(t, ipl, s.IPBlocklist())
615                 numServers++
616         })
617         assert.EqualValues(t, 2, numServers)
618 }
619
620 // Check that stuff is merged in subsequent AddTorrentSpec for the same
621 // infohash.
622 func TestAddTorrentSpecMerging(t *testing.T) {
623         cl, err := NewClient(TestingConfig())
624         require.NoError(t, err)
625         defer cl.Close()
626         dir, mi := testutil.GreetingTestTorrent()
627         defer os.RemoveAll(dir)
628         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
629                 InfoHash: mi.HashInfoBytes(),
630         })
631         require.NoError(t, err)
632         require.True(t, new)
633         require.Nil(t, tt.Info())
634         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
635         require.NoError(t, err)
636         require.False(t, new)
637         require.NotNil(t, tt.Info())
638 }
639
640 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
641         dir, mi := testutil.GreetingTestTorrent()
642         os.RemoveAll(dir)
643         cl, _ := NewClient(TestingConfig())
644         defer cl.Close()
645         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
646                 InfoHash: mi.HashInfoBytes(),
647         })
648         tt.Drop()
649         assert.EqualValues(t, 0, len(cl.Torrents()))
650         select {
651         case <-tt.GotInfo():
652                 t.FailNow()
653         default:
654         }
655 }
656
657 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
658         for i := range iter.N(info.NumPieces()) {
659                 p := info.Piece(i)
660                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
661         }
662 }
663
664 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
665         fileCacheDir, err := ioutil.TempDir("", "")
666         require.NoError(t, err)
667         defer os.RemoveAll(fileCacheDir)
668         fileCache, err := filecache.NewCache(fileCacheDir)
669         require.NoError(t, err)
670         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
671         defer os.RemoveAll(greetingDataTempDir)
672         filePieceStore := csf(fileCache)
673         defer filePieceStore.Close()
674         info, err := greetingMetainfo.UnmarshalInfo()
675         require.NoError(t, err)
676         ih := greetingMetainfo.HashInfoBytes()
677         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
678         require.NoError(t, err)
679         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
680         // require.Equal(t, len(testutil.GreetingFileContents), written)
681         // require.NoError(t, err)
682         for i := 0; i < info.NumPieces(); i++ {
683                 p := info.Piece(i)
684                 if alreadyCompleted {
685                         require.NoError(t, greetingData.Piece(p).MarkComplete())
686                 }
687         }
688         cfg := TestingConfig()
689         // TODO: Disable network option?
690         cfg.DisableTCP = true
691         cfg.DisableUTP = true
692         cfg.DefaultStorage = filePieceStore
693         cl, err := NewClient(cfg)
694         require.NoError(t, err)
695         defer cl.Close()
696         tt, err := cl.AddTorrent(greetingMetainfo)
697         require.NoError(t, err)
698         psrs := tt.PieceStateRuns()
699         assert.Len(t, psrs, 1)
700         assert.EqualValues(t, 3, psrs[0].Length)
701         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
702         if alreadyCompleted {
703                 r := tt.NewReader()
704                 b, err := ioutil.ReadAll(r)
705                 assert.NoError(t, err)
706                 assert.EqualValues(t, testutil.GreetingFileContents, b)
707         }
708 }
709
710 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
711         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
712 }
713
714 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
715         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
716 }
717
718 func TestAddMetainfoWithNodes(t *testing.T) {
719         cfg := TestingConfig()
720         cfg.ListenHost = func(string) string { return "" }
721         cfg.NoDHT = false
722         cfg.DhtStartingNodes = func() ([]dht.Addr, error) { return nil, nil }
723         // For now, we want to just jam the nodes into the table, without
724         // verifying them first. Also the DHT code doesn't support mixing secure
725         // and insecure nodes if security is enabled (yet).
726         // cfg.DHTConfig.NoSecurity = true
727         cl, err := NewClient(cfg)
728         require.NoError(t, err)
729         defer cl.Close()
730         sum := func() (ret int64) {
731                 cl.eachDhtServer(func(s *dht.Server) {
732                         ret += s.Stats().OutboundQueriesAttempted
733                 })
734                 return
735         }
736         assert.EqualValues(t, 0, sum())
737         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
738         require.NoError(t, err)
739         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
740         // check if the announce-list is here instead. TODO: Add nodes.
741         assert.Len(t, tt.metainfo.AnnounceList, 5)
742         // There are 6 nodes in the torrent file.
743         for sum() != int64(6*len(cl.dhtServers)) {
744                 time.Sleep(time.Millisecond)
745         }
746 }
747
748 type testDownloadCancelParams struct {
749         SetLeecherStorageCapacity bool
750         LeecherStorageCapacity    int64
751         Cancel                    bool
752 }
753
754 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
755         greetingTempDir, mi := testutil.GreetingTestTorrent()
756         defer os.RemoveAll(greetingTempDir)
757         cfg := TestingConfig()
758         cfg.Seed = true
759         cfg.DataDir = greetingTempDir
760         seeder, err := NewClient(cfg)
761         require.NoError(t, err)
762         defer seeder.Close()
763         defer testutil.ExportStatusWriter(seeder, "s")()
764         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
765         seederTorrent.VerifyData()
766         leecherDataDir, err := ioutil.TempDir("", "")
767         require.NoError(t, err)
768         defer os.RemoveAll(leecherDataDir)
769         fc, err := filecache.NewCache(leecherDataDir)
770         require.NoError(t, err)
771         if ps.SetLeecherStorageCapacity {
772                 fc.SetCapacity(ps.LeecherStorageCapacity)
773         }
774         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
775         cfg.DataDir = leecherDataDir
776         leecher, err := NewClient(cfg)
777         require.NoError(t, err)
778         defer leecher.Close()
779         defer testutil.ExportStatusWriter(leecher, "l")()
780         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
781                 ret = TorrentSpecFromMetaInfo(mi)
782                 ret.ChunkSize = 2
783                 return
784         }())
785         require.NoError(t, err)
786         assert.True(t, new)
787         psc := leecherGreeting.SubscribePieceStateChanges()
788         defer psc.Close()
789
790         leecherGreeting.cl.lock()
791         leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
792         if ps.Cancel {
793                 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
794         }
795         leecherGreeting.cl.unlock()
796         done := make(chan struct{})
797         defer close(done)
798         go leecherGreeting.AddClientPeer(seeder)
799         completes := make(map[int]bool, 3)
800         expected := func() map[int]bool {
801                 if ps.Cancel {
802                         return map[int]bool{0: false, 1: false, 2: false}
803                 } else {
804                         return map[int]bool{0: true, 1: true, 2: true}
805                 }
806         }()
807         for !reflect.DeepEqual(completes, expected) {
808                 _v := <-psc.Values
809                 v := _v.(PieceStateChange)
810                 completes[v.Index] = v.Complete
811         }
812 }
813
814 func TestTorrentDownloadAll(t *testing.T) {
815         testDownloadCancel(t, testDownloadCancelParams{})
816 }
817
818 func TestTorrentDownloadAllThenCancel(t *testing.T) {
819         testDownloadCancel(t, testDownloadCancelParams{
820                 Cancel: true,
821         })
822 }
823
824 // Ensure that it's an error for a peer to send an invalid have message.
825 func TestPeerInvalidHave(t *testing.T) {
826         cl, err := NewClient(TestingConfig())
827         require.NoError(t, err)
828         defer cl.Close()
829         info := metainfo.Info{
830                 PieceLength: 1,
831                 Pieces:      make([]byte, 20),
832                 Files:       []metainfo.FileInfo{{Length: 1}},
833         }
834         infoBytes, err := bencode.Marshal(info)
835         require.NoError(t, err)
836         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
837                 InfoBytes: infoBytes,
838                 InfoHash:  metainfo.HashBytes(infoBytes),
839                 Storage:   badStorage{},
840         })
841         require.NoError(t, err)
842         assert.True(t, _new)
843         defer tt.Drop()
844         cn := &connection{
845                 t: tt,
846         }
847         assert.NoError(t, cn.peerSentHave(0))
848         assert.Error(t, cn.peerSentHave(1))
849 }
850
851 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
852         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
853         defer os.RemoveAll(greetingTempDir)
854         cfg := TestingConfig()
855         cfg.DataDir = greetingTempDir
856         seeder, err := NewClient(TestingConfig())
857         require.NoError(t, err)
858         seeder.AddTorrentSpec(&TorrentSpec{
859                 InfoBytes: greetingMetainfo.InfoBytes,
860         })
861 }
862
863 // Check that when the listen port is 0, all the protocols listened on have
864 // the same port, and it isn't zero.
865 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
866         cl, err := NewClient(TestingConfig())
867         require.NoError(t, err)
868         defer cl.Close()
869         port := cl.LocalPort()
870         assert.NotEqual(t, 0, port)
871         cl.eachListener(func(s socket) bool {
872                 assert.Equal(t, port, missinggo.AddrPort(s.Addr()))
873                 return true
874         })
875 }
876
877 func TestClientDynamicListenTCPOnly(t *testing.T) {
878         cfg := TestingConfig()
879         cfg.DisableUTP = true
880         cfg.DisableTCP = false
881         cl, err := NewClient(cfg)
882         require.NoError(t, err)
883         defer cl.Close()
884         assert.NotEqual(t, 0, cl.LocalPort())
885 }
886
887 func TestClientDynamicListenUTPOnly(t *testing.T) {
888         cfg := TestingConfig()
889         cfg.DisableTCP = true
890         cfg.DisableUTP = false
891         cl, err := NewClient(cfg)
892         require.NoError(t, err)
893         defer cl.Close()
894         assert.NotEqual(t, 0, cl.LocalPort())
895 }
896
897 func totalConns(tts []*Torrent) (ret int) {
898         for _, tt := range tts {
899                 tt.cl.lock()
900                 ret += len(tt.conns)
901                 tt.cl.unlock()
902         }
903         return
904 }
905
906 func TestSetMaxEstablishedConn(t *testing.T) {
907         var tts []*Torrent
908         ih := testutil.GreetingMetaInfo().HashInfoBytes()
909         cfg := TestingConfig()
910         cfg.DisableAcceptRateLimiting = true
911         cfg.dropDuplicatePeerIds = true
912         for i := range iter.N(3) {
913                 cl, err := NewClient(cfg)
914                 require.NoError(t, err)
915                 defer cl.Close()
916                 tt, _ := cl.AddTorrentInfoHash(ih)
917                 tt.SetMaxEstablishedConns(2)
918                 defer testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))()
919                 tts = append(tts, tt)
920         }
921         addPeers := func() {
922                 for _, tt := range tts {
923                         for _, _tt := range tts {
924                                 // if tt != _tt {
925                                 tt.AddClientPeer(_tt.cl)
926                                 // }
927                         }
928                 }
929         }
930         waitTotalConns := func(num int) {
931                 for totalConns(tts) != num {
932                         addPeers()
933                         time.Sleep(time.Millisecond)
934                 }
935         }
936         addPeers()
937         waitTotalConns(6)
938         tts[0].SetMaxEstablishedConns(1)
939         waitTotalConns(4)
940         tts[0].SetMaxEstablishedConns(0)
941         waitTotalConns(2)
942         tts[0].SetMaxEstablishedConns(1)
943         addPeers()
944         waitTotalConns(4)
945         tts[0].SetMaxEstablishedConns(2)
946         addPeers()
947         waitTotalConns(6)
948 }
949
950 // Creates a file containing its own name as data. Make a metainfo from that, adds it to the given
951 // client, and returns a magnet link.
952 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
953         os.MkdirAll(dir, 0770)
954         file, err := os.Create(filepath.Join(dir, name))
955         require.NoError(t, err)
956         file.Write([]byte(name))
957         file.Close()
958         mi := metainfo.MetaInfo{}
959         mi.SetDefaults()
960         info := metainfo.Info{PieceLength: 256 * 1024}
961         err = info.BuildFromFilePath(filepath.Join(dir, name))
962         require.NoError(t, err)
963         mi.InfoBytes, err = bencode.Marshal(info)
964         require.NoError(t, err)
965         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
966         tr, err := cl.AddTorrent(&mi)
967         require.NoError(t, err)
968         require.True(t, tr.Seeding())
969         tr.VerifyData()
970         return magnet
971 }
972
973 // https://github.com/anacrolix/torrent/issues/114
974 func TestMultipleTorrentsWithEncryption(t *testing.T) {
975         testSeederLeecherPair(
976                 t,
977                 func(cfg *ClientConfig) {
978                         cfg.HeaderObfuscationPolicy.Preferred = true
979                         cfg.HeaderObfuscationPolicy.RequirePreferred = true
980                 },
981                 func(cfg *ClientConfig) {
982                         cfg.HeaderObfuscationPolicy.RequirePreferred = false
983                 },
984         )
985 }
986
987 // Test that the leecher can download a torrent in its entirety from the seeder. Note that the
988 // seeder config is done first.
989 func testSeederLeecherPair(t *testing.T, seeder func(*ClientConfig), leecher func(*ClientConfig)) {
990         cfg := TestingConfig()
991         cfg.Seed = true
992         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
993         os.Mkdir(cfg.DataDir, 0755)
994         seeder(cfg)
995         server, err := NewClient(cfg)
996         require.NoError(t, err)
997         defer server.Close()
998         defer testutil.ExportStatusWriter(server, "s")()
999         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1000         // Extra torrents are added to test the seeder having to match incoming obfuscated headers
1001         // against more than one torrent. See issue #114
1002         makeMagnet(t, server, cfg.DataDir, "test2")
1003         for i := 0; i < 100; i++ {
1004                 makeMagnet(t, server, cfg.DataDir, fmt.Sprintf("test%d", i+2))
1005         }
1006         cfg = TestingConfig()
1007         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1008         leecher(cfg)
1009         client, err := NewClient(cfg)
1010         require.NoError(t, err)
1011         defer client.Close()
1012         defer testutil.ExportStatusWriter(client, "c")()
1013         tr, err := client.AddMagnet(magnet1)
1014         require.NoError(t, err)
1015         tr.AddClientPeer(server)
1016         <-tr.GotInfo()
1017         tr.DownloadAll()
1018         client.WaitAll()
1019 }
1020
1021 // This appears to be the situation with the S3 BitTorrent client.
1022 func TestObfuscatedHeaderFallbackSeederDisallowsLeecherPrefers(t *testing.T) {
1023         // Leecher prefers obfuscation, but the seeder does not allow it.
1024         testSeederLeecherPair(
1025                 t,
1026                 func(cfg *ClientConfig) {
1027                         cfg.HeaderObfuscationPolicy.Preferred = false
1028                         cfg.HeaderObfuscationPolicy.RequirePreferred = true
1029                 },
1030                 func(cfg *ClientConfig) {
1031                         cfg.HeaderObfuscationPolicy.Preferred = true
1032                         cfg.HeaderObfuscationPolicy.RequirePreferred = false
1033                 },
1034         )
1035 }
1036
1037 func TestObfuscatedHeaderFallbackSeederRequiresLeecherPrefersNot(t *testing.T) {
1038         // Leecher prefers no obfuscation, but the seeder enforces it.
1039         testSeederLeecherPair(
1040                 t,
1041                 func(cfg *ClientConfig) {
1042                         cfg.HeaderObfuscationPolicy.Preferred = true
1043                         cfg.HeaderObfuscationPolicy.RequirePreferred = true
1044                 },
1045                 func(cfg *ClientConfig) {
1046                         cfg.HeaderObfuscationPolicy.Preferred = false
1047                         cfg.HeaderObfuscationPolicy.RequirePreferred = false
1048                 },
1049         )
1050 }
1051
1052 func TestClientAddressInUse(t *testing.T) {
1053         s, _ := NewUtpSocket("udp", ":50007", nil)
1054         if s != nil {
1055                 defer s.Close()
1056         }
1057         cfg := TestingConfig().SetListenAddr(":50007")
1058         cl, err := NewClient(cfg)
1059         require.Error(t, err)
1060         require.Nil(t, cl)
1061 }
1062
1063 func TestClientHasDhtServersWhenUtpDisabled(t *testing.T) {
1064         cc := TestingConfig()
1065         cc.DisableUTP = true
1066         cc.NoDHT = false
1067         cl, err := NewClient(cc)
1068         require.NoError(t, err)
1069         defer cl.Close()
1070         assert.NotEmpty(t, cl.DhtServers())
1071 }