]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Add an error check in a test
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "os"
9         "path/filepath"
10         "reflect"
11         "sync"
12         "testing"
13         "time"
14
15         "github.com/bradfitz/iter"
16         "github.com/stretchr/testify/assert"
17         "github.com/stretchr/testify/require"
18         "golang.org/x/time/rate"
19
20         "github.com/anacrolix/dht"
21         _ "github.com/anacrolix/envpprof"
22         "github.com/anacrolix/missinggo"
23         "github.com/anacrolix/missinggo/filecache"
24         "github.com/anacrolix/torrent/bencode"
25         "github.com/anacrolix/torrent/internal/testutil"
26         "github.com/anacrolix/torrent/iplist"
27         "github.com/anacrolix/torrent/metainfo"
28         "github.com/anacrolix/torrent/storage"
29 )
30
31 func TestingConfig() *ClientConfig {
32         cfg := NewDefaultClientConfig()
33         cfg.ListenHost = LoopbackListenHost
34         cfg.NoDHT = true
35         cfg.DataDir = tempDir()
36         cfg.DisableTrackers = true
37         cfg.NoDefaultPortForwarding = true
38         cfg.DisableAcceptRateLimiting = true
39         return cfg
40 }
41
42 func TestClientDefault(t *testing.T) {
43         cl, err := NewClient(TestingConfig())
44         require.NoError(t, err)
45         cl.Close()
46 }
47
48 func TestClientNilConfig(t *testing.T) {
49         cl, err := NewClient(nil)
50         require.NoError(t, err)
51         cl.Close()
52 }
53
54 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
55         cfg := TestingConfig()
56         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
57         require.NoError(t, err)
58         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
59         defer ci.Close()
60         cfg.DefaultStorage = ci
61         cl, err := NewClient(cfg)
62         require.NoError(t, err)
63         cl.Close()
64         // And again, https://github.com/anacrolix/torrent/issues/158
65         cl, err = NewClient(cfg)
66         require.NoError(t, err)
67         cl.Close()
68 }
69
70 func TestAddDropTorrent(t *testing.T) {
71         cl, err := NewClient(TestingConfig())
72         require.NoError(t, err)
73         defer cl.Close()
74         dir, mi := testutil.GreetingTestTorrent()
75         defer os.RemoveAll(dir)
76         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
77         require.NoError(t, err)
78         assert.True(t, new)
79         tt.SetMaxEstablishedConns(0)
80         tt.SetMaxEstablishedConns(1)
81         tt.Drop()
82 }
83
84 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
85         // TODO?
86         t.SkipNow()
87 }
88
89 func TestAddTorrentNoUsableURLs(t *testing.T) {
90         // TODO?
91         t.SkipNow()
92 }
93
94 func TestAddPeersToUnknownTorrent(t *testing.T) {
95         // TODO?
96         t.SkipNow()
97 }
98
99 func TestPieceHashSize(t *testing.T) {
100         assert.Equal(t, 20, pieceHash.Size())
101 }
102
103 func TestTorrentInitialState(t *testing.T) {
104         dir, mi := testutil.GreetingTestTorrent()
105         defer os.RemoveAll(dir)
106         cl := &Client{
107                 config: &ClientConfig{},
108         }
109         cl.initLogger()
110         tor := cl.newTorrent(
111                 mi.HashInfoBytes(),
112                 storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
113         )
114         tor.setChunkSize(2)
115         tor.cl.lock()
116         err := tor.setInfoBytes(mi.InfoBytes)
117         tor.cl.unlock()
118         require.NoError(t, err)
119         require.Len(t, tor.pieces, 3)
120         tor.pendAllChunkSpecs(0)
121         tor.cl.lock()
122         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
123         tor.cl.unlock()
124         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
125 }
126
127 func TestReducedDialTimeout(t *testing.T) {
128         cfg := NewDefaultClientConfig()
129         for _, _case := range []struct {
130                 Max             time.Duration
131                 HalfOpenLimit   int
132                 PendingPeers    int
133                 ExpectedReduced time.Duration
134         }{
135                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
136                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
137                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
138                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
139                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
140                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
141         } {
142                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
143                 expected := _case.ExpectedReduced
144                 if expected < cfg.MinDialTimeout {
145                         expected = cfg.MinDialTimeout
146                 }
147                 if reduced != expected {
148                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
149                 }
150         }
151 }
152
153 func TestAddDropManyTorrents(t *testing.T) {
154         cl, err := NewClient(TestingConfig())
155         require.NoError(t, err)
156         defer cl.Close()
157         for i := range iter.N(1000) {
158                 var spec TorrentSpec
159                 binary.PutVarint(spec.InfoHash[:], int64(i))
160                 tt, new, err := cl.AddTorrentSpec(&spec)
161                 assert.NoError(t, err)
162                 assert.True(t, new)
163                 defer tt.Drop()
164         }
165 }
166
167 type FileCacheClientStorageFactoryParams struct {
168         Capacity    int64
169         SetCapacity bool
170         Wrapper     func(*filecache.Cache) storage.ClientImpl
171 }
172
173 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
174         return func(dataDir string) storage.ClientImpl {
175                 fc, err := filecache.NewCache(dataDir)
176                 if err != nil {
177                         panic(err)
178                 }
179                 if ps.SetCapacity {
180                         fc.SetCapacity(ps.Capacity)
181                 }
182                 return ps.Wrapper(fc)
183         }
184 }
185
186 type storageFactory func(string) storage.ClientImpl
187
188 func TestClientTransferDefault(t *testing.T) {
189         testClientTransfer(t, testClientTransferParams{
190                 ExportClientStatus: true,
191                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
192                         Wrapper: fileCachePieceResourceStorage,
193                 }),
194         })
195 }
196
197 func TestClientTransferRateLimitedUpload(t *testing.T) {
198         started := time.Now()
199         testClientTransfer(t, testClientTransferParams{
200                 // We are uploading 13 bytes (the length of the greeting torrent). The
201                 // chunks are 2 bytes in length. Then the smallest burst we can run
202                 // with is 2. Time taken is (13-burst)/rate.
203                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
204                 ExportClientStatus:      true,
205         })
206         require.True(t, time.Since(started) > time.Second)
207 }
208
209 func TestClientTransferRateLimitedDownload(t *testing.T) {
210         testClientTransfer(t, testClientTransferParams{
211                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
212         })
213 }
214
215 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
216         return storage.NewResourcePieces(fc.AsResourceProvider())
217 }
218
219 func TestClientTransferSmallCache(t *testing.T) {
220         testClientTransfer(t, testClientTransferParams{
221                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
222                         SetCapacity: true,
223                         // Going below the piece length means it can't complete a piece so
224                         // that it can be hashed.
225                         Capacity: 5,
226                         Wrapper:  fileCachePieceResourceStorage,
227                 }),
228                 SetReadahead: true,
229                 // Can't readahead too far or the cache will thrash and drop data we
230                 // thought we had.
231                 Readahead:          0,
232                 ExportClientStatus: true,
233         })
234 }
235
236 func TestClientTransferVarious(t *testing.T) {
237         // Leecher storage
238         for _, ls := range []storageFactory{
239                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
240                         Wrapper: fileCachePieceResourceStorage,
241                 }),
242                 storage.NewBoltDB,
243         } {
244                 // Seeder storage
245                 for _, ss := range []func(string) storage.ClientImpl{
246                         storage.NewFile,
247                         storage.NewMMap,
248                 } {
249                         for _, responsive := range []bool{false, true} {
250                                 testClientTransfer(t, testClientTransferParams{
251                                         Responsive:     responsive,
252                                         SeederStorage:  ss,
253                                         LeecherStorage: ls,
254                                 })
255                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
256                                         testClientTransfer(t, testClientTransferParams{
257                                                 SeederStorage:  ss,
258                                                 Responsive:     responsive,
259                                                 SetReadahead:   true,
260                                                 Readahead:      readahead,
261                                                 LeecherStorage: ls,
262                                         })
263                                 }
264                         }
265                 }
266         }
267 }
268
269 type testClientTransferParams struct {
270         Responsive                 bool
271         Readahead                  int64
272         SetReadahead               bool
273         ExportClientStatus         bool
274         LeecherStorage             func(string) storage.ClientImpl
275         SeederStorage              func(string) storage.ClientImpl
276         SeederUploadRateLimiter    *rate.Limiter
277         LeecherDownloadRateLimiter *rate.Limiter
278 }
279
280 // Creates a seeder and a leecher, and ensures the data transfers when a read
281 // is attempted on the leecher.
282 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
283         greetingTempDir, mi := testutil.GreetingTestTorrent()
284         defer os.RemoveAll(greetingTempDir)
285         // Create seeder and a Torrent.
286         cfg := TestingConfig()
287         cfg.Seed = true
288         if ps.SeederUploadRateLimiter != nil {
289                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
290         }
291         // cfg.ListenAddr = "localhost:4000"
292         if ps.SeederStorage != nil {
293                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
294                 defer cfg.DefaultStorage.Close()
295         } else {
296                 cfg.DataDir = greetingTempDir
297         }
298         seeder, err := NewClient(cfg)
299         require.NoError(t, err)
300         if ps.ExportClientStatus {
301                 defer testutil.ExportStatusWriter(seeder, "s")()
302         }
303         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
304         // Run a Stats right after Closing the Client. This will trigger the Stats
305         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
306         defer seederTorrent.Stats()
307         defer seeder.Close()
308         seederTorrent.VerifyData()
309         // Create leecher and a Torrent.
310         leecherDataDir, err := ioutil.TempDir("", "")
311         require.NoError(t, err)
312         defer os.RemoveAll(leecherDataDir)
313         cfg = TestingConfig()
314         if ps.LeecherStorage == nil {
315                 cfg.DataDir = leecherDataDir
316         } else {
317                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
318         }
319         if ps.LeecherDownloadRateLimiter != nil {
320                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
321         }
322         cfg.Seed = false
323         leecher, err := NewClient(cfg)
324         require.NoError(t, err)
325         defer leecher.Close()
326         if ps.ExportClientStatus {
327                 defer testutil.ExportStatusWriter(leecher, "l")()
328         }
329         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
330                 ret = TorrentSpecFromMetaInfo(mi)
331                 ret.ChunkSize = 2
332                 return
333         }())
334         require.NoError(t, err)
335         assert.True(t, new)
336         // Now do some things with leecher and seeder.
337         leecherTorrent.AddClientPeer(seeder)
338         // The Torrent should not be interested in obtaining peers, so the one we
339         // just added should be the only one.
340         assert.False(t, leecherTorrent.Seeding())
341         assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
342         r := leecherTorrent.NewReader()
343         defer r.Close()
344         if ps.Responsive {
345                 r.SetResponsive()
346         }
347         if ps.SetReadahead {
348                 r.SetReadahead(ps.Readahead)
349         }
350         assertReadAllGreeting(t, r)
351
352         seederStats := seederTorrent.Stats()
353         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
354         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
355
356         leecherStats := leecherTorrent.Stats()
357         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
358         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
359
360         // Try reading through again for the cases where the torrent data size
361         // exceeds the size of the cache.
362         assertReadAllGreeting(t, r)
363 }
364
365 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
366         pos, err := r.Seek(0, io.SeekStart)
367         assert.NoError(t, err)
368         assert.EqualValues(t, 0, pos)
369         _greeting, err := ioutil.ReadAll(r)
370         assert.NoError(t, err)
371         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
372 }
373
374 // Check that after completing leeching, a leecher transitions to a seeding
375 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
376 func TestSeedAfterDownloading(t *testing.T) {
377         greetingTempDir, mi := testutil.GreetingTestTorrent()
378         defer os.RemoveAll(greetingTempDir)
379
380         cfg := TestingConfig()
381         cfg.Seed = true
382         cfg.DataDir = greetingTempDir
383         seeder, err := NewClient(cfg)
384         require.NoError(t, err)
385         defer seeder.Close()
386         defer testutil.ExportStatusWriter(seeder, "s")()
387         seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
388         require.NoError(t, err)
389         assert.True(t, ok)
390         seederTorrent.VerifyData()
391
392         cfg = TestingConfig()
393         cfg.Seed = true
394         cfg.DataDir, err = ioutil.TempDir("", "")
395         require.NoError(t, err)
396         defer os.RemoveAll(cfg.DataDir)
397         leecher, err := NewClient(cfg)
398         require.NoError(t, err)
399         defer leecher.Close()
400         defer testutil.ExportStatusWriter(leecher, "l")()
401
402         cfg = TestingConfig()
403         cfg.Seed = false
404         cfg.DataDir, err = ioutil.TempDir("", "")
405         require.NoError(t, err)
406         defer os.RemoveAll(cfg.DataDir)
407         leecherLeecher, _ := NewClient(cfg)
408         require.NoError(t, err)
409         defer leecherLeecher.Close()
410         defer testutil.ExportStatusWriter(leecherLeecher, "ll")()
411         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
412                 ret = TorrentSpecFromMetaInfo(mi)
413                 ret.ChunkSize = 2
414                 return
415         }())
416         require.NoError(t, err)
417         assert.True(t, ok)
418         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
419                 ret = TorrentSpecFromMetaInfo(mi)
420                 ret.ChunkSize = 3
421                 return
422         }())
423         require.NoError(t, err)
424         assert.True(t, ok)
425         // Simultaneously DownloadAll in Leecher, and read the contents
426         // consecutively in LeecherLeecher. This non-deterministically triggered a
427         // case where the leecher wouldn't unchoke the LeecherLeecher.
428         var wg sync.WaitGroup
429         wg.Add(1)
430         go func() {
431                 defer wg.Done()
432                 r := llg.NewReader()
433                 defer r.Close()
434                 b, err := ioutil.ReadAll(r)
435                 require.NoError(t, err)
436                 assert.EqualValues(t, testutil.GreetingFileContents, b)
437         }()
438         done := make(chan struct{})
439         defer close(done)
440         go leecherGreeting.AddClientPeer(seeder)
441         go leecherGreeting.AddClientPeer(leecherLeecher)
442         wg.Add(1)
443         go func() {
444                 defer wg.Done()
445                 leecherGreeting.DownloadAll()
446                 leecher.WaitAll()
447         }()
448         wg.Wait()
449 }
450
451 func TestMergingTrackersByAddingSpecs(t *testing.T) {
452         cl, err := NewClient(TestingConfig())
453         require.NoError(t, err)
454         defer cl.Close()
455         spec := TorrentSpec{}
456         T, new, _ := cl.AddTorrentSpec(&spec)
457         if !new {
458                 t.FailNow()
459         }
460         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
461         _, new, _ = cl.AddTorrentSpec(&spec)
462         assert.False(t, new)
463         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
464         // Because trackers are disabled in TestingConfig.
465         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
466 }
467
468 // We read from a piece which is marked completed, but is missing data.
469 func TestCompletedPieceWrongSize(t *testing.T) {
470         cfg := TestingConfig()
471         cfg.DefaultStorage = badStorage{}
472         cl, err := NewClient(cfg)
473         require.NoError(t, err)
474         defer cl.Close()
475         info := metainfo.Info{
476                 PieceLength: 15,
477                 Pieces:      make([]byte, 20),
478                 Files: []metainfo.FileInfo{
479                         {Path: []string{"greeting"}, Length: 13},
480                 },
481         }
482         b, err := bencode.Marshal(info)
483         require.NoError(t, err)
484         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
485                 InfoBytes: b,
486                 InfoHash:  metainfo.HashBytes(b),
487         })
488         require.NoError(t, err)
489         defer tt.Drop()
490         assert.True(t, new)
491         r := tt.NewReader()
492         defer r.Close()
493         b, err = ioutil.ReadAll(r)
494         assert.Len(t, b, 13)
495         assert.NoError(t, err)
496 }
497
498 func BenchmarkAddLargeTorrent(b *testing.B) {
499         cfg := TestingConfig()
500         cfg.DisableTCP = true
501         cfg.DisableUTP = true
502         cl, err := NewClient(cfg)
503         require.NoError(b, err)
504         defer cl.Close()
505         b.ReportAllocs()
506         for range iter.N(b.N) {
507                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
508                 if err != nil {
509                         b.Fatal(err)
510                 }
511                 t.Drop()
512         }
513 }
514
515 func TestResponsive(t *testing.T) {
516         seederDataDir, mi := testutil.GreetingTestTorrent()
517         defer os.RemoveAll(seederDataDir)
518         cfg := TestingConfig()
519         cfg.Seed = true
520         cfg.DataDir = seederDataDir
521         seeder, err := NewClient(cfg)
522         require.Nil(t, err)
523         defer seeder.Close()
524         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
525         seederTorrent.VerifyData()
526         leecherDataDir, err := ioutil.TempDir("", "")
527         require.Nil(t, err)
528         defer os.RemoveAll(leecherDataDir)
529         cfg = TestingConfig()
530         cfg.DataDir = leecherDataDir
531         leecher, err := NewClient(cfg)
532         require.Nil(t, err)
533         defer leecher.Close()
534         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
535                 ret = TorrentSpecFromMetaInfo(mi)
536                 ret.ChunkSize = 2
537                 return
538         }())
539         leecherTorrent.AddClientPeer(seeder)
540         reader := leecherTorrent.NewReader()
541         defer reader.Close()
542         reader.SetReadahead(0)
543         reader.SetResponsive()
544         b := make([]byte, 2)
545         _, err = reader.Seek(3, io.SeekStart)
546         require.NoError(t, err)
547         _, err = io.ReadFull(reader, b)
548         assert.Nil(t, err)
549         assert.EqualValues(t, "lo", string(b))
550         _, err = reader.Seek(11, io.SeekStart)
551         require.NoError(t, err)
552         n, err := io.ReadFull(reader, b)
553         assert.Nil(t, err)
554         assert.EqualValues(t, 2, n)
555         assert.EqualValues(t, "d\n", string(b))
556 }
557
558 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
559         seederDataDir, mi := testutil.GreetingTestTorrent()
560         defer os.RemoveAll(seederDataDir)
561         cfg := TestingConfig()
562         cfg.Seed = true
563         cfg.DataDir = seederDataDir
564         seeder, err := NewClient(cfg)
565         require.Nil(t, err)
566         defer seeder.Close()
567         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
568         seederTorrent.VerifyData()
569         leecherDataDir, err := ioutil.TempDir("", "")
570         require.Nil(t, err)
571         defer os.RemoveAll(leecherDataDir)
572         cfg = TestingConfig()
573         cfg.DataDir = leecherDataDir
574         leecher, err := NewClient(cfg)
575         require.Nil(t, err)
576         defer leecher.Close()
577         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
578                 ret = TorrentSpecFromMetaInfo(mi)
579                 ret.ChunkSize = 2
580                 return
581         }())
582         leecherTorrent.AddClientPeer(seeder)
583         reader := leecherTorrent.NewReader()
584         defer reader.Close()
585         reader.SetReadahead(0)
586         reader.SetResponsive()
587         b := make([]byte, 2)
588         _, err = reader.Seek(3, io.SeekStart)
589         require.NoError(t, err)
590         _, err = io.ReadFull(reader, b)
591         assert.Nil(t, err)
592         assert.EqualValues(t, "lo", string(b))
593         go leecherTorrent.Drop()
594         _, err = reader.Seek(11, io.SeekStart)
595         require.NoError(t, err)
596         n, err := reader.Read(b)
597         assert.EqualError(t, err, "torrent closed")
598         assert.EqualValues(t, 0, n)
599 }
600
601 func TestDHTInheritBlocklist(t *testing.T) {
602         ipl := iplist.New(nil)
603         require.NotNil(t, ipl)
604         cfg := TestingConfig()
605         cfg.IPBlocklist = ipl
606         cfg.NoDHT = false
607         cl, err := NewClient(cfg)
608         require.NoError(t, err)
609         defer cl.Close()
610         numServers := 0
611         cl.eachDhtServer(func(s *dht.Server) {
612                 assert.Equal(t, ipl, s.IPBlocklist())
613                 numServers++
614         })
615         assert.EqualValues(t, 2, numServers)
616 }
617
618 // Check that stuff is merged in subsequent AddTorrentSpec for the same
619 // infohash.
620 func TestAddTorrentSpecMerging(t *testing.T) {
621         cl, err := NewClient(TestingConfig())
622         require.NoError(t, err)
623         defer cl.Close()
624         dir, mi := testutil.GreetingTestTorrent()
625         defer os.RemoveAll(dir)
626         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
627                 InfoHash: mi.HashInfoBytes(),
628         })
629         require.NoError(t, err)
630         require.True(t, new)
631         require.Nil(t, tt.Info())
632         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
633         require.NoError(t, err)
634         require.False(t, new)
635         require.NotNil(t, tt.Info())
636 }
637
638 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
639         dir, mi := testutil.GreetingTestTorrent()
640         os.RemoveAll(dir)
641         cl, _ := NewClient(TestingConfig())
642         defer cl.Close()
643         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
644                 InfoHash: mi.HashInfoBytes(),
645         })
646         tt.Drop()
647         assert.EqualValues(t, 0, len(cl.Torrents()))
648         select {
649         case <-tt.GotInfo():
650                 t.FailNow()
651         default:
652         }
653 }
654
655 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
656         for i := range iter.N(info.NumPieces()) {
657                 p := info.Piece(i)
658                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
659         }
660 }
661
662 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
663         fileCacheDir, err := ioutil.TempDir("", "")
664         require.NoError(t, err)
665         defer os.RemoveAll(fileCacheDir)
666         fileCache, err := filecache.NewCache(fileCacheDir)
667         require.NoError(t, err)
668         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
669         defer os.RemoveAll(greetingDataTempDir)
670         filePieceStore := csf(fileCache)
671         defer filePieceStore.Close()
672         info, err := greetingMetainfo.UnmarshalInfo()
673         require.NoError(t, err)
674         ih := greetingMetainfo.HashInfoBytes()
675         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
676         require.NoError(t, err)
677         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
678         // require.Equal(t, len(testutil.GreetingFileContents), written)
679         // require.NoError(t, err)
680         for i := 0; i < info.NumPieces(); i++ {
681                 p := info.Piece(i)
682                 if alreadyCompleted {
683                         require.NoError(t, greetingData.Piece(p).MarkComplete())
684                 }
685         }
686         cfg := TestingConfig()
687         // TODO: Disable network option?
688         cfg.DisableTCP = true
689         cfg.DisableUTP = true
690         cfg.DefaultStorage = filePieceStore
691         cl, err := NewClient(cfg)
692         require.NoError(t, err)
693         defer cl.Close()
694         tt, err := cl.AddTorrent(greetingMetainfo)
695         require.NoError(t, err)
696         psrs := tt.PieceStateRuns()
697         assert.Len(t, psrs, 1)
698         assert.EqualValues(t, 3, psrs[0].Length)
699         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
700         if alreadyCompleted {
701                 r := tt.NewReader()
702                 b, err := ioutil.ReadAll(r)
703                 assert.NoError(t, err)
704                 assert.EqualValues(t, testutil.GreetingFileContents, b)
705         }
706 }
707
708 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
709         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
710 }
711
712 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
713         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
714 }
715
716 func TestAddMetainfoWithNodes(t *testing.T) {
717         cfg := TestingConfig()
718         cfg.ListenHost = func(string) string { return "" }
719         cfg.NoDHT = false
720         cfg.DhtStartingNodes = func() ([]dht.Addr, error) { return nil, nil }
721         // For now, we want to just jam the nodes into the table, without
722         // verifying them first. Also the DHT code doesn't support mixing secure
723         // and insecure nodes if security is enabled (yet).
724         // cfg.DHTConfig.NoSecurity = true
725         cl, err := NewClient(cfg)
726         require.NoError(t, err)
727         defer cl.Close()
728         sum := func() (ret int64) {
729                 cl.eachDhtServer(func(s *dht.Server) {
730                         ret += s.Stats().OutboundQueriesAttempted
731                 })
732                 return
733         }
734         assert.EqualValues(t, 0, sum())
735         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
736         require.NoError(t, err)
737         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
738         // check if the announce-list is here instead. TODO: Add nodes.
739         assert.Len(t, tt.metainfo.AnnounceList, 5)
740         // There are 6 nodes in the torrent file.
741         for sum() != int64(6*len(cl.dhtServers)) {
742                 time.Sleep(time.Millisecond)
743         }
744 }
745
746 type testDownloadCancelParams struct {
747         SetLeecherStorageCapacity bool
748         LeecherStorageCapacity    int64
749         Cancel                    bool
750 }
751
752 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
753         greetingTempDir, mi := testutil.GreetingTestTorrent()
754         defer os.RemoveAll(greetingTempDir)
755         cfg := TestingConfig()
756         cfg.Seed = true
757         cfg.DataDir = greetingTempDir
758         seeder, err := NewClient(cfg)
759         require.NoError(t, err)
760         defer seeder.Close()
761         defer testutil.ExportStatusWriter(seeder, "s")()
762         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
763         seederTorrent.VerifyData()
764         leecherDataDir, err := ioutil.TempDir("", "")
765         require.NoError(t, err)
766         defer os.RemoveAll(leecherDataDir)
767         fc, err := filecache.NewCache(leecherDataDir)
768         require.NoError(t, err)
769         if ps.SetLeecherStorageCapacity {
770                 fc.SetCapacity(ps.LeecherStorageCapacity)
771         }
772         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
773         cfg.DataDir = leecherDataDir
774         leecher, err := NewClient(cfg)
775         require.NoError(t, err)
776         defer leecher.Close()
777         defer testutil.ExportStatusWriter(leecher, "l")()
778         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
779                 ret = TorrentSpecFromMetaInfo(mi)
780                 ret.ChunkSize = 2
781                 return
782         }())
783         require.NoError(t, err)
784         assert.True(t, new)
785         psc := leecherGreeting.SubscribePieceStateChanges()
786         defer psc.Close()
787
788         leecherGreeting.cl.lock()
789         leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
790         if ps.Cancel {
791                 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
792         }
793         leecherGreeting.cl.unlock()
794         done := make(chan struct{})
795         defer close(done)
796         go leecherGreeting.AddClientPeer(seeder)
797         completes := make(map[int]bool, 3)
798         expected := func() map[int]bool {
799                 if ps.Cancel {
800                         return map[int]bool{0: false, 1: false, 2: false}
801                 } else {
802                         return map[int]bool{0: true, 1: true, 2: true}
803                 }
804         }()
805         for !reflect.DeepEqual(completes, expected) {
806                 _v := <-psc.Values
807                 v := _v.(PieceStateChange)
808                 completes[v.Index] = v.Complete
809         }
810 }
811
812 func TestTorrentDownloadAll(t *testing.T) {
813         testDownloadCancel(t, testDownloadCancelParams{})
814 }
815
816 func TestTorrentDownloadAllThenCancel(t *testing.T) {
817         testDownloadCancel(t, testDownloadCancelParams{
818                 Cancel: true,
819         })
820 }
821
822 // Ensure that it's an error for a peer to send an invalid have message.
823 func TestPeerInvalidHave(t *testing.T) {
824         cl, err := NewClient(TestingConfig())
825         require.NoError(t, err)
826         defer cl.Close()
827         info := metainfo.Info{
828                 PieceLength: 1,
829                 Pieces:      make([]byte, 20),
830                 Files:       []metainfo.FileInfo{{Length: 1}},
831         }
832         infoBytes, err := bencode.Marshal(info)
833         require.NoError(t, err)
834         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
835                 InfoBytes: infoBytes,
836                 InfoHash:  metainfo.HashBytes(infoBytes),
837                 Storage:   badStorage{},
838         })
839         require.NoError(t, err)
840         assert.True(t, _new)
841         defer tt.Drop()
842         cn := &connection{
843                 t: tt,
844         }
845         assert.NoError(t, cn.peerSentHave(0))
846         assert.Error(t, cn.peerSentHave(1))
847 }
848
849 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
850         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
851         defer os.RemoveAll(greetingTempDir)
852         cfg := TestingConfig()
853         cfg.DataDir = greetingTempDir
854         seeder, err := NewClient(TestingConfig())
855         require.NoError(t, err)
856         seeder.AddTorrentSpec(&TorrentSpec{
857                 InfoBytes: greetingMetainfo.InfoBytes,
858         })
859 }
860
861 // Check that when the listen port is 0, all the protocols listened on have
862 // the same port, and it isn't zero.
863 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
864         cl, err := NewClient(TestingConfig())
865         require.NoError(t, err)
866         defer cl.Close()
867         port := cl.LocalPort()
868         assert.NotEqual(t, 0, port)
869         cl.eachListener(func(s socket) bool {
870                 assert.Equal(t, port, missinggo.AddrPort(s.Addr()))
871                 return true
872         })
873 }
874
875 func TestClientDynamicListenTCPOnly(t *testing.T) {
876         cfg := TestingConfig()
877         cfg.DisableUTP = true
878         cfg.DisableTCP = false
879         cl, err := NewClient(cfg)
880         require.NoError(t, err)
881         defer cl.Close()
882         assert.NotEqual(t, 0, cl.LocalPort())
883 }
884
885 func TestClientDynamicListenUTPOnly(t *testing.T) {
886         cfg := TestingConfig()
887         cfg.DisableTCP = true
888         cfg.DisableUTP = false
889         cl, err := NewClient(cfg)
890         require.NoError(t, err)
891         defer cl.Close()
892         assert.NotEqual(t, 0, cl.LocalPort())
893 }
894
895 func totalConns(tts []*Torrent) (ret int) {
896         for _, tt := range tts {
897                 tt.cl.lock()
898                 ret += len(tt.conns)
899                 tt.cl.unlock()
900         }
901         return
902 }
903
904 func TestSetMaxEstablishedConn(t *testing.T) {
905         var tts []*Torrent
906         ih := testutil.GreetingMetaInfo().HashInfoBytes()
907         cfg := TestingConfig()
908         cfg.DisableAcceptRateLimiting = true
909         cfg.dropDuplicatePeerIds = true
910         for i := range iter.N(3) {
911                 cl, err := NewClient(cfg)
912                 require.NoError(t, err)
913                 defer cl.Close()
914                 tt, _ := cl.AddTorrentInfoHash(ih)
915                 tt.SetMaxEstablishedConns(2)
916                 defer testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))()
917                 tts = append(tts, tt)
918         }
919         addPeers := func() {
920                 for _, tt := range tts {
921                         for _, _tt := range tts {
922                                 // if tt != _tt {
923                                 tt.AddClientPeer(_tt.cl)
924                                 // }
925                         }
926                 }
927         }
928         waitTotalConns := func(num int) {
929                 for totalConns(tts) != num {
930                         addPeers()
931                         time.Sleep(time.Millisecond)
932                 }
933         }
934         addPeers()
935         waitTotalConns(6)
936         tts[0].SetMaxEstablishedConns(1)
937         waitTotalConns(4)
938         tts[0].SetMaxEstablishedConns(0)
939         waitTotalConns(2)
940         tts[0].SetMaxEstablishedConns(1)
941         addPeers()
942         waitTotalConns(4)
943         tts[0].SetMaxEstablishedConns(2)
944         addPeers()
945         waitTotalConns(6)
946 }
947
948 // Creates a file containing its own name as data. Make a metainfo from that, adds it to the given
949 // client, and returns a magnet link.
950 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
951         os.MkdirAll(dir, 0770)
952         file, err := os.Create(filepath.Join(dir, name))
953         require.NoError(t, err)
954         file.Write([]byte(name))
955         file.Close()
956         mi := metainfo.MetaInfo{}
957         mi.SetDefaults()
958         info := metainfo.Info{PieceLength: 256 * 1024}
959         err = info.BuildFromFilePath(filepath.Join(dir, name))
960         require.NoError(t, err)
961         mi.InfoBytes, err = bencode.Marshal(info)
962         require.NoError(t, err)
963         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
964         tr, err := cl.AddTorrent(&mi)
965         require.NoError(t, err)
966         require.True(t, tr.Seeding())
967         tr.VerifyData()
968         return magnet
969 }
970
971 // https://github.com/anacrolix/torrent/issues/114
972 func TestMultipleTorrentsWithEncryption(t *testing.T) {
973         testSeederLeecherPair(
974                 t,
975                 func(cfg *ClientConfig) {
976                         cfg.HeaderObfuscationPolicy.Preferred = true
977                         cfg.HeaderObfuscationPolicy.RequirePreferred = true
978                 },
979                 func(cfg *ClientConfig) {
980                         cfg.HeaderObfuscationPolicy.RequirePreferred = false
981                 },
982         )
983 }
984
985 // Test that the leecher can download a torrent in its entirety from the seeder. Note that the
986 // seeder config is done first.
987 func testSeederLeecherPair(t *testing.T, seeder func(*ClientConfig), leecher func(*ClientConfig)) {
988         cfg := TestingConfig()
989         cfg.Seed = true
990         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
991         os.Mkdir(cfg.DataDir, 0755)
992         seeder(cfg)
993         server, err := NewClient(cfg)
994         require.NoError(t, err)
995         defer server.Close()
996         defer testutil.ExportStatusWriter(server, "s")()
997         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
998         // Extra torrents are added to test the seeder having to match incoming obfuscated headers
999         // against more than one torrent. See issue #114
1000         makeMagnet(t, server, cfg.DataDir, "test2")
1001         for i := 0; i < 100; i++ {
1002                 makeMagnet(t, server, cfg.DataDir, fmt.Sprintf("test%d", i+2))
1003         }
1004         cfg = TestingConfig()
1005         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1006         leecher(cfg)
1007         client, err := NewClient(cfg)
1008         require.NoError(t, err)
1009         defer client.Close()
1010         defer testutil.ExportStatusWriter(client, "c")()
1011         tr, err := client.AddMagnet(magnet1)
1012         require.NoError(t, err)
1013         tr.AddClientPeer(server)
1014         <-tr.GotInfo()
1015         tr.DownloadAll()
1016         client.WaitAll()
1017 }
1018
1019 // This appears to be the situation with the S3 BitTorrent client.
1020 func TestObfuscatedHeaderFallbackSeederDisallowsLeecherPrefers(t *testing.T) {
1021         // Leecher prefers obfuscation, but the seeder does not allow it.
1022         testSeederLeecherPair(
1023                 t,
1024                 func(cfg *ClientConfig) {
1025                         cfg.HeaderObfuscationPolicy.Preferred = false
1026                         cfg.HeaderObfuscationPolicy.RequirePreferred = true
1027                 },
1028                 func(cfg *ClientConfig) {
1029                         cfg.HeaderObfuscationPolicy.Preferred = true
1030                         cfg.HeaderObfuscationPolicy.RequirePreferred = false
1031                 },
1032         )
1033 }
1034
1035 func TestObfuscatedHeaderFallbackSeederRequiresLeecherPrefersNot(t *testing.T) {
1036         // Leecher prefers no obfuscation, but the seeder enforces it.
1037         testSeederLeecherPair(
1038                 t,
1039                 func(cfg *ClientConfig) {
1040                         cfg.HeaderObfuscationPolicy.Preferred = true
1041                         cfg.HeaderObfuscationPolicy.RequirePreferred = true
1042                 },
1043                 func(cfg *ClientConfig) {
1044                         cfg.HeaderObfuscationPolicy.Preferred = false
1045                         cfg.HeaderObfuscationPolicy.RequirePreferred = false
1046                 },
1047         )
1048 }
1049
1050 func TestClientAddressInUse(t *testing.T) {
1051         s, _ := NewUtpSocket("udp", ":50007", nil)
1052         if s != nil {
1053                 defer s.Close()
1054         }
1055         cfg := TestingConfig().SetListenAddr(":50007")
1056         cl, err := NewClient(cfg)
1057         require.Error(t, err)
1058         require.Nil(t, cl)
1059 }
1060
1061 func TestClientHasDhtServersWhenUtpDisabled(t *testing.T) {
1062         cc := TestingConfig()
1063         cc.DisableUTP = true
1064         cc.NoDHT = false
1065         cl, err := NewClient(cc)
1066         require.NoError(t, err)
1067         defer cl.Close()
1068         assert.NotEmpty(t, cl.DhtServers())
1069 }