Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Use subtests in TestClientTransferVarious
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "log"
9         "os"
10         "path/filepath"
11         "reflect"
12         "sync"
13         "testing"
14         "time"
15
16         "github.com/bradfitz/iter"
17         "github.com/stretchr/testify/assert"
18         "github.com/stretchr/testify/require"
19         "golang.org/x/time/rate"
20
21         "github.com/anacrolix/dht/v2"
22         _ "github.com/anacrolix/envpprof"
23         "github.com/anacrolix/missinggo"
24         "github.com/anacrolix/missinggo/filecache"
25
26         "github.com/anacrolix/torrent/bencode"
27         "github.com/anacrolix/torrent/internal/testutil"
28         "github.com/anacrolix/torrent/iplist"
29         "github.com/anacrolix/torrent/metainfo"
30         "github.com/anacrolix/torrent/storage"
31 )
32
33 func TestingConfig() *ClientConfig {
34         cfg := NewDefaultClientConfig()
35         cfg.ListenHost = LoopbackListenHost
36         cfg.NoDHT = true
37         cfg.DataDir = tempDir()
38         cfg.DisableTrackers = true
39         cfg.NoDefaultPortForwarding = true
40         cfg.DisableAcceptRateLimiting = true
41         cfg.ListenPort = 0
42         //cfg.Debug = true
43         //cfg.Logger = cfg.Logger.WithText(func(m log.Msg) string {
44         //      t := m.Text()
45         //      m.Values(func(i interface{}) bool {
46         //              t += fmt.Sprintf("\n%[1]T: %[1]v", i)
47         //              return true
48         //      })
49         //      return t
50         //})
51         return cfg
52 }
53
54 func TestClientDefault(t *testing.T) {
55         cl, err := NewClient(TestingConfig())
56         require.NoError(t, err)
57         cl.Close()
58 }
59
60 func TestClientNilConfig(t *testing.T) {
61         cl, err := NewClient(nil)
62         require.NoError(t, err)
63         cl.Close()
64 }
65
66 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
67         cfg := TestingConfig()
68         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
69         require.NoError(t, err)
70         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
71         defer ci.Close()
72         cfg.DefaultStorage = ci
73         cl, err := NewClient(cfg)
74         require.NoError(t, err)
75         cl.Close()
76         // And again, https://github.com/anacrolix/torrent/issues/158
77         cl, err = NewClient(cfg)
78         require.NoError(t, err)
79         cl.Close()
80 }
81
82 func TestAddDropTorrent(t *testing.T) {
83         cl, err := NewClient(TestingConfig())
84         require.NoError(t, err)
85         defer cl.Close()
86         dir, mi := testutil.GreetingTestTorrent()
87         defer os.RemoveAll(dir)
88         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
89         require.NoError(t, err)
90         assert.True(t, new)
91         tt.SetMaxEstablishedConns(0)
92         tt.SetMaxEstablishedConns(1)
93         tt.Drop()
94 }
95
// TestAddTorrentNoSupportedTrackerSchemes is a placeholder that is always
// skipped.
func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
	// TODO: not implemented yet.
	t.SkipNow()
}
100
// TestAddTorrentNoUsableURLs is a placeholder that is always skipped.
func TestAddTorrentNoUsableURLs(t *testing.T) {
	// TODO: not implemented yet.
	t.SkipNow()
}
105
// TestAddPeersToUnknownTorrent is a placeholder that is always skipped.
func TestAddPeersToUnknownTorrent(t *testing.T) {
	// TODO: not implemented yet.
	t.SkipNow()
}
110
111 func TestPieceHashSize(t *testing.T) {
112         assert.Equal(t, 20, pieceHash.Size())
113 }
114
115 func TestTorrentInitialState(t *testing.T) {
116         dir, mi := testutil.GreetingTestTorrent()
117         defer os.RemoveAll(dir)
118         cl := &Client{
119                 config: TestingConfig(),
120         }
121         cl.initLogger()
122         tor := cl.newTorrent(
123                 mi.HashInfoBytes(),
124                 storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
125         )
126         tor.setChunkSize(2)
127         tor.cl.lock()
128         err := tor.setInfoBytes(mi.InfoBytes)
129         tor.cl.unlock()
130         require.NoError(t, err)
131         require.Len(t, tor.pieces, 3)
132         tor.pendAllChunkSpecs(0)
133         tor.cl.lock()
134         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
135         tor.cl.unlock()
136         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
137 }
138
139 func TestReducedDialTimeout(t *testing.T) {
140         cfg := NewDefaultClientConfig()
141         for _, _case := range []struct {
142                 Max             time.Duration
143                 HalfOpenLimit   int
144                 PendingPeers    int
145                 ExpectedReduced time.Duration
146         }{
147                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
148                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
149                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
150                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
151                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
152                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
153         } {
154                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
155                 expected := _case.ExpectedReduced
156                 if expected < cfg.MinDialTimeout {
157                         expected = cfg.MinDialTimeout
158                 }
159                 if reduced != expected {
160                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
161                 }
162         }
163 }
164
165 func TestAddDropManyTorrents(t *testing.T) {
166         cl, err := NewClient(TestingConfig())
167         require.NoError(t, err)
168         defer cl.Close()
169         for i := range iter.N(1000) {
170                 var spec TorrentSpec
171                 binary.PutVarint(spec.InfoHash[:], int64(i))
172                 tt, new, err := cl.AddTorrentSpec(&spec)
173                 assert.NoError(t, err)
174                 assert.True(t, new)
175                 defer tt.Drop()
176         }
177 }
178
179 type fileCacheClientStorageFactoryParams struct {
180         Capacity    int64
181         SetCapacity bool
182         Wrapper     func(*filecache.Cache) storage.ClientImpl
183 }
184
185 func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
186         return func(dataDir string) storage.ClientImpl {
187                 fc, err := filecache.NewCache(dataDir)
188                 if err != nil {
189                         panic(err)
190                 }
191                 if ps.SetCapacity {
192                         fc.SetCapacity(ps.Capacity)
193                 }
194                 return ps.Wrapper(fc)
195         }
196 }
197
198 type storageFactory func(string) storage.ClientImpl
199
200 func TestClientTransferDefault(t *testing.T) {
201         testClientTransfer(t, testClientTransferParams{
202                 ExportClientStatus: true,
203                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
204                         Wrapper: fileCachePieceResourceStorage,
205                 }),
206         })
207 }
208
209 func TestClientTransferRateLimitedUpload(t *testing.T) {
210         started := time.Now()
211         testClientTransfer(t, testClientTransferParams{
212                 // We are uploading 13 bytes (the length of the greeting torrent). The
213                 // chunks are 2 bytes in length. Then the smallest burst we can run
214                 // with is 2. Time taken is (13-burst)/rate.
215                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
216                 ExportClientStatus:      true,
217         })
218         require.True(t, time.Since(started) > time.Second)
219 }
220
221 func TestClientTransferRateLimitedDownload(t *testing.T) {
222         testClientTransfer(t, testClientTransferParams{
223                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
224         })
225 }
226
227 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
228         return storage.NewResourcePieces(fc.AsResourceProvider())
229 }
230
231 func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
232         testClientTransfer(t, testClientTransferParams{
233                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
234                         SetCapacity: true,
235                         // Going below the piece length means it can't complete a piece so
236                         // that it can be hashed.
237                         Capacity: 5,
238                         Wrapper:  fileCachePieceResourceStorage,
239                 }),
240                 SetReadahead: setReadahead,
241                 // Can't readahead too far or the cache will thrash and drop data we
242                 // thought we had.
243                 Readahead:          readahead,
244                 ExportClientStatus: true,
245         })
246 }
247
248 func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
249         testClientTransferSmallCache(t, true, 5)
250 }
251
252 func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
253         testClientTransferSmallCache(t, true, 15)
254 }
255
256 func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
257         testClientTransferSmallCache(t, false, -1)
258 }
259
260 func TestClientTransferVarious(t *testing.T) {
261         // Leecher storage
262         for _, ls := range []struct {
263                 name string
264                 f    storageFactory
265         }{
266                 {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
267                         Wrapper: fileCachePieceResourceStorage,
268                 })},
269                 {"Boltdb", storage.NewBoltDB},
270         } {
271                 t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {
272                         // Seeder storage
273                         for _, ss := range []struct {
274                                 name string
275                                 f    func(string) storage.ClientImpl
276                         }{
277                                 {"File", storage.NewFile},
278                                 {"Mmap", storage.NewMMap},
279                         } {
280                                 t.Run(fmt.Sprintf("%sSeederStorage", ss.name), func(t *testing.T) {
281                                         for _, responsive := range []bool{false, true} {
282                                                 t.Run(fmt.Sprintf("Responsive=%v", responsive), func(t *testing.T) {
283                                                         t.Run("NoReadahead", func(t *testing.T) {
284                                                                 testClientTransfer(t, testClientTransferParams{
285                                                                         Responsive:     responsive,
286                                                                         SeederStorage:  ss.f,
287                                                                         LeecherStorage: ls.f,
288                                                                 })
289                                                         })
290                                                         for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
291                                                                 t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
292                                                                         testClientTransfer(t, testClientTransferParams{
293                                                                                 SeederStorage:  ss.f,
294                                                                                 Responsive:     responsive,
295                                                                                 SetReadahead:   true,
296                                                                                 Readahead:      readahead,
297                                                                                 LeecherStorage: ls.f,
298                                                                         })
299                                                                 })
300                                                         }
301                                                 })
302                                         }
303                                 })
304                         }
305                 })
306         }
307 }
308
309 type testClientTransferParams struct {
310         Responsive                 bool
311         Readahead                  int64
312         SetReadahead               bool
313         ExportClientStatus         bool
314         LeecherStorage             func(string) storage.ClientImpl
315         SeederStorage              func(string) storage.ClientImpl
316         SeederUploadRateLimiter    *rate.Limiter
317         LeecherDownloadRateLimiter *rate.Limiter
318 }
319
320 func logPieceStateChanges(t *Torrent) {
321         sub := t.SubscribePieceStateChanges()
322         go func() {
323                 defer sub.Close()
324                 for e := range sub.Values {
325                         log.Printf("%p %#v", t, e)
326                 }
327         }()
328 }
329
330 // Creates a seeder and a leecher, and ensures the data transfers when a read
331 // is attempted on the leecher.
332 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
333         greetingTempDir, mi := testutil.GreetingTestTorrent()
334         defer os.RemoveAll(greetingTempDir)
335         // Create seeder and a Torrent.
336         cfg := TestingConfig()
337         cfg.Seed = true
338         if ps.SeederUploadRateLimiter != nil {
339                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
340         }
341         // cfg.ListenAddr = "localhost:4000"
342         if ps.SeederStorage != nil {
343                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
344                 defer cfg.DefaultStorage.Close()
345         } else {
346                 cfg.DataDir = greetingTempDir
347         }
348         seeder, err := NewClient(cfg)
349         require.NoError(t, err)
350         if ps.ExportClientStatus {
351                 defer testutil.ExportStatusWriter(seeder, "s")()
352         }
353         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
354         // Run a Stats right after Closing the Client. This will trigger the Stats
355         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
356         defer seederTorrent.Stats()
357         defer seeder.Close()
358         seederTorrent.VerifyData()
359         // Create leecher and a Torrent.
360         leecherDataDir, err := ioutil.TempDir("", "")
361         require.NoError(t, err)
362         defer os.RemoveAll(leecherDataDir)
363         cfg = TestingConfig()
364         if ps.LeecherStorage == nil {
365                 cfg.DataDir = leecherDataDir
366         } else {
367                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
368         }
369         if ps.LeecherDownloadRateLimiter != nil {
370                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
371         }
372         cfg.Seed = false
373         //cfg.Debug = true
374         leecher, err := NewClient(cfg)
375         require.NoError(t, err)
376         defer leecher.Close()
377         if ps.ExportClientStatus {
378                 defer testutil.ExportStatusWriter(leecher, "l")()
379         }
380         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
381                 ret = TorrentSpecFromMetaInfo(mi)
382                 ret.ChunkSize = 2
383                 return
384         }())
385         require.NoError(t, err)
386         assert.True(t, new)
387
388         //// This was used when observing coalescing of piece state changes.
389         //logPieceStateChanges(leecherTorrent)
390
391         // Now do some things with leecher and seeder.
392         leecherTorrent.AddClientPeer(seeder)
393         // The Torrent should not be interested in obtaining peers, so the one we
394         // just added should be the only one.
395         assert.False(t, leecherTorrent.Seeding())
396         assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
397         r := leecherTorrent.NewReader()
398         defer r.Close()
399         if ps.Responsive {
400                 r.SetResponsive()
401         }
402         if ps.SetReadahead {
403                 r.SetReadahead(ps.Readahead)
404         }
405         assertReadAllGreeting(t, r)
406
407         seederStats := seederTorrent.Stats()
408         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
409         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
410
411         leecherStats := leecherTorrent.Stats()
412         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
413         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
414
415         // Try reading through again for the cases where the torrent data size
416         // exceeds the size of the cache.
417         assertReadAllGreeting(t, r)
418 }
419
420 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
421         pos, err := r.Seek(0, io.SeekStart)
422         assert.NoError(t, err)
423         assert.EqualValues(t, 0, pos)
424         _greeting, err := ioutil.ReadAll(r)
425         assert.NoError(t, err)
426         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
427 }
428
429 // Check that after completing leeching, a leecher transitions to a seeding
430 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
431 func TestSeedAfterDownloading(t *testing.T) {
432         greetingTempDir, mi := testutil.GreetingTestTorrent()
433         defer os.RemoveAll(greetingTempDir)
434
435         cfg := TestingConfig()
436         cfg.Seed = true
437         cfg.DataDir = greetingTempDir
438         seeder, err := NewClient(cfg)
439         require.NoError(t, err)
440         defer seeder.Close()
441         defer testutil.ExportStatusWriter(seeder, "s")()
442         seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
443         require.NoError(t, err)
444         assert.True(t, ok)
445         seederTorrent.VerifyData()
446
447         cfg = TestingConfig()
448         cfg.Seed = true
449         cfg.DataDir, err = ioutil.TempDir("", "")
450         require.NoError(t, err)
451         defer os.RemoveAll(cfg.DataDir)
452         leecher, err := NewClient(cfg)
453         require.NoError(t, err)
454         defer leecher.Close()
455         defer testutil.ExportStatusWriter(leecher, "l")()
456
457         cfg = TestingConfig()
458         cfg.Seed = false
459         cfg.DataDir, err = ioutil.TempDir("", "")
460         require.NoError(t, err)
461         defer os.RemoveAll(cfg.DataDir)
462         leecherLeecher, _ := NewClient(cfg)
463         require.NoError(t, err)
464         defer leecherLeecher.Close()
465         defer testutil.ExportStatusWriter(leecherLeecher, "ll")()
466         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
467                 ret = TorrentSpecFromMetaInfo(mi)
468                 ret.ChunkSize = 2
469                 return
470         }())
471         require.NoError(t, err)
472         assert.True(t, ok)
473         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
474                 ret = TorrentSpecFromMetaInfo(mi)
475                 ret.ChunkSize = 3
476                 return
477         }())
478         require.NoError(t, err)
479         assert.True(t, ok)
480         // Simultaneously DownloadAll in Leecher, and read the contents
481         // consecutively in LeecherLeecher. This non-deterministically triggered a
482         // case where the leecher wouldn't unchoke the LeecherLeecher.
483         var wg sync.WaitGroup
484         wg.Add(1)
485         go func() {
486                 defer wg.Done()
487                 r := llg.NewReader()
488                 defer r.Close()
489                 b, err := ioutil.ReadAll(r)
490                 require.NoError(t, err)
491                 assert.EqualValues(t, testutil.GreetingFileContents, b)
492         }()
493         done := make(chan struct{})
494         defer close(done)
495         go leecherGreeting.AddClientPeer(seeder)
496         go leecherGreeting.AddClientPeer(leecherLeecher)
497         wg.Add(1)
498         go func() {
499                 defer wg.Done()
500                 leecherGreeting.DownloadAll()
501                 leecher.WaitAll()
502         }()
503         wg.Wait()
504 }
505
506 func TestMergingTrackersByAddingSpecs(t *testing.T) {
507         cl, err := NewClient(TestingConfig())
508         require.NoError(t, err)
509         defer cl.Close()
510         spec := TorrentSpec{}
511         T, new, _ := cl.AddTorrentSpec(&spec)
512         if !new {
513                 t.FailNow()
514         }
515         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
516         _, new, _ = cl.AddTorrentSpec(&spec)
517         assert.False(t, new)
518         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
519         // Because trackers are disabled in TestingConfig.
520         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
521 }
522
523 // We read from a piece which is marked completed, but is missing data.
524 func TestCompletedPieceWrongSize(t *testing.T) {
525         cfg := TestingConfig()
526         cfg.DefaultStorage = badStorage{}
527         cl, err := NewClient(cfg)
528         require.NoError(t, err)
529         defer cl.Close()
530         info := metainfo.Info{
531                 PieceLength: 15,
532                 Pieces:      make([]byte, 20),
533                 Files: []metainfo.FileInfo{
534                         {Path: []string{"greeting"}, Length: 13},
535                 },
536         }
537         b, err := bencode.Marshal(info)
538         require.NoError(t, err)
539         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
540                 InfoBytes: b,
541                 InfoHash:  metainfo.HashBytes(b),
542         })
543         require.NoError(t, err)
544         defer tt.Drop()
545         assert.True(t, new)
546         r := tt.NewReader()
547         defer r.Close()
548         b, err = ioutil.ReadAll(r)
549         assert.Len(t, b, 13)
550         assert.NoError(t, err)
551 }
552
553 func BenchmarkAddLargeTorrent(b *testing.B) {
554         cfg := TestingConfig()
555         cfg.DisableTCP = true
556         cfg.DisableUTP = true
557         cl, err := NewClient(cfg)
558         require.NoError(b, err)
559         defer cl.Close()
560         b.ReportAllocs()
561         for range iter.N(b.N) {
562                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
563                 if err != nil {
564                         b.Fatal(err)
565                 }
566                 t.Drop()
567         }
568 }
569
570 func TestResponsive(t *testing.T) {
571         seederDataDir, mi := testutil.GreetingTestTorrent()
572         defer os.RemoveAll(seederDataDir)
573         cfg := TestingConfig()
574         cfg.Seed = true
575         cfg.DataDir = seederDataDir
576         seeder, err := NewClient(cfg)
577         require.Nil(t, err)
578         defer seeder.Close()
579         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
580         seederTorrent.VerifyData()
581         leecherDataDir, err := ioutil.TempDir("", "")
582         require.Nil(t, err)
583         defer os.RemoveAll(leecherDataDir)
584         cfg = TestingConfig()
585         cfg.DataDir = leecherDataDir
586         leecher, err := NewClient(cfg)
587         require.Nil(t, err)
588         defer leecher.Close()
589         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
590                 ret = TorrentSpecFromMetaInfo(mi)
591                 ret.ChunkSize = 2
592                 return
593         }())
594         leecherTorrent.AddClientPeer(seeder)
595         reader := leecherTorrent.NewReader()
596         defer reader.Close()
597         reader.SetReadahead(0)
598         reader.SetResponsive()
599         b := make([]byte, 2)
600         _, err = reader.Seek(3, io.SeekStart)
601         require.NoError(t, err)
602         _, err = io.ReadFull(reader, b)
603         assert.Nil(t, err)
604         assert.EqualValues(t, "lo", string(b))
605         _, err = reader.Seek(11, io.SeekStart)
606         require.NoError(t, err)
607         n, err := io.ReadFull(reader, b)
608         assert.Nil(t, err)
609         assert.EqualValues(t, 2, n)
610         assert.EqualValues(t, "d\n", string(b))
611 }
612
613 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
614         seederDataDir, mi := testutil.GreetingTestTorrent()
615         defer os.RemoveAll(seederDataDir)
616         cfg := TestingConfig()
617         cfg.Seed = true
618         cfg.DataDir = seederDataDir
619         seeder, err := NewClient(cfg)
620         require.Nil(t, err)
621         defer seeder.Close()
622         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
623         seederTorrent.VerifyData()
624         leecherDataDir, err := ioutil.TempDir("", "")
625         require.Nil(t, err)
626         defer os.RemoveAll(leecherDataDir)
627         cfg = TestingConfig()
628         cfg.DataDir = leecherDataDir
629         leecher, err := NewClient(cfg)
630         require.Nil(t, err)
631         defer leecher.Close()
632         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
633                 ret = TorrentSpecFromMetaInfo(mi)
634                 ret.ChunkSize = 2
635                 return
636         }())
637         leecherTorrent.AddClientPeer(seeder)
638         reader := leecherTorrent.NewReader()
639         defer reader.Close()
640         reader.SetReadahead(0)
641         reader.SetResponsive()
642         b := make([]byte, 2)
643         _, err = reader.Seek(3, io.SeekStart)
644         require.NoError(t, err)
645         _, err = io.ReadFull(reader, b)
646         assert.Nil(t, err)
647         assert.EqualValues(t, "lo", string(b))
648         go leecherTorrent.Drop()
649         _, err = reader.Seek(11, io.SeekStart)
650         require.NoError(t, err)
651         n, err := reader.Read(b)
652         assert.EqualError(t, err, "torrent closed")
653         assert.EqualValues(t, 0, n)
654 }
655
656 func TestDHTInheritBlocklist(t *testing.T) {
657         ipl := iplist.New(nil)
658         require.NotNil(t, ipl)
659         cfg := TestingConfig()
660         cfg.IPBlocklist = ipl
661         cfg.NoDHT = false
662         cl, err := NewClient(cfg)
663         require.NoError(t, err)
664         defer cl.Close()
665         numServers := 0
666         cl.eachDhtServer(func(s *dht.Server) {
667                 assert.Equal(t, ipl, s.IPBlocklist())
668                 numServers++
669         })
670         assert.EqualValues(t, 2, numServers)
671 }
672
673 // Check that stuff is merged in subsequent AddTorrentSpec for the same
674 // infohash.
675 func TestAddTorrentSpecMerging(t *testing.T) {
676         cl, err := NewClient(TestingConfig())
677         require.NoError(t, err)
678         defer cl.Close()
679         dir, mi := testutil.GreetingTestTorrent()
680         defer os.RemoveAll(dir)
681         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
682                 InfoHash: mi.HashInfoBytes(),
683         })
684         require.NoError(t, err)
685         require.True(t, new)
686         require.Nil(t, tt.Info())
687         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
688         require.NoError(t, err)
689         require.False(t, new)
690         require.NotNil(t, tt.Info())
691 }
692
693 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
694         dir, mi := testutil.GreetingTestTorrent()
695         os.RemoveAll(dir)
696         cl, _ := NewClient(TestingConfig())
697         defer cl.Close()
698         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
699                 InfoHash: mi.HashInfoBytes(),
700         })
701         tt.Drop()
702         assert.EqualValues(t, 0, len(cl.Torrents()))
703         select {
704         case <-tt.GotInfo():
705                 t.FailNow()
706         default:
707         }
708 }
709
710 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
711         for i := range iter.N(info.NumPieces()) {
712                 p := info.Piece(i)
713                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
714         }
715 }
716
717 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
718         fileCacheDir, err := ioutil.TempDir("", "")
719         require.NoError(t, err)
720         defer os.RemoveAll(fileCacheDir)
721         fileCache, err := filecache.NewCache(fileCacheDir)
722         require.NoError(t, err)
723         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
724         defer os.RemoveAll(greetingDataTempDir)
725         filePieceStore := csf(fileCache)
726         defer filePieceStore.Close()
727         info, err := greetingMetainfo.UnmarshalInfo()
728         require.NoError(t, err)
729         ih := greetingMetainfo.HashInfoBytes()
730         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
731         require.NoError(t, err)
732         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
733         // require.Equal(t, len(testutil.GreetingFileContents), written)
734         // require.NoError(t, err)
735         for i := 0; i < info.NumPieces(); i++ {
736                 p := info.Piece(i)
737                 if alreadyCompleted {
738                         require.NoError(t, greetingData.Piece(p).MarkComplete())
739                 }
740         }
741         cfg := TestingConfig()
742         // TODO: Disable network option?
743         cfg.DisableTCP = true
744         cfg.DisableUTP = true
745         cfg.DefaultStorage = filePieceStore
746         cl, err := NewClient(cfg)
747         require.NoError(t, err)
748         defer cl.Close()
749         tt, err := cl.AddTorrent(greetingMetainfo)
750         require.NoError(t, err)
751         psrs := tt.PieceStateRuns()
752         assert.Len(t, psrs, 1)
753         assert.EqualValues(t, 3, psrs[0].Length)
754         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
755         if alreadyCompleted {
756                 r := tt.NewReader()
757                 b, err := ioutil.ReadAll(r)
758                 assert.NoError(t, err)
759                 assert.EqualValues(t, testutil.GreetingFileContents, b)
760         }
761 }
762
763 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
764         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
765 }
766
767 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
768         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
769 }
770
771 func TestAddMetainfoWithNodes(t *testing.T) {
772         cfg := TestingConfig()
773         cfg.ListenHost = func(string) string { return "" }
774         cfg.NoDHT = false
775         cfg.DhtStartingNodes = func() ([]dht.Addr, error) { return nil, nil }
776         // For now, we want to just jam the nodes into the table, without
777         // verifying them first. Also the DHT code doesn't support mixing secure
778         // and insecure nodes if security is enabled (yet).
779         // cfg.DHTConfig.NoSecurity = true
780         cl, err := NewClient(cfg)
781         require.NoError(t, err)
782         defer cl.Close()
783         sum := func() (ret int64) {
784                 cl.eachDhtServer(func(s *dht.Server) {
785                         ret += s.Stats().OutboundQueriesAttempted
786                 })
787                 return
788         }
789         assert.EqualValues(t, 0, sum())
790         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
791         require.NoError(t, err)
792         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
793         // check if the announce-list is here instead. TODO: Add nodes.
794         assert.Len(t, tt.metainfo.AnnounceList, 5)
795         // There are 6 nodes in the torrent file.
796         for sum() != int64(6*len(cl.dhtServers)) {
797                 time.Sleep(time.Millisecond)
798         }
799 }
800
// testDownloadCancelParams selects the variations exercised by
// testDownloadCancel.
type testDownloadCancelParams struct {
	// SetLeecherStorageCapacity, when true, applies LeecherStorageCapacity
	// to the leecher's file cache.
	SetLeecherStorageCapacity bool
	// LeecherStorageCapacity is the capacity passed to the leecher's file
	// cache; it is only read when SetLeecherStorageCapacity is true.
	LeecherStorageCapacity int64
	// Cancel, when true, cancels all pieces right after they are requested.
	Cancel bool
}
806
807 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
808         greetingTempDir, mi := testutil.GreetingTestTorrent()
809         defer os.RemoveAll(greetingTempDir)
810         cfg := TestingConfig()
811         cfg.Seed = true
812         cfg.DataDir = greetingTempDir
813         seeder, err := NewClient(cfg)
814         require.NoError(t, err)
815         defer seeder.Close()
816         defer testutil.ExportStatusWriter(seeder, "s")()
817         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
818         seederTorrent.VerifyData()
819         leecherDataDir, err := ioutil.TempDir("", "")
820         require.NoError(t, err)
821         defer os.RemoveAll(leecherDataDir)
822         fc, err := filecache.NewCache(leecherDataDir)
823         require.NoError(t, err)
824         if ps.SetLeecherStorageCapacity {
825                 fc.SetCapacity(ps.LeecherStorageCapacity)
826         }
827         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
828         cfg.DataDir = leecherDataDir
829         leecher, err := NewClient(cfg)
830         require.NoError(t, err)
831         defer leecher.Close()
832         defer testutil.ExportStatusWriter(leecher, "l")()
833         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
834                 ret = TorrentSpecFromMetaInfo(mi)
835                 ret.ChunkSize = 2
836                 return
837         }())
838         require.NoError(t, err)
839         assert.True(t, new)
840         psc := leecherGreeting.SubscribePieceStateChanges()
841         defer psc.Close()
842
843         leecherGreeting.cl.lock()
844         leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
845         if ps.Cancel {
846                 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
847         }
848         leecherGreeting.cl.unlock()
849         done := make(chan struct{})
850         defer close(done)
851         go leecherGreeting.AddClientPeer(seeder)
852         completes := make(map[int]bool, 3)
853         expected := func() map[int]bool {
854                 if ps.Cancel {
855                         return map[int]bool{0: false, 1: false, 2: false}
856                 } else {
857                         return map[int]bool{0: true, 1: true, 2: true}
858                 }
859         }()
860         for !reflect.DeepEqual(completes, expected) {
861                 _v := <-psc.Values
862                 v := _v.(PieceStateChange)
863                 completes[v.Index] = v.Complete
864         }
865 }
866
867 func TestTorrentDownloadAll(t *testing.T) {
868         testDownloadCancel(t, testDownloadCancelParams{})
869 }
870
871 func TestTorrentDownloadAllThenCancel(t *testing.T) {
872         testDownloadCancel(t, testDownloadCancelParams{
873                 Cancel: true,
874         })
875 }
876
877 // Ensure that it's an error for a peer to send an invalid have message.
878 func TestPeerInvalidHave(t *testing.T) {
879         cl, err := NewClient(TestingConfig())
880         require.NoError(t, err)
881         defer cl.Close()
882         info := metainfo.Info{
883                 PieceLength: 1,
884                 Pieces:      make([]byte, 20),
885                 Files:       []metainfo.FileInfo{{Length: 1}},
886         }
887         infoBytes, err := bencode.Marshal(info)
888         require.NoError(t, err)
889         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
890                 InfoBytes: infoBytes,
891                 InfoHash:  metainfo.HashBytes(infoBytes),
892                 Storage:   badStorage{},
893         })
894         require.NoError(t, err)
895         assert.True(t, _new)
896         defer tt.Drop()
897         cn := &connection{
898                 t: tt,
899         }
900         assert.NoError(t, cn.peerSentHave(0))
901         assert.Error(t, cn.peerSentHave(1))
902 }
903
904 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
905         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
906         defer os.RemoveAll(greetingTempDir)
907         cfg := TestingConfig()
908         cfg.DataDir = greetingTempDir
909         seeder, err := NewClient(TestingConfig())
910         require.NoError(t, err)
911         seeder.AddTorrentSpec(&TorrentSpec{
912                 InfoBytes: greetingMetainfo.InfoBytes,
913         })
914 }
915
916 // Check that when the listen port is 0, all the protocols listened on have
917 // the same port, and it isn't zero.
918 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
919         cl, err := NewClient(TestingConfig())
920         require.NoError(t, err)
921         defer cl.Close()
922         port := cl.LocalPort()
923         assert.NotEqual(t, 0, port)
924         cl.eachListener(func(s socket) bool {
925                 assert.Equal(t, port, missinggo.AddrPort(s.Addr()))
926                 return true
927         })
928 }
929
930 func TestClientDynamicListenTCPOnly(t *testing.T) {
931         cfg := TestingConfig()
932         cfg.DisableUTP = true
933         cfg.DisableTCP = false
934         cl, err := NewClient(cfg)
935         require.NoError(t, err)
936         defer cl.Close()
937         assert.NotEqual(t, 0, cl.LocalPort())
938 }
939
940 func TestClientDynamicListenUTPOnly(t *testing.T) {
941         cfg := TestingConfig()
942         cfg.DisableTCP = true
943         cfg.DisableUTP = false
944         cl, err := NewClient(cfg)
945         require.NoError(t, err)
946         defer cl.Close()
947         assert.NotEqual(t, 0, cl.LocalPort())
948 }
949
950 func totalConns(tts []*Torrent) (ret int) {
951         for _, tt := range tts {
952                 tt.cl.lock()
953                 ret += len(tt.conns)
954                 tt.cl.unlock()
955         }
956         return
957 }
958
959 func TestSetMaxEstablishedConn(t *testing.T) {
960         var tts []*Torrent
961         ih := testutil.GreetingMetaInfo().HashInfoBytes()
962         cfg := TestingConfig()
963         cfg.DisableAcceptRateLimiting = true
964         cfg.dropDuplicatePeerIds = true
965         for i := range iter.N(3) {
966                 cl, err := NewClient(cfg)
967                 require.NoError(t, err)
968                 defer cl.Close()
969                 tt, _ := cl.AddTorrentInfoHash(ih)
970                 tt.SetMaxEstablishedConns(2)
971                 defer testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))()
972                 tts = append(tts, tt)
973         }
974         addPeers := func() {
975                 for _, tt := range tts {
976                         for _, _tt := range tts {
977                                 // if tt != _tt {
978                                 tt.AddClientPeer(_tt.cl)
979                                 // }
980                         }
981                 }
982         }
983         waitTotalConns := func(num int) {
984                 for totalConns(tts) != num {
985                         addPeers()
986                         time.Sleep(time.Millisecond)
987                 }
988         }
989         addPeers()
990         waitTotalConns(6)
991         tts[0].SetMaxEstablishedConns(1)
992         waitTotalConns(4)
993         tts[0].SetMaxEstablishedConns(0)
994         waitTotalConns(2)
995         tts[0].SetMaxEstablishedConns(1)
996         addPeers()
997         waitTotalConns(4)
998         tts[0].SetMaxEstablishedConns(2)
999         addPeers()
1000         waitTotalConns(6)
1001 }
1002
1003 // Creates a file containing its own name as data. Make a metainfo from that, adds it to the given
1004 // client, and returns a magnet link.
1005 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1006         os.MkdirAll(dir, 0770)
1007         file, err := os.Create(filepath.Join(dir, name))
1008         require.NoError(t, err)
1009         file.Write([]byte(name))
1010         file.Close()
1011         mi := metainfo.MetaInfo{}
1012         mi.SetDefaults()
1013         info := metainfo.Info{PieceLength: 256 * 1024}
1014         err = info.BuildFromFilePath(filepath.Join(dir, name))
1015         require.NoError(t, err)
1016         mi.InfoBytes, err = bencode.Marshal(info)
1017         require.NoError(t, err)
1018         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1019         tr, err := cl.AddTorrent(&mi)
1020         require.NoError(t, err)
1021         require.True(t, tr.Seeding())
1022         tr.VerifyData()
1023         return magnet
1024 }
1025
1026 // https://github.com/anacrolix/torrent/issues/114
1027 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1028         testSeederLeecherPair(
1029                 t,
1030                 func(cfg *ClientConfig) {
1031                         cfg.HeaderObfuscationPolicy.Preferred = true
1032                         cfg.HeaderObfuscationPolicy.RequirePreferred = true
1033                 },
1034                 func(cfg *ClientConfig) {
1035                         cfg.HeaderObfuscationPolicy.RequirePreferred = false
1036                 },
1037         )
1038 }
1039
1040 // Test that the leecher can download a torrent in its entirety from the seeder. Note that the
1041 // seeder config is done first.
1042 func testSeederLeecherPair(t *testing.T, seeder func(*ClientConfig), leecher func(*ClientConfig)) {
1043         cfg := TestingConfig()
1044         cfg.Seed = true
1045         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1046         os.Mkdir(cfg.DataDir, 0755)
1047         seeder(cfg)
1048         server, err := NewClient(cfg)
1049         require.NoError(t, err)
1050         defer server.Close()
1051         defer testutil.ExportStatusWriter(server, "s")()
1052         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1053         // Extra torrents are added to test the seeder having to match incoming obfuscated headers
1054         // against more than one torrent. See issue #114
1055         makeMagnet(t, server, cfg.DataDir, "test2")
1056         for i := 0; i < 100; i++ {
1057                 makeMagnet(t, server, cfg.DataDir, fmt.Sprintf("test%d", i+2))
1058         }
1059         cfg = TestingConfig()
1060         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1061         leecher(cfg)
1062         client, err := NewClient(cfg)
1063         require.NoError(t, err)
1064         defer client.Close()
1065         defer testutil.ExportStatusWriter(client, "c")()
1066         tr, err := client.AddMagnet(magnet1)
1067         require.NoError(t, err)
1068         tr.AddClientPeer(server)
1069         <-tr.GotInfo()
1070         tr.DownloadAll()
1071         client.WaitAll()
1072 }
1073
1074 // This appears to be the situation with the S3 BitTorrent client.
1075 func TestObfuscatedHeaderFallbackSeederDisallowsLeecherPrefers(t *testing.T) {
1076         // Leecher prefers obfuscation, but the seeder does not allow it.
1077         testSeederLeecherPair(
1078                 t,
1079                 func(cfg *ClientConfig) {
1080                         cfg.HeaderObfuscationPolicy.Preferred = false
1081                         cfg.HeaderObfuscationPolicy.RequirePreferred = true
1082                 },
1083                 func(cfg *ClientConfig) {
1084                         cfg.HeaderObfuscationPolicy.Preferred = true
1085                         cfg.HeaderObfuscationPolicy.RequirePreferred = false
1086                 },
1087         )
1088 }
1089
1090 func TestObfuscatedHeaderFallbackSeederRequiresLeecherPrefersNot(t *testing.T) {
1091         // Leecher prefers no obfuscation, but the seeder enforces it.
1092         testSeederLeecherPair(
1093                 t,
1094                 func(cfg *ClientConfig) {
1095                         cfg.HeaderObfuscationPolicy.Preferred = true
1096                         cfg.HeaderObfuscationPolicy.RequirePreferred = true
1097                 },
1098                 func(cfg *ClientConfig) {
1099                         cfg.HeaderObfuscationPolicy.Preferred = false
1100                         cfg.HeaderObfuscationPolicy.RequirePreferred = false
1101                 },
1102         )
1103 }
1104
1105 func TestClientAddressInUse(t *testing.T) {
1106         s, _ := NewUtpSocket("udp", ":50007", nil)
1107         if s != nil {
1108                 defer s.Close()
1109         }
1110         cfg := TestingConfig().SetListenAddr(":50007")
1111         cl, err := NewClient(cfg)
1112         require.Error(t, err)
1113         require.Nil(t, cl)
1114 }
1115
1116 func TestClientHasDhtServersWhenUtpDisabled(t *testing.T) {
1117         cc := TestingConfig()
1118         cc.DisableUTP = true
1119         cc.NoDHT = false
1120         cl, err := NewClient(cc)
1121         require.NoError(t, err)
1122         defer cl.Close()
1123         assert.NotEmpty(t, cl.DhtServers())
1124 }
1125
1126 func TestIssue335(t *testing.T) {
1127         dir, mi := testutil.GreetingTestTorrent()
1128         defer os.RemoveAll(dir)
1129         cfg := TestingConfig()
1130         cfg.Seed = false
1131         cfg.Debug = true
1132         cfg.DataDir = dir
1133         comp, err := storage.NewBoltPieceCompletion(dir)
1134         require.NoError(t, err)
1135         defer comp.Close()
1136         cfg.DefaultStorage = storage.NewMMapWithCompletion(dir, comp)
1137         cl, err := NewClient(cfg)
1138         require.NoError(t, err)
1139         defer cl.Close()
1140         tor, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1141         require.NoError(t, err)
1142         assert.True(t, new)
1143         require.True(t, cl.WaitAll())
1144         tor.Drop()
1145         tor, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1146         require.NoError(t, err)
1147         assert.True(t, new)
1148         require.True(t, cl.WaitAll())
1149 }