]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Coalesce piece state change notifications on client unlock
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "log"
9         "os"
10         "path/filepath"
11         "reflect"
12         "sync"
13         "testing"
14         "time"
15
16         "github.com/bradfitz/iter"
17         "github.com/stretchr/testify/assert"
18         "github.com/stretchr/testify/require"
19         "golang.org/x/time/rate"
20
21         "github.com/anacrolix/dht/v2"
22         _ "github.com/anacrolix/envpprof"
23         "github.com/anacrolix/missinggo"
24         "github.com/anacrolix/missinggo/filecache"
25
26         "github.com/anacrolix/torrent/bencode"
27         "github.com/anacrolix/torrent/internal/testutil"
28         "github.com/anacrolix/torrent/iplist"
29         "github.com/anacrolix/torrent/metainfo"
30         "github.com/anacrolix/torrent/storage"
31 )
32
33 func TestingConfig() *ClientConfig {
34         cfg := NewDefaultClientConfig()
35         cfg.ListenHost = LoopbackListenHost
36         cfg.NoDHT = true
37         cfg.DataDir = tempDir()
38         cfg.DisableTrackers = true
39         cfg.NoDefaultPortForwarding = true
40         cfg.DisableAcceptRateLimiting = true
41         cfg.ListenPort = 0
42         //cfg.Debug = true
43         //cfg.Logger = cfg.Logger.WithText(func(m log.Msg) string {
44         //      t := m.Text()
45         //      m.Values(func(i interface{}) bool {
46         //              t += fmt.Sprintf("\n%[1]T: %[1]v", i)
47         //              return true
48         //      })
49         //      return t
50         //})
51         return cfg
52 }
53
54 func TestClientDefault(t *testing.T) {
55         cl, err := NewClient(TestingConfig())
56         require.NoError(t, err)
57         cl.Close()
58 }
59
60 func TestClientNilConfig(t *testing.T) {
61         cl, err := NewClient(nil)
62         require.NoError(t, err)
63         cl.Close()
64 }
65
66 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
67         cfg := TestingConfig()
68         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
69         require.NoError(t, err)
70         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
71         defer ci.Close()
72         cfg.DefaultStorage = ci
73         cl, err := NewClient(cfg)
74         require.NoError(t, err)
75         cl.Close()
76         // And again, https://github.com/anacrolix/torrent/issues/158
77         cl, err = NewClient(cfg)
78         require.NoError(t, err)
79         cl.Close()
80 }
81
82 func TestAddDropTorrent(t *testing.T) {
83         cl, err := NewClient(TestingConfig())
84         require.NoError(t, err)
85         defer cl.Close()
86         dir, mi := testutil.GreetingTestTorrent()
87         defer os.RemoveAll(dir)
88         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
89         require.NoError(t, err)
90         assert.True(t, new)
91         tt.SetMaxEstablishedConns(0)
92         tt.SetMaxEstablishedConns(1)
93         tt.Drop()
94 }
95
96 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
97         // TODO?
98         t.SkipNow()
99 }
100
101 func TestAddTorrentNoUsableURLs(t *testing.T) {
102         // TODO?
103         t.SkipNow()
104 }
105
106 func TestAddPeersToUnknownTorrent(t *testing.T) {
107         // TODO?
108         t.SkipNow()
109 }
110
111 func TestPieceHashSize(t *testing.T) {
112         assert.Equal(t, 20, pieceHash.Size())
113 }
114
115 func TestTorrentInitialState(t *testing.T) {
116         dir, mi := testutil.GreetingTestTorrent()
117         defer os.RemoveAll(dir)
118         cl := &Client{
119                 config: TestingConfig(),
120         }
121         cl.initLogger()
122         tor := cl.newTorrent(
123                 mi.HashInfoBytes(),
124                 storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
125         )
126         tor.setChunkSize(2)
127         tor.cl.lock()
128         err := tor.setInfoBytes(mi.InfoBytes)
129         tor.cl.unlock()
130         require.NoError(t, err)
131         require.Len(t, tor.pieces, 3)
132         tor.pendAllChunkSpecs(0)
133         tor.cl.lock()
134         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
135         tor.cl.unlock()
136         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
137 }
138
139 func TestReducedDialTimeout(t *testing.T) {
140         cfg := NewDefaultClientConfig()
141         for _, _case := range []struct {
142                 Max             time.Duration
143                 HalfOpenLimit   int
144                 PendingPeers    int
145                 ExpectedReduced time.Duration
146         }{
147                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
148                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
149                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
150                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
151                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
152                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
153         } {
154                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
155                 expected := _case.ExpectedReduced
156                 if expected < cfg.MinDialTimeout {
157                         expected = cfg.MinDialTimeout
158                 }
159                 if reduced != expected {
160                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
161                 }
162         }
163 }
164
165 func TestAddDropManyTorrents(t *testing.T) {
166         cl, err := NewClient(TestingConfig())
167         require.NoError(t, err)
168         defer cl.Close()
169         for i := range iter.N(1000) {
170                 var spec TorrentSpec
171                 binary.PutVarint(spec.InfoHash[:], int64(i))
172                 tt, new, err := cl.AddTorrentSpec(&spec)
173                 assert.NoError(t, err)
174                 assert.True(t, new)
175                 defer tt.Drop()
176         }
177 }
178
179 type FileCacheClientStorageFactoryParams struct {
180         Capacity    int64
181         SetCapacity bool
182         Wrapper     func(*filecache.Cache) storage.ClientImpl
183 }
184
185 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
186         return func(dataDir string) storage.ClientImpl {
187                 fc, err := filecache.NewCache(dataDir)
188                 if err != nil {
189                         panic(err)
190                 }
191                 if ps.SetCapacity {
192                         fc.SetCapacity(ps.Capacity)
193                 }
194                 return ps.Wrapper(fc)
195         }
196 }
197
198 type storageFactory func(string) storage.ClientImpl
199
200 func TestClientTransferDefault(t *testing.T) {
201         testClientTransfer(t, testClientTransferParams{
202                 ExportClientStatus: true,
203                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
204                         Wrapper: fileCachePieceResourceStorage,
205                 }),
206         })
207 }
208
209 func TestClientTransferRateLimitedUpload(t *testing.T) {
210         started := time.Now()
211         testClientTransfer(t, testClientTransferParams{
212                 // We are uploading 13 bytes (the length of the greeting torrent). The
213                 // chunks are 2 bytes in length. Then the smallest burst we can run
214                 // with is 2. Time taken is (13-burst)/rate.
215                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
216                 ExportClientStatus:      true,
217         })
218         require.True(t, time.Since(started) > time.Second)
219 }
220
221 func TestClientTransferRateLimitedDownload(t *testing.T) {
222         testClientTransfer(t, testClientTransferParams{
223                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
224         })
225 }
226
227 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
228         return storage.NewResourcePieces(fc.AsResourceProvider())
229 }
230
231 func TestClientTransferSmallCache(t *testing.T) {
232         testClientTransfer(t, testClientTransferParams{
233                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
234                         SetCapacity: true,
235                         // Going below the piece length means it can't complete a piece so
236                         // that it can be hashed.
237                         Capacity: 5,
238                         Wrapper:  fileCachePieceResourceStorage,
239                 }),
240                 SetReadahead: true,
241                 // Can't readahead too far or the cache will thrash and drop data we
242                 // thought we had.
243                 Readahead:          0,
244                 ExportClientStatus: true,
245         })
246 }
247
248 func TestClientTransferVarious(t *testing.T) {
249         // Leecher storage
250         for _, ls := range []storageFactory{
251                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
252                         Wrapper: fileCachePieceResourceStorage,
253                 }),
254                 storage.NewBoltDB,
255         } {
256                 // Seeder storage
257                 for _, ss := range []func(string) storage.ClientImpl{
258                         storage.NewFile,
259                         storage.NewMMap,
260                 } {
261                         for _, responsive := range []bool{false, true} {
262                                 testClientTransfer(t, testClientTransferParams{
263                                         Responsive:     responsive,
264                                         SeederStorage:  ss,
265                                         LeecherStorage: ls,
266                                 })
267                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
268                                         testClientTransfer(t, testClientTransferParams{
269                                                 SeederStorage:  ss,
270                                                 Responsive:     responsive,
271                                                 SetReadahead:   true,
272                                                 Readahead:      readahead,
273                                                 LeecherStorage: ls,
274                                         })
275                                 }
276                         }
277                 }
278         }
279 }
280
281 type testClientTransferParams struct {
282         Responsive                 bool
283         Readahead                  int64
284         SetReadahead               bool
285         ExportClientStatus         bool
286         LeecherStorage             func(string) storage.ClientImpl
287         SeederStorage              func(string) storage.ClientImpl
288         SeederUploadRateLimiter    *rate.Limiter
289         LeecherDownloadRateLimiter *rate.Limiter
290 }
291
292 func logPieceStateChanges(t *Torrent) {
293         sub := t.SubscribePieceStateChanges()
294         go func() {
295                 defer sub.Close()
296                 for e := range sub.Values {
297                         log.Printf("%p %#v", t, e)
298                 }
299         }()
300 }
301
302 // Creates a seeder and a leecher, and ensures the data transfers when a read
303 // is attempted on the leecher.
304 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
305         greetingTempDir, mi := testutil.GreetingTestTorrent()
306         defer os.RemoveAll(greetingTempDir)
307         // Create seeder and a Torrent.
308         cfg := TestingConfig()
309         cfg.Seed = true
310         if ps.SeederUploadRateLimiter != nil {
311                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
312         }
313         // cfg.ListenAddr = "localhost:4000"
314         if ps.SeederStorage != nil {
315                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
316                 defer cfg.DefaultStorage.Close()
317         } else {
318                 cfg.DataDir = greetingTempDir
319         }
320         seeder, err := NewClient(cfg)
321         require.NoError(t, err)
322         if ps.ExportClientStatus {
323                 defer testutil.ExportStatusWriter(seeder, "s")()
324         }
325         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
326         // Run a Stats right after Closing the Client. This will trigger the Stats
327         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
328         defer seederTorrent.Stats()
329         defer seeder.Close()
330         seederTorrent.VerifyData()
331         // Create leecher and a Torrent.
332         leecherDataDir, err := ioutil.TempDir("", "")
333         require.NoError(t, err)
334         defer os.RemoveAll(leecherDataDir)
335         cfg = TestingConfig()
336         if ps.LeecherStorage == nil {
337                 cfg.DataDir = leecherDataDir
338         } else {
339                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
340         }
341         if ps.LeecherDownloadRateLimiter != nil {
342                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
343         }
344         cfg.Seed = false
345         leecher, err := NewClient(cfg)
346         require.NoError(t, err)
347         defer leecher.Close()
348         if ps.ExportClientStatus {
349                 defer testutil.ExportStatusWriter(leecher, "l")()
350         }
351         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
352                 ret = TorrentSpecFromMetaInfo(mi)
353                 ret.ChunkSize = 2
354                 return
355         }())
356         require.NoError(t, err)
357         assert.True(t, new)
358
359         //// This was used when observing coalescing of piece state changes.
360         //logPieceStateChanges(leecherTorrent)
361
362         // Now do some things with leecher and seeder.
363         leecherTorrent.AddClientPeer(seeder)
364         // The Torrent should not be interested in obtaining peers, so the one we
365         // just added should be the only one.
366         assert.False(t, leecherTorrent.Seeding())
367         assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
368         r := leecherTorrent.NewReader()
369         defer r.Close()
370         if ps.Responsive {
371                 r.SetResponsive()
372         }
373         if ps.SetReadahead {
374                 r.SetReadahead(ps.Readahead)
375         }
376         assertReadAllGreeting(t, r)
377
378         seederStats := seederTorrent.Stats()
379         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
380         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
381
382         leecherStats := leecherTorrent.Stats()
383         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
384         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
385
386         // Try reading through again for the cases where the torrent data size
387         // exceeds the size of the cache.
388         assertReadAllGreeting(t, r)
389 }
390
391 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
392         pos, err := r.Seek(0, io.SeekStart)
393         assert.NoError(t, err)
394         assert.EqualValues(t, 0, pos)
395         _greeting, err := ioutil.ReadAll(r)
396         assert.NoError(t, err)
397         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
398 }
399
400 // Check that after completing leeching, a leecher transitions to a seeding
401 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
402 func TestSeedAfterDownloading(t *testing.T) {
403         greetingTempDir, mi := testutil.GreetingTestTorrent()
404         defer os.RemoveAll(greetingTempDir)
405
406         cfg := TestingConfig()
407         cfg.Seed = true
408         cfg.DataDir = greetingTempDir
409         seeder, err := NewClient(cfg)
410         require.NoError(t, err)
411         defer seeder.Close()
412         defer testutil.ExportStatusWriter(seeder, "s")()
413         seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
414         require.NoError(t, err)
415         assert.True(t, ok)
416         seederTorrent.VerifyData()
417
418         cfg = TestingConfig()
419         cfg.Seed = true
420         cfg.DataDir, err = ioutil.TempDir("", "")
421         require.NoError(t, err)
422         defer os.RemoveAll(cfg.DataDir)
423         leecher, err := NewClient(cfg)
424         require.NoError(t, err)
425         defer leecher.Close()
426         defer testutil.ExportStatusWriter(leecher, "l")()
427
428         cfg = TestingConfig()
429         cfg.Seed = false
430         cfg.DataDir, err = ioutil.TempDir("", "")
431         require.NoError(t, err)
432         defer os.RemoveAll(cfg.DataDir)
433         leecherLeecher, _ := NewClient(cfg)
434         require.NoError(t, err)
435         defer leecherLeecher.Close()
436         defer testutil.ExportStatusWriter(leecherLeecher, "ll")()
437         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
438                 ret = TorrentSpecFromMetaInfo(mi)
439                 ret.ChunkSize = 2
440                 return
441         }())
442         require.NoError(t, err)
443         assert.True(t, ok)
444         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
445                 ret = TorrentSpecFromMetaInfo(mi)
446                 ret.ChunkSize = 3
447                 return
448         }())
449         require.NoError(t, err)
450         assert.True(t, ok)
451         // Simultaneously DownloadAll in Leecher, and read the contents
452         // consecutively in LeecherLeecher. This non-deterministically triggered a
453         // case where the leecher wouldn't unchoke the LeecherLeecher.
454         var wg sync.WaitGroup
455         wg.Add(1)
456         go func() {
457                 defer wg.Done()
458                 r := llg.NewReader()
459                 defer r.Close()
460                 b, err := ioutil.ReadAll(r)
461                 require.NoError(t, err)
462                 assert.EqualValues(t, testutil.GreetingFileContents, b)
463         }()
464         done := make(chan struct{})
465         defer close(done)
466         go leecherGreeting.AddClientPeer(seeder)
467         go leecherGreeting.AddClientPeer(leecherLeecher)
468         wg.Add(1)
469         go func() {
470                 defer wg.Done()
471                 leecherGreeting.DownloadAll()
472                 leecher.WaitAll()
473         }()
474         wg.Wait()
475 }
476
477 func TestMergingTrackersByAddingSpecs(t *testing.T) {
478         cl, err := NewClient(TestingConfig())
479         require.NoError(t, err)
480         defer cl.Close()
481         spec := TorrentSpec{}
482         T, new, _ := cl.AddTorrentSpec(&spec)
483         if !new {
484                 t.FailNow()
485         }
486         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
487         _, new, _ = cl.AddTorrentSpec(&spec)
488         assert.False(t, new)
489         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
490         // Because trackers are disabled in TestingConfig.
491         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
492 }
493
494 // We read from a piece which is marked completed, but is missing data.
495 func TestCompletedPieceWrongSize(t *testing.T) {
496         cfg := TestingConfig()
497         cfg.DefaultStorage = badStorage{}
498         cl, err := NewClient(cfg)
499         require.NoError(t, err)
500         defer cl.Close()
501         info := metainfo.Info{
502                 PieceLength: 15,
503                 Pieces:      make([]byte, 20),
504                 Files: []metainfo.FileInfo{
505                         {Path: []string{"greeting"}, Length: 13},
506                 },
507         }
508         b, err := bencode.Marshal(info)
509         require.NoError(t, err)
510         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
511                 InfoBytes: b,
512                 InfoHash:  metainfo.HashBytes(b),
513         })
514         require.NoError(t, err)
515         defer tt.Drop()
516         assert.True(t, new)
517         r := tt.NewReader()
518         defer r.Close()
519         b, err = ioutil.ReadAll(r)
520         assert.Len(t, b, 13)
521         assert.NoError(t, err)
522 }
523
524 func BenchmarkAddLargeTorrent(b *testing.B) {
525         cfg := TestingConfig()
526         cfg.DisableTCP = true
527         cfg.DisableUTP = true
528         cl, err := NewClient(cfg)
529         require.NoError(b, err)
530         defer cl.Close()
531         b.ReportAllocs()
532         for range iter.N(b.N) {
533                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
534                 if err != nil {
535                         b.Fatal(err)
536                 }
537                 t.Drop()
538         }
539 }
540
541 func TestResponsive(t *testing.T) {
542         seederDataDir, mi := testutil.GreetingTestTorrent()
543         defer os.RemoveAll(seederDataDir)
544         cfg := TestingConfig()
545         cfg.Seed = true
546         cfg.DataDir = seederDataDir
547         seeder, err := NewClient(cfg)
548         require.Nil(t, err)
549         defer seeder.Close()
550         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
551         seederTorrent.VerifyData()
552         leecherDataDir, err := ioutil.TempDir("", "")
553         require.Nil(t, err)
554         defer os.RemoveAll(leecherDataDir)
555         cfg = TestingConfig()
556         cfg.DataDir = leecherDataDir
557         leecher, err := NewClient(cfg)
558         require.Nil(t, err)
559         defer leecher.Close()
560         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
561                 ret = TorrentSpecFromMetaInfo(mi)
562                 ret.ChunkSize = 2
563                 return
564         }())
565         leecherTorrent.AddClientPeer(seeder)
566         reader := leecherTorrent.NewReader()
567         defer reader.Close()
568         reader.SetReadahead(0)
569         reader.SetResponsive()
570         b := make([]byte, 2)
571         _, err = reader.Seek(3, io.SeekStart)
572         require.NoError(t, err)
573         _, err = io.ReadFull(reader, b)
574         assert.Nil(t, err)
575         assert.EqualValues(t, "lo", string(b))
576         _, err = reader.Seek(11, io.SeekStart)
577         require.NoError(t, err)
578         n, err := io.ReadFull(reader, b)
579         assert.Nil(t, err)
580         assert.EqualValues(t, 2, n)
581         assert.EqualValues(t, "d\n", string(b))
582 }
583
584 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
585         seederDataDir, mi := testutil.GreetingTestTorrent()
586         defer os.RemoveAll(seederDataDir)
587         cfg := TestingConfig()
588         cfg.Seed = true
589         cfg.DataDir = seederDataDir
590         seeder, err := NewClient(cfg)
591         require.Nil(t, err)
592         defer seeder.Close()
593         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
594         seederTorrent.VerifyData()
595         leecherDataDir, err := ioutil.TempDir("", "")
596         require.Nil(t, err)
597         defer os.RemoveAll(leecherDataDir)
598         cfg = TestingConfig()
599         cfg.DataDir = leecherDataDir
600         leecher, err := NewClient(cfg)
601         require.Nil(t, err)
602         defer leecher.Close()
603         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
604                 ret = TorrentSpecFromMetaInfo(mi)
605                 ret.ChunkSize = 2
606                 return
607         }())
608         leecherTorrent.AddClientPeer(seeder)
609         reader := leecherTorrent.NewReader()
610         defer reader.Close()
611         reader.SetReadahead(0)
612         reader.SetResponsive()
613         b := make([]byte, 2)
614         _, err = reader.Seek(3, io.SeekStart)
615         require.NoError(t, err)
616         _, err = io.ReadFull(reader, b)
617         assert.Nil(t, err)
618         assert.EqualValues(t, "lo", string(b))
619         go leecherTorrent.Drop()
620         _, err = reader.Seek(11, io.SeekStart)
621         require.NoError(t, err)
622         n, err := reader.Read(b)
623         assert.EqualError(t, err, "torrent closed")
624         assert.EqualValues(t, 0, n)
625 }
626
627 func TestDHTInheritBlocklist(t *testing.T) {
628         ipl := iplist.New(nil)
629         require.NotNil(t, ipl)
630         cfg := TestingConfig()
631         cfg.IPBlocklist = ipl
632         cfg.NoDHT = false
633         cl, err := NewClient(cfg)
634         require.NoError(t, err)
635         defer cl.Close()
636         numServers := 0
637         cl.eachDhtServer(func(s *dht.Server) {
638                 assert.Equal(t, ipl, s.IPBlocklist())
639                 numServers++
640         })
641         assert.EqualValues(t, 2, numServers)
642 }
643
644 // Check that stuff is merged in subsequent AddTorrentSpec for the same
645 // infohash.
646 func TestAddTorrentSpecMerging(t *testing.T) {
647         cl, err := NewClient(TestingConfig())
648         require.NoError(t, err)
649         defer cl.Close()
650         dir, mi := testutil.GreetingTestTorrent()
651         defer os.RemoveAll(dir)
652         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
653                 InfoHash: mi.HashInfoBytes(),
654         })
655         require.NoError(t, err)
656         require.True(t, new)
657         require.Nil(t, tt.Info())
658         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
659         require.NoError(t, err)
660         require.False(t, new)
661         require.NotNil(t, tt.Info())
662 }
663
664 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
665         dir, mi := testutil.GreetingTestTorrent()
666         os.RemoveAll(dir)
667         cl, _ := NewClient(TestingConfig())
668         defer cl.Close()
669         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
670                 InfoHash: mi.HashInfoBytes(),
671         })
672         tt.Drop()
673         assert.EqualValues(t, 0, len(cl.Torrents()))
674         select {
675         case <-tt.GotInfo():
676                 t.FailNow()
677         default:
678         }
679 }
680
681 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
682         for i := range iter.N(info.NumPieces()) {
683                 p := info.Piece(i)
684                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
685         }
686 }
687
688 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
689         fileCacheDir, err := ioutil.TempDir("", "")
690         require.NoError(t, err)
691         defer os.RemoveAll(fileCacheDir)
692         fileCache, err := filecache.NewCache(fileCacheDir)
693         require.NoError(t, err)
694         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
695         defer os.RemoveAll(greetingDataTempDir)
696         filePieceStore := csf(fileCache)
697         defer filePieceStore.Close()
698         info, err := greetingMetainfo.UnmarshalInfo()
699         require.NoError(t, err)
700         ih := greetingMetainfo.HashInfoBytes()
701         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
702         require.NoError(t, err)
703         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
704         // require.Equal(t, len(testutil.GreetingFileContents), written)
705         // require.NoError(t, err)
706         for i := 0; i < info.NumPieces(); i++ {
707                 p := info.Piece(i)
708                 if alreadyCompleted {
709                         require.NoError(t, greetingData.Piece(p).MarkComplete())
710                 }
711         }
712         cfg := TestingConfig()
713         // TODO: Disable network option?
714         cfg.DisableTCP = true
715         cfg.DisableUTP = true
716         cfg.DefaultStorage = filePieceStore
717         cl, err := NewClient(cfg)
718         require.NoError(t, err)
719         defer cl.Close()
720         tt, err := cl.AddTorrent(greetingMetainfo)
721         require.NoError(t, err)
722         psrs := tt.PieceStateRuns()
723         assert.Len(t, psrs, 1)
724         assert.EqualValues(t, 3, psrs[0].Length)
725         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
726         if alreadyCompleted {
727                 r := tt.NewReader()
728                 b, err := ioutil.ReadAll(r)
729                 assert.NoError(t, err)
730                 assert.EqualValues(t, testutil.GreetingFileContents, b)
731         }
732 }
733
734 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
735         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
736 }
737
738 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
739         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
740 }
741
742 func TestAddMetainfoWithNodes(t *testing.T) {
743         cfg := TestingConfig()
744         cfg.ListenHost = func(string) string { return "" }
745         cfg.NoDHT = false
746         cfg.DhtStartingNodes = func() ([]dht.Addr, error) { return nil, nil }
747         // For now, we want to just jam the nodes into the table, without
748         // verifying them first. Also the DHT code doesn't support mixing secure
749         // and insecure nodes if security is enabled (yet).
750         // cfg.DHTConfig.NoSecurity = true
751         cl, err := NewClient(cfg)
752         require.NoError(t, err)
753         defer cl.Close()
754         sum := func() (ret int64) {
755                 cl.eachDhtServer(func(s *dht.Server) {
756                         ret += s.Stats().OutboundQueriesAttempted
757                 })
758                 return
759         }
760         assert.EqualValues(t, 0, sum())
761         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
762         require.NoError(t, err)
763         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
764         // check if the announce-list is here instead. TODO: Add nodes.
765         assert.Len(t, tt.metainfo.AnnounceList, 5)
766         // There are 6 nodes in the torrent file.
767         for sum() != int64(6*len(cl.dhtServers)) {
768                 time.Sleep(time.Millisecond)
769         }
770 }
771
772 type testDownloadCancelParams struct {
773         SetLeecherStorageCapacity bool
774         LeecherStorageCapacity    int64
775         Cancel                    bool
776 }
777
778 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
779         greetingTempDir, mi := testutil.GreetingTestTorrent()
780         defer os.RemoveAll(greetingTempDir)
781         cfg := TestingConfig()
782         cfg.Seed = true
783         cfg.DataDir = greetingTempDir
784         seeder, err := NewClient(cfg)
785         require.NoError(t, err)
786         defer seeder.Close()
787         defer testutil.ExportStatusWriter(seeder, "s")()
788         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
789         seederTorrent.VerifyData()
790         leecherDataDir, err := ioutil.TempDir("", "")
791         require.NoError(t, err)
792         defer os.RemoveAll(leecherDataDir)
793         fc, err := filecache.NewCache(leecherDataDir)
794         require.NoError(t, err)
795         if ps.SetLeecherStorageCapacity {
796                 fc.SetCapacity(ps.LeecherStorageCapacity)
797         }
798         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
799         cfg.DataDir = leecherDataDir
800         leecher, err := NewClient(cfg)
801         require.NoError(t, err)
802         defer leecher.Close()
803         defer testutil.ExportStatusWriter(leecher, "l")()
804         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
805                 ret = TorrentSpecFromMetaInfo(mi)
806                 ret.ChunkSize = 2
807                 return
808         }())
809         require.NoError(t, err)
810         assert.True(t, new)
811         psc := leecherGreeting.SubscribePieceStateChanges()
812         defer psc.Close()
813
814         leecherGreeting.cl.lock()
815         leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
816         if ps.Cancel {
817                 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
818         }
819         leecherGreeting.cl.unlock()
820         done := make(chan struct{})
821         defer close(done)
822         go leecherGreeting.AddClientPeer(seeder)
823         completes := make(map[int]bool, 3)
824         expected := func() map[int]bool {
825                 if ps.Cancel {
826                         return map[int]bool{0: false, 1: false, 2: false}
827                 } else {
828                         return map[int]bool{0: true, 1: true, 2: true}
829                 }
830         }()
831         for !reflect.DeepEqual(completes, expected) {
832                 _v := <-psc.Values
833                 v := _v.(PieceStateChange)
834                 completes[v.Index] = v.Complete
835         }
836 }
837
838 func TestTorrentDownloadAll(t *testing.T) {
839         testDownloadCancel(t, testDownloadCancelParams{})
840 }
841
842 func TestTorrentDownloadAllThenCancel(t *testing.T) {
843         testDownloadCancel(t, testDownloadCancelParams{
844                 Cancel: true,
845         })
846 }
847
848 // Ensure that it's an error for a peer to send an invalid have message.
849 func TestPeerInvalidHave(t *testing.T) {
850         cl, err := NewClient(TestingConfig())
851         require.NoError(t, err)
852         defer cl.Close()
853         info := metainfo.Info{
854                 PieceLength: 1,
855                 Pieces:      make([]byte, 20),
856                 Files:       []metainfo.FileInfo{{Length: 1}},
857         }
858         infoBytes, err := bencode.Marshal(info)
859         require.NoError(t, err)
860         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
861                 InfoBytes: infoBytes,
862                 InfoHash:  metainfo.HashBytes(infoBytes),
863                 Storage:   badStorage{},
864         })
865         require.NoError(t, err)
866         assert.True(t, _new)
867         defer tt.Drop()
868         cn := &connection{
869                 t: tt,
870         }
871         assert.NoError(t, cn.peerSentHave(0))
872         assert.Error(t, cn.peerSentHave(1))
873 }
874
875 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
876         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
877         defer os.RemoveAll(greetingTempDir)
878         cfg := TestingConfig()
879         cfg.DataDir = greetingTempDir
880         seeder, err := NewClient(TestingConfig())
881         require.NoError(t, err)
882         seeder.AddTorrentSpec(&TorrentSpec{
883                 InfoBytes: greetingMetainfo.InfoBytes,
884         })
885 }
886
887 // Check that when the listen port is 0, all the protocols listened on have
888 // the same port, and it isn't zero.
889 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
890         cl, err := NewClient(TestingConfig())
891         require.NoError(t, err)
892         defer cl.Close()
893         port := cl.LocalPort()
894         assert.NotEqual(t, 0, port)
895         cl.eachListener(func(s socket) bool {
896                 assert.Equal(t, port, missinggo.AddrPort(s.Addr()))
897                 return true
898         })
899 }
900
901 func TestClientDynamicListenTCPOnly(t *testing.T) {
902         cfg := TestingConfig()
903         cfg.DisableUTP = true
904         cfg.DisableTCP = false
905         cl, err := NewClient(cfg)
906         require.NoError(t, err)
907         defer cl.Close()
908         assert.NotEqual(t, 0, cl.LocalPort())
909 }
910
911 func TestClientDynamicListenUTPOnly(t *testing.T) {
912         cfg := TestingConfig()
913         cfg.DisableTCP = true
914         cfg.DisableUTP = false
915         cl, err := NewClient(cfg)
916         require.NoError(t, err)
917         defer cl.Close()
918         assert.NotEqual(t, 0, cl.LocalPort())
919 }
920
921 func totalConns(tts []*Torrent) (ret int) {
922         for _, tt := range tts {
923                 tt.cl.lock()
924                 ret += len(tt.conns)
925                 tt.cl.unlock()
926         }
927         return
928 }
929
930 func TestSetMaxEstablishedConn(t *testing.T) {
931         var tts []*Torrent
932         ih := testutil.GreetingMetaInfo().HashInfoBytes()
933         cfg := TestingConfig()
934         cfg.DisableAcceptRateLimiting = true
935         cfg.dropDuplicatePeerIds = true
936         for i := range iter.N(3) {
937                 cl, err := NewClient(cfg)
938                 require.NoError(t, err)
939                 defer cl.Close()
940                 tt, _ := cl.AddTorrentInfoHash(ih)
941                 tt.SetMaxEstablishedConns(2)
942                 defer testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))()
943                 tts = append(tts, tt)
944         }
945         addPeers := func() {
946                 for _, tt := range tts {
947                         for _, _tt := range tts {
948                                 // if tt != _tt {
949                                 tt.AddClientPeer(_tt.cl)
950                                 // }
951                         }
952                 }
953         }
954         waitTotalConns := func(num int) {
955                 for totalConns(tts) != num {
956                         addPeers()
957                         time.Sleep(time.Millisecond)
958                 }
959         }
960         addPeers()
961         waitTotalConns(6)
962         tts[0].SetMaxEstablishedConns(1)
963         waitTotalConns(4)
964         tts[0].SetMaxEstablishedConns(0)
965         waitTotalConns(2)
966         tts[0].SetMaxEstablishedConns(1)
967         addPeers()
968         waitTotalConns(4)
969         tts[0].SetMaxEstablishedConns(2)
970         addPeers()
971         waitTotalConns(6)
972 }
973
974 // Creates a file containing its own name as data. Make a metainfo from that, adds it to the given
975 // client, and returns a magnet link.
976 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
977         os.MkdirAll(dir, 0770)
978         file, err := os.Create(filepath.Join(dir, name))
979         require.NoError(t, err)
980         file.Write([]byte(name))
981         file.Close()
982         mi := metainfo.MetaInfo{}
983         mi.SetDefaults()
984         info := metainfo.Info{PieceLength: 256 * 1024}
985         err = info.BuildFromFilePath(filepath.Join(dir, name))
986         require.NoError(t, err)
987         mi.InfoBytes, err = bencode.Marshal(info)
988         require.NoError(t, err)
989         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
990         tr, err := cl.AddTorrent(&mi)
991         require.NoError(t, err)
992         require.True(t, tr.Seeding())
993         tr.VerifyData()
994         return magnet
995 }
996
997 // https://github.com/anacrolix/torrent/issues/114
998 func TestMultipleTorrentsWithEncryption(t *testing.T) {
999         testSeederLeecherPair(
1000                 t,
1001                 func(cfg *ClientConfig) {
1002                         cfg.HeaderObfuscationPolicy.Preferred = true
1003                         cfg.HeaderObfuscationPolicy.RequirePreferred = true
1004                 },
1005                 func(cfg *ClientConfig) {
1006                         cfg.HeaderObfuscationPolicy.RequirePreferred = false
1007                 },
1008         )
1009 }
1010
1011 // Test that the leecher can download a torrent in its entirety from the seeder. Note that the
1012 // seeder config is done first.
1013 func testSeederLeecherPair(t *testing.T, seeder func(*ClientConfig), leecher func(*ClientConfig)) {
1014         cfg := TestingConfig()
1015         cfg.Seed = true
1016         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1017         os.Mkdir(cfg.DataDir, 0755)
1018         seeder(cfg)
1019         server, err := NewClient(cfg)
1020         require.NoError(t, err)
1021         defer server.Close()
1022         defer testutil.ExportStatusWriter(server, "s")()
1023         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1024         // Extra torrents are added to test the seeder having to match incoming obfuscated headers
1025         // against more than one torrent. See issue #114
1026         makeMagnet(t, server, cfg.DataDir, "test2")
1027         for i := 0; i < 100; i++ {
1028                 makeMagnet(t, server, cfg.DataDir, fmt.Sprintf("test%d", i+2))
1029         }
1030         cfg = TestingConfig()
1031         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1032         leecher(cfg)
1033         client, err := NewClient(cfg)
1034         require.NoError(t, err)
1035         defer client.Close()
1036         defer testutil.ExportStatusWriter(client, "c")()
1037         tr, err := client.AddMagnet(magnet1)
1038         require.NoError(t, err)
1039         tr.AddClientPeer(server)
1040         <-tr.GotInfo()
1041         tr.DownloadAll()
1042         client.WaitAll()
1043 }
1044
1045 // This appears to be the situation with the S3 BitTorrent client.
1046 func TestObfuscatedHeaderFallbackSeederDisallowsLeecherPrefers(t *testing.T) {
1047         // Leecher prefers obfuscation, but the seeder does not allow it.
1048         testSeederLeecherPair(
1049                 t,
1050                 func(cfg *ClientConfig) {
1051                         cfg.HeaderObfuscationPolicy.Preferred = false
1052                         cfg.HeaderObfuscationPolicy.RequirePreferred = true
1053                 },
1054                 func(cfg *ClientConfig) {
1055                         cfg.HeaderObfuscationPolicy.Preferred = true
1056                         cfg.HeaderObfuscationPolicy.RequirePreferred = false
1057                 },
1058         )
1059 }
1060
1061 func TestObfuscatedHeaderFallbackSeederRequiresLeecherPrefersNot(t *testing.T) {
1062         // Leecher prefers no obfuscation, but the seeder enforces it.
1063         testSeederLeecherPair(
1064                 t,
1065                 func(cfg *ClientConfig) {
1066                         cfg.HeaderObfuscationPolicy.Preferred = true
1067                         cfg.HeaderObfuscationPolicy.RequirePreferred = true
1068                 },
1069                 func(cfg *ClientConfig) {
1070                         cfg.HeaderObfuscationPolicy.Preferred = false
1071                         cfg.HeaderObfuscationPolicy.RequirePreferred = false
1072                 },
1073         )
1074 }
1075
1076 func TestClientAddressInUse(t *testing.T) {
1077         s, _ := NewUtpSocket("udp", ":50007", nil)
1078         if s != nil {
1079                 defer s.Close()
1080         }
1081         cfg := TestingConfig().SetListenAddr(":50007")
1082         cl, err := NewClient(cfg)
1083         require.Error(t, err)
1084         require.Nil(t, cl)
1085 }
1086
1087 func TestClientHasDhtServersWhenUtpDisabled(t *testing.T) {
1088         cc := TestingConfig()
1089         cc.DisableUTP = true
1090         cc.NoDHT = false
1091         cl, err := NewClient(cc)
1092         require.NoError(t, err)
1093         defer cl.Close()
1094         assert.NotEmpty(t, cl.DhtServers())
1095 }
1096
1097 func TestIssue335(t *testing.T) {
1098         dir, mi := testutil.GreetingTestTorrent()
1099         defer os.RemoveAll(dir)
1100         cfg := TestingConfig()
1101         cfg.Seed = false
1102         cfg.Debug = true
1103         cfg.DataDir = dir
1104         comp, err := storage.NewBoltPieceCompletion(dir)
1105         require.NoError(t, err)
1106         defer comp.Close()
1107         cfg.DefaultStorage = storage.NewMMapWithCompletion(dir, comp)
1108         cl, err := NewClient(cfg)
1109         require.NoError(t, err)
1110         defer cl.Close()
1111         tor, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1112         require.NoError(t, err)
1113         assert.True(t, new)
1114         require.True(t, cl.WaitAll())
1115         tor.Drop()
1116         tor, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1117         require.NoError(t, err)
1118         assert.True(t, new)
1119         require.True(t, cl.WaitAll())
1120 }