]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Fix benchmark broken by changes to client listeners
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "os"
9         "path/filepath"
10         "reflect"
11         "sync"
12         "testing"
13         "time"
14
15         "github.com/anacrolix/dht"
16         _ "github.com/anacrolix/envpprof"
17         "github.com/anacrolix/missinggo"
18         "github.com/anacrolix/missinggo/filecache"
19         "github.com/anacrolix/torrent/bencode"
20         "github.com/anacrolix/torrent/internal/testutil"
21         "github.com/anacrolix/torrent/iplist"
22         "github.com/anacrolix/torrent/metainfo"
23         "github.com/anacrolix/torrent/storage"
24         "github.com/bradfitz/iter"
25         "github.com/stretchr/testify/assert"
26         "github.com/stretchr/testify/require"
27         "golang.org/x/time/rate"
28 )
29
30 func TestingConfig() *ClientConfig {
31         cfg := NewDefaultClientConfig()
32         cfg.ListenHost = LoopbackListenHost
33         cfg.NoDHT = true
34         cfg.DataDir = tempDir()
35         cfg.DisableTrackers = true
36         cfg.NoDefaultPortForwarding = true
37         cfg.DisableAcceptRateLimiting = true
38         return cfg
39 }
40
41 func TestClientDefault(t *testing.T) {
42         cl, err := NewClient(TestingConfig())
43         require.NoError(t, err)
44         cl.Close()
45 }
46
47 func TestClientNilConfig(t *testing.T) {
48         cl, err := NewClient(nil)
49         require.NoError(t, err)
50         cl.Close()
51 }
52
53 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
54         cfg := TestingConfig()
55         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
56         require.NoError(t, err)
57         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
58         defer ci.Close()
59         cfg.DefaultStorage = ci
60         cl, err := NewClient(cfg)
61         require.NoError(t, err)
62         cl.Close()
63         // And again, https://github.com/anacrolix/torrent/issues/158
64         cl, err = NewClient(cfg)
65         require.NoError(t, err)
66         cl.Close()
67 }
68
69 func TestAddDropTorrent(t *testing.T) {
70         cl, err := NewClient(TestingConfig())
71         require.NoError(t, err)
72         defer cl.Close()
73         dir, mi := testutil.GreetingTestTorrent()
74         defer os.RemoveAll(dir)
75         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
76         require.NoError(t, err)
77         assert.True(t, new)
78         tt.SetMaxEstablishedConns(0)
79         tt.SetMaxEstablishedConns(1)
80         tt.Drop()
81 }
82
83 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
84         // TODO?
85         t.SkipNow()
86 }
87
88 func TestAddTorrentNoUsableURLs(t *testing.T) {
89         // TODO?
90         t.SkipNow()
91 }
92
93 func TestAddPeersToUnknownTorrent(t *testing.T) {
94         // TODO?
95         t.SkipNow()
96 }
97
98 func TestPieceHashSize(t *testing.T) {
99         assert.Equal(t, 20, pieceHash.Size())
100 }
101
102 func TestTorrentInitialState(t *testing.T) {
103         dir, mi := testutil.GreetingTestTorrent()
104         defer os.RemoveAll(dir)
105         cl := &Client{
106                 config: &ClientConfig{},
107         }
108         cl.initLogger()
109         tor := cl.newTorrent(
110                 mi.HashInfoBytes(),
111                 storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
112         )
113         tor.setChunkSize(2)
114         tor.cl.lock()
115         err := tor.setInfoBytes(mi.InfoBytes)
116         tor.cl.unlock()
117         require.NoError(t, err)
118         require.Len(t, tor.pieces, 3)
119         tor.pendAllChunkSpecs(0)
120         tor.cl.lock()
121         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
122         tor.cl.unlock()
123         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
124 }
125
126 func TestReducedDialTimeout(t *testing.T) {
127         cfg := NewDefaultClientConfig()
128         for _, _case := range []struct {
129                 Max             time.Duration
130                 HalfOpenLimit   int
131                 PendingPeers    int
132                 ExpectedReduced time.Duration
133         }{
134                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
135                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
136                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
137                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
138                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
139                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
140         } {
141                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
142                 expected := _case.ExpectedReduced
143                 if expected < cfg.MinDialTimeout {
144                         expected = cfg.MinDialTimeout
145                 }
146                 if reduced != expected {
147                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
148                 }
149         }
150 }
151
152 func TestAddDropManyTorrents(t *testing.T) {
153         cl, err := NewClient(TestingConfig())
154         require.NoError(t, err)
155         defer cl.Close()
156         for i := range iter.N(1000) {
157                 var spec TorrentSpec
158                 binary.PutVarint(spec.InfoHash[:], int64(i))
159                 tt, new, err := cl.AddTorrentSpec(&spec)
160                 assert.NoError(t, err)
161                 assert.True(t, new)
162                 defer tt.Drop()
163         }
164 }
165
166 type FileCacheClientStorageFactoryParams struct {
167         Capacity    int64
168         SetCapacity bool
169         Wrapper     func(*filecache.Cache) storage.ClientImpl
170 }
171
172 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
173         return func(dataDir string) storage.ClientImpl {
174                 fc, err := filecache.NewCache(dataDir)
175                 if err != nil {
176                         panic(err)
177                 }
178                 if ps.SetCapacity {
179                         fc.SetCapacity(ps.Capacity)
180                 }
181                 return ps.Wrapper(fc)
182         }
183 }
184
185 type storageFactory func(string) storage.ClientImpl
186
187 func TestClientTransferDefault(t *testing.T) {
188         testClientTransfer(t, testClientTransferParams{
189                 ExportClientStatus: true,
190                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
191                         Wrapper: fileCachePieceResourceStorage,
192                 }),
193         })
194 }
195
196 func TestClientTransferRateLimitedUpload(t *testing.T) {
197         started := time.Now()
198         testClientTransfer(t, testClientTransferParams{
199                 // We are uploading 13 bytes (the length of the greeting torrent). The
200                 // chunks are 2 bytes in length. Then the smallest burst we can run
201                 // with is 2. Time taken is (13-burst)/rate.
202                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
203                 ExportClientStatus:      true,
204         })
205         require.True(t, time.Since(started) > time.Second)
206 }
207
208 func TestClientTransferRateLimitedDownload(t *testing.T) {
209         testClientTransfer(t, testClientTransferParams{
210                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
211         })
212 }
213
214 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
215         return storage.NewResourcePieces(fc.AsResourceProvider())
216 }
217
218 func TestClientTransferSmallCache(t *testing.T) {
219         testClientTransfer(t, testClientTransferParams{
220                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
221                         SetCapacity: true,
222                         // Going below the piece length means it can't complete a piece so
223                         // that it can be hashed.
224                         Capacity: 5,
225                         Wrapper:  fileCachePieceResourceStorage,
226                 }),
227                 SetReadahead: true,
228                 // Can't readahead too far or the cache will thrash and drop data we
229                 // thought we had.
230                 Readahead:          0,
231                 ExportClientStatus: true,
232         })
233 }
234
235 func TestClientTransferVarious(t *testing.T) {
236         // Leecher storage
237         for _, ls := range []storageFactory{
238                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
239                         Wrapper: fileCachePieceResourceStorage,
240                 }),
241                 storage.NewBoltDB,
242         } {
243                 // Seeder storage
244                 for _, ss := range []func(string) storage.ClientImpl{
245                         storage.NewFile,
246                         storage.NewMMap,
247                 } {
248                         for _, responsive := range []bool{false, true} {
249                                 testClientTransfer(t, testClientTransferParams{
250                                         Responsive:     responsive,
251                                         SeederStorage:  ss,
252                                         LeecherStorage: ls,
253                                 })
254                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
255                                         testClientTransfer(t, testClientTransferParams{
256                                                 SeederStorage:  ss,
257                                                 Responsive:     responsive,
258                                                 SetReadahead:   true,
259                                                 Readahead:      readahead,
260                                                 LeecherStorage: ls,
261                                         })
262                                 }
263                         }
264                 }
265         }
266 }
267
268 type testClientTransferParams struct {
269         Responsive                 bool
270         Readahead                  int64
271         SetReadahead               bool
272         ExportClientStatus         bool
273         LeecherStorage             func(string) storage.ClientImpl
274         SeederStorage              func(string) storage.ClientImpl
275         SeederUploadRateLimiter    *rate.Limiter
276         LeecherDownloadRateLimiter *rate.Limiter
277 }
278
279 // Creates a seeder and a leecher, and ensures the data transfers when a read
280 // is attempted on the leecher.
281 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
282         greetingTempDir, mi := testutil.GreetingTestTorrent()
283         defer os.RemoveAll(greetingTempDir)
284         // Create seeder and a Torrent.
285         cfg := TestingConfig()
286         cfg.Seed = true
287         if ps.SeederUploadRateLimiter != nil {
288                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
289         }
290         // cfg.ListenAddr = "localhost:4000"
291         if ps.SeederStorage != nil {
292                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
293                 defer cfg.DefaultStorage.Close()
294         } else {
295                 cfg.DataDir = greetingTempDir
296         }
297         seeder, err := NewClient(cfg)
298         require.NoError(t, err)
299         if ps.ExportClientStatus {
300                 defer testutil.ExportStatusWriter(seeder, "s")()
301         }
302         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
303         // Run a Stats right after Closing the Client. This will trigger the Stats
304         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
305         defer seederTorrent.Stats()
306         defer seeder.Close()
307         seederTorrent.VerifyData()
308         // Create leecher and a Torrent.
309         leecherDataDir, err := ioutil.TempDir("", "")
310         require.NoError(t, err)
311         defer os.RemoveAll(leecherDataDir)
312         cfg = TestingConfig()
313         if ps.LeecherStorage == nil {
314                 cfg.DataDir = leecherDataDir
315         } else {
316                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
317         }
318         if ps.LeecherDownloadRateLimiter != nil {
319                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
320         }
321         cfg.Seed = false
322         leecher, err := NewClient(cfg)
323         require.NoError(t, err)
324         defer leecher.Close()
325         if ps.ExportClientStatus {
326                 defer testutil.ExportStatusWriter(leecher, "l")()
327         }
328         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
329                 ret = TorrentSpecFromMetaInfo(mi)
330                 ret.ChunkSize = 2
331                 return
332         }())
333         require.NoError(t, err)
334         assert.True(t, new)
335         // Now do some things with leecher and seeder.
336         leecherTorrent.AddClientPeer(seeder)
337         // The Torrent should not be interested in obtaining peers, so the one we
338         // just added should be the only one.
339         assert.False(t, leecherTorrent.Seeding())
340         assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
341         r := leecherTorrent.NewReader()
342         defer r.Close()
343         if ps.Responsive {
344                 r.SetResponsive()
345         }
346         if ps.SetReadahead {
347                 r.SetReadahead(ps.Readahead)
348         }
349         assertReadAllGreeting(t, r)
350
351         seederStats := seederTorrent.Stats()
352         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
353         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
354
355         leecherStats := leecherTorrent.Stats()
356         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
357         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
358
359         // Try reading through again for the cases where the torrent data size
360         // exceeds the size of the cache.
361         assertReadAllGreeting(t, r)
362 }
363
364 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
365         pos, err := r.Seek(0, io.SeekStart)
366         assert.NoError(t, err)
367         assert.EqualValues(t, 0, pos)
368         _greeting, err := ioutil.ReadAll(r)
369         assert.NoError(t, err)
370         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
371 }
372
373 // Check that after completing leeching, a leecher transitions to a seeding
374 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
375 func TestSeedAfterDownloading(t *testing.T) {
376         greetingTempDir, mi := testutil.GreetingTestTorrent()
377         defer os.RemoveAll(greetingTempDir)
378
379         cfg := TestingConfig()
380         cfg.Seed = true
381         cfg.DataDir = greetingTempDir
382         seeder, err := NewClient(cfg)
383         require.NoError(t, err)
384         defer seeder.Close()
385         defer testutil.ExportStatusWriter(seeder, "s")()
386         seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
387         require.NoError(t, err)
388         assert.True(t, ok)
389         seederTorrent.VerifyData()
390
391         cfg = TestingConfig()
392         cfg.Seed = true
393         cfg.DataDir, err = ioutil.TempDir("", "")
394         require.NoError(t, err)
395         defer os.RemoveAll(cfg.DataDir)
396         leecher, err := NewClient(cfg)
397         require.NoError(t, err)
398         defer leecher.Close()
399         defer testutil.ExportStatusWriter(leecher, "l")()
400
401         cfg = TestingConfig()
402         cfg.Seed = false
403         cfg.DataDir, err = ioutil.TempDir("", "")
404         require.NoError(t, err)
405         defer os.RemoveAll(cfg.DataDir)
406         leecherLeecher, _ := NewClient(cfg)
407         require.NoError(t, err)
408         defer leecherLeecher.Close()
409         defer testutil.ExportStatusWriter(leecherLeecher, "ll")()
410         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
411                 ret = TorrentSpecFromMetaInfo(mi)
412                 ret.ChunkSize = 2
413                 return
414         }())
415         require.NoError(t, err)
416         assert.True(t, ok)
417         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
418                 ret = TorrentSpecFromMetaInfo(mi)
419                 ret.ChunkSize = 3
420                 return
421         }())
422         require.NoError(t, err)
423         assert.True(t, ok)
424         // Simultaneously DownloadAll in Leecher, and read the contents
425         // consecutively in LeecherLeecher. This non-deterministically triggered a
426         // case where the leecher wouldn't unchoke the LeecherLeecher.
427         var wg sync.WaitGroup
428         wg.Add(1)
429         go func() {
430                 defer wg.Done()
431                 r := llg.NewReader()
432                 defer r.Close()
433                 b, err := ioutil.ReadAll(r)
434                 require.NoError(t, err)
435                 assert.EqualValues(t, testutil.GreetingFileContents, b)
436         }()
437         done := make(chan struct{})
438         defer close(done)
439         go leecherGreeting.AddClientPeer(seeder)
440         go leecherGreeting.AddClientPeer(leecherLeecher)
441         wg.Add(1)
442         go func() {
443                 defer wg.Done()
444                 leecherGreeting.DownloadAll()
445                 leecher.WaitAll()
446         }()
447         wg.Wait()
448 }
449
450 func TestMergingTrackersByAddingSpecs(t *testing.T) {
451         cl, err := NewClient(TestingConfig())
452         require.NoError(t, err)
453         defer cl.Close()
454         spec := TorrentSpec{}
455         T, new, _ := cl.AddTorrentSpec(&spec)
456         if !new {
457                 t.FailNow()
458         }
459         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
460         _, new, _ = cl.AddTorrentSpec(&spec)
461         assert.False(t, new)
462         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
463         // Because trackers are disabled in TestingConfig.
464         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
465 }
466
467 // We read from a piece which is marked completed, but is missing data.
468 func TestCompletedPieceWrongSize(t *testing.T) {
469         cfg := TestingConfig()
470         cfg.DefaultStorage = badStorage{}
471         cl, err := NewClient(cfg)
472         require.NoError(t, err)
473         defer cl.Close()
474         info := metainfo.Info{
475                 PieceLength: 15,
476                 Pieces:      make([]byte, 20),
477                 Files: []metainfo.FileInfo{
478                         {Path: []string{"greeting"}, Length: 13},
479                 },
480         }
481         b, err := bencode.Marshal(info)
482         require.NoError(t, err)
483         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
484                 InfoBytes: b,
485                 InfoHash:  metainfo.HashBytes(b),
486         })
487         require.NoError(t, err)
488         defer tt.Drop()
489         assert.True(t, new)
490         r := tt.NewReader()
491         defer r.Close()
492         b, err = ioutil.ReadAll(r)
493         assert.Len(t, b, 13)
494         assert.NoError(t, err)
495 }
496
497 func BenchmarkAddLargeTorrent(b *testing.B) {
498         cfg := TestingConfig()
499         cfg.DisableTCP = true
500         cfg.DisableUTP = true
501         cl, err := NewClient(cfg)
502         require.NoError(b, err)
503         defer cl.Close()
504         for range iter.N(b.N) {
505                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
506                 if err != nil {
507                         b.Fatal(err)
508                 }
509                 t.Drop()
510         }
511 }
512
513 func TestResponsive(t *testing.T) {
514         seederDataDir, mi := testutil.GreetingTestTorrent()
515         defer os.RemoveAll(seederDataDir)
516         cfg := TestingConfig()
517         cfg.Seed = true
518         cfg.DataDir = seederDataDir
519         seeder, err := NewClient(cfg)
520         require.Nil(t, err)
521         defer seeder.Close()
522         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
523         seederTorrent.VerifyData()
524         leecherDataDir, err := ioutil.TempDir("", "")
525         require.Nil(t, err)
526         defer os.RemoveAll(leecherDataDir)
527         cfg = TestingConfig()
528         cfg.DataDir = leecherDataDir
529         leecher, err := NewClient(cfg)
530         require.Nil(t, err)
531         defer leecher.Close()
532         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
533                 ret = TorrentSpecFromMetaInfo(mi)
534                 ret.ChunkSize = 2
535                 return
536         }())
537         leecherTorrent.AddClientPeer(seeder)
538         reader := leecherTorrent.NewReader()
539         defer reader.Close()
540         reader.SetReadahead(0)
541         reader.SetResponsive()
542         b := make([]byte, 2)
543         _, err = reader.Seek(3, io.SeekStart)
544         require.NoError(t, err)
545         _, err = io.ReadFull(reader, b)
546         assert.Nil(t, err)
547         assert.EqualValues(t, "lo", string(b))
548         _, err = reader.Seek(11, io.SeekStart)
549         require.NoError(t, err)
550         n, err := io.ReadFull(reader, b)
551         assert.Nil(t, err)
552         assert.EqualValues(t, 2, n)
553         assert.EqualValues(t, "d\n", string(b))
554 }
555
556 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
557         seederDataDir, mi := testutil.GreetingTestTorrent()
558         defer os.RemoveAll(seederDataDir)
559         cfg := TestingConfig()
560         cfg.Seed = true
561         cfg.DataDir = seederDataDir
562         seeder, err := NewClient(cfg)
563         require.Nil(t, err)
564         defer seeder.Close()
565         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
566         seederTorrent.VerifyData()
567         leecherDataDir, err := ioutil.TempDir("", "")
568         require.Nil(t, err)
569         defer os.RemoveAll(leecherDataDir)
570         cfg = TestingConfig()
571         cfg.DataDir = leecherDataDir
572         leecher, err := NewClient(cfg)
573         require.Nil(t, err)
574         defer leecher.Close()
575         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
576                 ret = TorrentSpecFromMetaInfo(mi)
577                 ret.ChunkSize = 2
578                 return
579         }())
580         leecherTorrent.AddClientPeer(seeder)
581         reader := leecherTorrent.NewReader()
582         defer reader.Close()
583         reader.SetReadahead(0)
584         reader.SetResponsive()
585         b := make([]byte, 2)
586         _, err = reader.Seek(3, io.SeekStart)
587         require.NoError(t, err)
588         _, err = io.ReadFull(reader, b)
589         assert.Nil(t, err)
590         assert.EqualValues(t, "lo", string(b))
591         go leecherTorrent.Drop()
592         _, err = reader.Seek(11, io.SeekStart)
593         require.NoError(t, err)
594         n, err := reader.Read(b)
595         assert.EqualError(t, err, "torrent closed")
596         assert.EqualValues(t, 0, n)
597 }
598
599 func TestDHTInheritBlocklist(t *testing.T) {
600         ipl := iplist.New(nil)
601         require.NotNil(t, ipl)
602         cfg := TestingConfig()
603         cfg.IPBlocklist = ipl
604         cfg.NoDHT = false
605         cl, err := NewClient(cfg)
606         require.NoError(t, err)
607         defer cl.Close()
608         numServers := 0
609         cl.eachDhtServer(func(s *dht.Server) {
610                 assert.Equal(t, ipl, s.IPBlocklist())
611                 numServers++
612         })
613         assert.EqualValues(t, 2, numServers)
614 }
615
616 // Check that stuff is merged in subsequent AddTorrentSpec for the same
617 // infohash.
618 func TestAddTorrentSpecMerging(t *testing.T) {
619         cl, err := NewClient(TestingConfig())
620         require.NoError(t, err)
621         defer cl.Close()
622         dir, mi := testutil.GreetingTestTorrent()
623         defer os.RemoveAll(dir)
624         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
625                 InfoHash: mi.HashInfoBytes(),
626         })
627         require.NoError(t, err)
628         require.True(t, new)
629         require.Nil(t, tt.Info())
630         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
631         require.NoError(t, err)
632         require.False(t, new)
633         require.NotNil(t, tt.Info())
634 }
635
636 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
637         dir, mi := testutil.GreetingTestTorrent()
638         os.RemoveAll(dir)
639         cl, _ := NewClient(TestingConfig())
640         defer cl.Close()
641         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
642                 InfoHash: mi.HashInfoBytes(),
643         })
644         tt.Drop()
645         assert.EqualValues(t, 0, len(cl.Torrents()))
646         select {
647         case <-tt.GotInfo():
648                 t.FailNow()
649         default:
650         }
651 }
652
653 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
654         for i := range iter.N(info.NumPieces()) {
655                 p := info.Piece(i)
656                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
657         }
658 }
659
660 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
661         fileCacheDir, err := ioutil.TempDir("", "")
662         require.NoError(t, err)
663         defer os.RemoveAll(fileCacheDir)
664         fileCache, err := filecache.NewCache(fileCacheDir)
665         require.NoError(t, err)
666         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
667         defer os.RemoveAll(greetingDataTempDir)
668         filePieceStore := csf(fileCache)
669         defer filePieceStore.Close()
670         info, err := greetingMetainfo.UnmarshalInfo()
671         require.NoError(t, err)
672         ih := greetingMetainfo.HashInfoBytes()
673         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
674         require.NoError(t, err)
675         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
676         // require.Equal(t, len(testutil.GreetingFileContents), written)
677         // require.NoError(t, err)
678         for i := 0; i < info.NumPieces(); i++ {
679                 p := info.Piece(i)
680                 if alreadyCompleted {
681                         require.NoError(t, greetingData.Piece(p).MarkComplete())
682                 }
683         }
684         cfg := TestingConfig()
685         // TODO: Disable network option?
686         cfg.DisableTCP = true
687         cfg.DisableUTP = true
688         cfg.DefaultStorage = filePieceStore
689         cl, err := NewClient(cfg)
690         require.NoError(t, err)
691         defer cl.Close()
692         tt, err := cl.AddTorrent(greetingMetainfo)
693         require.NoError(t, err)
694         psrs := tt.PieceStateRuns()
695         assert.Len(t, psrs, 1)
696         assert.EqualValues(t, 3, psrs[0].Length)
697         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
698         if alreadyCompleted {
699                 r := tt.NewReader()
700                 b, err := ioutil.ReadAll(r)
701                 assert.NoError(t, err)
702                 assert.EqualValues(t, testutil.GreetingFileContents, b)
703         }
704 }
705
706 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
707         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
708 }
709
710 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
711         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
712 }
713
714 func TestAddMetainfoWithNodes(t *testing.T) {
715         cfg := TestingConfig()
716         cfg.ListenHost = func(string) string { return "" }
717         cfg.NoDHT = false
718         cfg.DhtStartingNodes = func() ([]dht.Addr, error) { return nil, nil }
719         // For now, we want to just jam the nodes into the table, without
720         // verifying them first. Also the DHT code doesn't support mixing secure
721         // and insecure nodes if security is enabled (yet).
722         // cfg.DHTConfig.NoSecurity = true
723         cl, err := NewClient(cfg)
724         require.NoError(t, err)
725         defer cl.Close()
726         sum := func() (ret int64) {
727                 cl.eachDhtServer(func(s *dht.Server) {
728                         ret += s.Stats().OutboundQueriesAttempted
729                 })
730                 return
731         }
732         assert.EqualValues(t, 0, sum())
733         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
734         require.NoError(t, err)
735         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
736         // check if the announce-list is here instead. TODO: Add nodes.
737         assert.Len(t, tt.metainfo.AnnounceList, 5)
738         // There are 6 nodes in the torrent file.
739         assert.EqualValues(t, 6*len(cl.dhtServers), sum())
740 }
741
742 type testDownloadCancelParams struct {
743         SetLeecherStorageCapacity bool
744         LeecherStorageCapacity    int64
745         Cancel                    bool
746 }
747
748 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
749         greetingTempDir, mi := testutil.GreetingTestTorrent()
750         defer os.RemoveAll(greetingTempDir)
751         cfg := TestingConfig()
752         cfg.Seed = true
753         cfg.DataDir = greetingTempDir
754         seeder, err := NewClient(cfg)
755         require.NoError(t, err)
756         defer seeder.Close()
757         defer testutil.ExportStatusWriter(seeder, "s")()
758         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
759         seederTorrent.VerifyData()
760         leecherDataDir, err := ioutil.TempDir("", "")
761         require.NoError(t, err)
762         defer os.RemoveAll(leecherDataDir)
763         fc, err := filecache.NewCache(leecherDataDir)
764         require.NoError(t, err)
765         if ps.SetLeecherStorageCapacity {
766                 fc.SetCapacity(ps.LeecherStorageCapacity)
767         }
768         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
769         cfg.DataDir = leecherDataDir
770         leecher, _ := NewClient(cfg)
771         defer leecher.Close()
772         defer testutil.ExportStatusWriter(leecher, "l")()
773         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
774                 ret = TorrentSpecFromMetaInfo(mi)
775                 ret.ChunkSize = 2
776                 return
777         }())
778         require.NoError(t, err)
779         assert.True(t, new)
780         psc := leecherGreeting.SubscribePieceStateChanges()
781         defer psc.Close()
782
783         leecherGreeting.cl.lock()
784         leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
785         if ps.Cancel {
786                 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
787         }
788         leecherGreeting.cl.unlock()
789         done := make(chan struct{})
790         defer close(done)
791         go leecherGreeting.AddClientPeer(seeder)
792         completes := make(map[int]bool, 3)
793         expected := func() map[int]bool {
794                 if ps.Cancel {
795                         return map[int]bool{0: false, 1: false, 2: false}
796                 } else {
797                         return map[int]bool{0: true, 1: true, 2: true}
798                 }
799         }()
800         for !reflect.DeepEqual(completes, expected) {
801                 select {
802                 case _v := <-psc.Values:
803                         v := _v.(PieceStateChange)
804                         completes[v.Index] = v.Complete
805                 }
806         }
807 }
808
809 func TestTorrentDownloadAll(t *testing.T) {
810         testDownloadCancel(t, testDownloadCancelParams{})
811 }
812
813 func TestTorrentDownloadAllThenCancel(t *testing.T) {
814         testDownloadCancel(t, testDownloadCancelParams{
815                 Cancel: true,
816         })
817 }
818
819 // Ensure that it's an error for a peer to send an invalid have message.
820 func TestPeerInvalidHave(t *testing.T) {
821         cl, err := NewClient(TestingConfig())
822         require.NoError(t, err)
823         defer cl.Close()
824         info := metainfo.Info{
825                 PieceLength: 1,
826                 Pieces:      make([]byte, 20),
827                 Files:       []metainfo.FileInfo{{Length: 1}},
828         }
829         infoBytes, err := bencode.Marshal(info)
830         require.NoError(t, err)
831         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
832                 InfoBytes: infoBytes,
833                 InfoHash:  metainfo.HashBytes(infoBytes),
834                 Storage:   badStorage{},
835         })
836         require.NoError(t, err)
837         assert.True(t, _new)
838         defer tt.Drop()
839         cn := &connection{
840                 t: tt,
841         }
842         assert.NoError(t, cn.peerSentHave(0))
843         assert.Error(t, cn.peerSentHave(1))
844 }
845
846 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
847         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
848         defer os.RemoveAll(greetingTempDir)
849         cfg := TestingConfig()
850         cfg.DataDir = greetingTempDir
851         seeder, err := NewClient(TestingConfig())
852         require.NoError(t, err)
853         seeder.AddTorrentSpec(&TorrentSpec{
854                 InfoBytes: greetingMetainfo.InfoBytes,
855         })
856 }
857
858 // Check that when the listen port is 0, all the protocols listened on have
859 // the same port, and it isn't zero.
860 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
861         cl, err := NewClient(TestingConfig())
862         require.NoError(t, err)
863         defer cl.Close()
864         port := cl.LocalPort()
865         assert.NotEqual(t, 0, port)
866         cl.eachListener(func(s socket) bool {
867                 assert.Equal(t, port, missinggo.AddrPort(s.Addr()))
868                 return true
869         })
870 }
871
872 func TestClientDynamicListenTCPOnly(t *testing.T) {
873         cfg := TestingConfig()
874         cfg.DisableUTP = true
875         cfg.DisableTCP = false
876         cl, err := NewClient(cfg)
877         require.NoError(t, err)
878         defer cl.Close()
879         assert.NotEqual(t, 0, cl.LocalPort())
880 }
881
882 func TestClientDynamicListenUTPOnly(t *testing.T) {
883         cfg := TestingConfig()
884         cfg.DisableTCP = true
885         cfg.DisableUTP = false
886         cl, err := NewClient(cfg)
887         require.NoError(t, err)
888         defer cl.Close()
889         assert.NotEqual(t, 0, cl.LocalPort())
890 }
891
892 func totalConns(tts []*Torrent) (ret int) {
893         for _, tt := range tts {
894                 tt.cl.lock()
895                 ret += len(tt.conns)
896                 tt.cl.unlock()
897         }
898         return
899 }
900
901 func TestSetMaxEstablishedConn(t *testing.T) {
902         var tts []*Torrent
903         ih := testutil.GreetingMetaInfo().HashInfoBytes()
904         cfg := TestingConfig()
905         cfg.DisableAcceptRateLimiting = true
906         cfg.dropDuplicatePeerIds = true
907         for i := range iter.N(3) {
908                 cl, err := NewClient(cfg)
909                 require.NoError(t, err)
910                 defer cl.Close()
911                 tt, _ := cl.AddTorrentInfoHash(ih)
912                 tt.SetMaxEstablishedConns(2)
913                 defer testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))()
914                 tts = append(tts, tt)
915         }
916         addPeers := func() {
917                 for _, tt := range tts {
918                         for _, _tt := range tts {
919                                 // if tt != _tt {
920                                 tt.AddClientPeer(_tt.cl)
921                                 // }
922                         }
923                 }
924         }
925         waitTotalConns := func(num int) {
926                 for totalConns(tts) != num {
927                         addPeers()
928                         time.Sleep(time.Millisecond)
929                 }
930         }
931         addPeers()
932         waitTotalConns(6)
933         tts[0].SetMaxEstablishedConns(1)
934         waitTotalConns(4)
935         tts[0].SetMaxEstablishedConns(0)
936         waitTotalConns(2)
937         tts[0].SetMaxEstablishedConns(1)
938         addPeers()
939         waitTotalConns(4)
940         tts[0].SetMaxEstablishedConns(2)
941         addPeers()
942         waitTotalConns(6)
943 }
944
945 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
946         os.MkdirAll(dir, 0770)
947         file, err := os.Create(filepath.Join(dir, name))
948         require.NoError(t, err)
949         file.Write([]byte(name))
950         file.Close()
951         mi := metainfo.MetaInfo{}
952         mi.SetDefaults()
953         info := metainfo.Info{PieceLength: 256 * 1024}
954         err = info.BuildFromFilePath(filepath.Join(dir, name))
955         require.NoError(t, err)
956         mi.InfoBytes, err = bencode.Marshal(info)
957         require.NoError(t, err)
958         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
959         tr, err := cl.AddTorrent(&mi)
960         require.NoError(t, err)
961         require.True(t, tr.Seeding())
962         tr.VerifyData()
963         return magnet
964 }
965
966 // https://github.com/anacrolix/torrent/issues/114
967 func TestMultipleTorrentsWithEncryption(t *testing.T) {
968         cfg := TestingConfig()
969         cfg.DisableUTP = true
970         cfg.Seed = true
971         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
972         cfg.ForceEncryption = true
973         os.Mkdir(cfg.DataDir, 0755)
974         server, err := NewClient(cfg)
975         require.NoError(t, err)
976         defer server.Close()
977         defer testutil.ExportStatusWriter(server, "s")()
978         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
979         makeMagnet(t, server, cfg.DataDir, "test2")
980         cfg = TestingConfig()
981         cfg.DisableUTP = true
982         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
983         cfg.ForceEncryption = true
984         client, err := NewClient(cfg)
985         require.NoError(t, err)
986         defer client.Close()
987         defer testutil.ExportStatusWriter(client, "c")()
988         tr, err := client.AddMagnet(magnet1)
989         require.NoError(t, err)
990         tr.AddClientPeer(server)
991         <-tr.GotInfo()
992         tr.DownloadAll()
993         client.WaitAll()
994 }
995
996 func TestClientAddressInUse(t *testing.T) {
997         s, _ := NewUtpSocket("udp", ":50007", nil)
998         if s != nil {
999                 defer s.Close()
1000         }
1001         cfg := TestingConfig().SetListenAddr(":50007")
1002         cl, err := NewClient(cfg)
1003         require.Error(t, err)
1004         require.Nil(t, cl)
1005 }
1006
1007 func TestClientHasDhtServersWhenUtpDisabled(t *testing.T) {
1008         cc := TestingConfig()
1009         cc.DisableUTP = true
1010         cc.NoDHT = false
1011         cl, err := NewClient(cc)
1012         require.NoError(t, err)
1013         defer cl.Close()
1014         assert.NotEmpty(t, cl.DhtServers())
1015 }