Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Split Client dialers and listeners
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "os"
9         "path/filepath"
10         "reflect"
11         "sync"
12         "testing"
13         "time"
14
15         "github.com/bradfitz/iter"
16         "github.com/stretchr/testify/assert"
17         "github.com/stretchr/testify/require"
18         "golang.org/x/time/rate"
19
20         "github.com/anacrolix/dht/v2"
21         _ "github.com/anacrolix/envpprof"
22         "github.com/anacrolix/missinggo"
23         "github.com/anacrolix/missinggo/v2/filecache"
24
25         "github.com/anacrolix/torrent/bencode"
26         "github.com/anacrolix/torrent/internal/testutil"
27         "github.com/anacrolix/torrent/iplist"
28         "github.com/anacrolix/torrent/metainfo"
29         "github.com/anacrolix/torrent/storage"
30 )
31
32 func TestingConfig() *ClientConfig {
33         cfg := NewDefaultClientConfig()
34         cfg.ListenHost = LoopbackListenHost
35         cfg.NoDHT = true
36         cfg.DataDir = tempDir()
37         cfg.DisableTrackers = true
38         cfg.NoDefaultPortForwarding = true
39         cfg.DisableAcceptRateLimiting = true
40         cfg.ListenPort = 0
41         //cfg.Debug = true
42         //cfg.Logger = cfg.Logger.WithText(func(m log.Msg) string {
43         //      t := m.Text()
44         //      m.Values(func(i interface{}) bool {
45         //              t += fmt.Sprintf("\n%[1]T: %[1]v", i)
46         //              return true
47         //      })
48         //      return t
49         //})
50         return cfg
51 }
52
53 func TestClientDefault(t *testing.T) {
54         cl, err := NewClient(TestingConfig())
55         require.NoError(t, err)
56         cl.Close()
57 }
58
59 func TestClientNilConfig(t *testing.T) {
60         cl, err := NewClient(nil)
61         require.NoError(t, err)
62         cl.Close()
63 }
64
65 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
66         cfg := TestingConfig()
67         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
68         require.NoError(t, err)
69         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
70         defer ci.Close()
71         cfg.DefaultStorage = ci
72         cl, err := NewClient(cfg)
73         require.NoError(t, err)
74         cl.Close()
75         // And again, https://github.com/anacrolix/torrent/issues/158
76         cl, err = NewClient(cfg)
77         require.NoError(t, err)
78         cl.Close()
79 }
80
81 func TestAddDropTorrent(t *testing.T) {
82         cl, err := NewClient(TestingConfig())
83         require.NoError(t, err)
84         defer cl.Close()
85         dir, mi := testutil.GreetingTestTorrent()
86         defer os.RemoveAll(dir)
87         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
88         require.NoError(t, err)
89         assert.True(t, new)
90         tt.SetMaxEstablishedConns(0)
91         tt.SetMaxEstablishedConns(1)
92         tt.Drop()
93 }
94
// TestAddTorrentNoSupportedTrackerSchemes is a placeholder that has not been
// written yet; it is skipped with an explanatory message so the reason shows
// up in verbose test output.
func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
	t.Skip("TODO: not implemented")
}
99
// TestAddTorrentNoUsableURLs is a placeholder that has not been written yet;
// it is skipped with an explanatory message so the reason shows up in verbose
// test output.
func TestAddTorrentNoUsableURLs(t *testing.T) {
	t.Skip("TODO: not implemented")
}
104
// TestAddPeersToUnknownTorrent is a placeholder that has not been written
// yet; it is skipped with an explanatory message so the reason shows up in
// verbose test output.
func TestAddPeersToUnknownTorrent(t *testing.T) {
	t.Skip("TODO: not implemented")
}
109
110 func TestPieceHashSize(t *testing.T) {
111         assert.Equal(t, 20, pieceHash.Size())
112 }
113
114 func TestTorrentInitialState(t *testing.T) {
115         dir, mi := testutil.GreetingTestTorrent()
116         defer os.RemoveAll(dir)
117         cl := &Client{
118                 config: TestingConfig(),
119         }
120         cl.initLogger()
121         tor := cl.newTorrent(
122                 mi.HashInfoBytes(),
123                 storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
124         )
125         tor.setChunkSize(2)
126         tor.cl.lock()
127         err := tor.setInfoBytes(mi.InfoBytes)
128         tor.cl.unlock()
129         require.NoError(t, err)
130         require.Len(t, tor.pieces, 3)
131         tor.pendAllChunkSpecs(0)
132         tor.cl.lock()
133         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
134         tor.cl.unlock()
135         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
136 }
137
138 func TestReducedDialTimeout(t *testing.T) {
139         cfg := NewDefaultClientConfig()
140         for _, _case := range []struct {
141                 Max             time.Duration
142                 HalfOpenLimit   int
143                 PendingPeers    int
144                 ExpectedReduced time.Duration
145         }{
146                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
147                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
148                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
149                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
150                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
151                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
152         } {
153                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
154                 expected := _case.ExpectedReduced
155                 if expected < cfg.MinDialTimeout {
156                         expected = cfg.MinDialTimeout
157                 }
158                 if reduced != expected {
159                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
160                 }
161         }
162 }
163
164 func TestAddDropManyTorrents(t *testing.T) {
165         cl, err := NewClient(TestingConfig())
166         require.NoError(t, err)
167         defer cl.Close()
168         for i := range iter.N(1000) {
169                 var spec TorrentSpec
170                 binary.PutVarint(spec.InfoHash[:], int64(i))
171                 tt, new, err := cl.AddTorrentSpec(&spec)
172                 assert.NoError(t, err)
173                 assert.True(t, new)
174                 defer tt.Drop()
175         }
176 }
177
178 type fileCacheClientStorageFactoryParams struct {
179         Capacity    int64
180         SetCapacity bool
181         Wrapper     func(*filecache.Cache) storage.ClientImpl
182 }
183
184 func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
185         return func(dataDir string) storage.ClientImpl {
186                 fc, err := filecache.NewCache(dataDir)
187                 if err != nil {
188                         panic(err)
189                 }
190                 if ps.SetCapacity {
191                         fc.SetCapacity(ps.Capacity)
192                 }
193                 return ps.Wrapper(fc)
194         }
195 }
196
197 type storageFactory func(string) storage.ClientImpl
198
199 func TestClientTransferDefault(t *testing.T) {
200         testClientTransfer(t, testClientTransferParams{
201                 ExportClientStatus: true,
202                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
203                         Wrapper: fileCachePieceResourceStorage,
204                 }),
205         })
206 }
207
208 func TestClientTransferRateLimitedUpload(t *testing.T) {
209         started := time.Now()
210         testClientTransfer(t, testClientTransferParams{
211                 // We are uploading 13 bytes (the length of the greeting torrent). The
212                 // chunks are 2 bytes in length. Then the smallest burst we can run
213                 // with is 2. Time taken is (13-burst)/rate.
214                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
215                 ExportClientStatus:      true,
216         })
217         require.True(t, time.Since(started) > time.Second)
218 }
219
220 func TestClientTransferRateLimitedDownload(t *testing.T) {
221         testClientTransfer(t, testClientTransferParams{
222                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
223         })
224 }
225
226 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
227         return storage.NewResourcePieces(fc.AsResourceProvider())
228 }
229
230 func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
231         testClientTransfer(t, testClientTransferParams{
232                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
233                         SetCapacity: true,
234                         // Going below the piece length means it can't complete a piece so
235                         // that it can be hashed.
236                         Capacity: 5,
237                         Wrapper:  fileCachePieceResourceStorage,
238                 }),
239                 SetReadahead: setReadahead,
240                 // Can't readahead too far or the cache will thrash and drop data we
241                 // thought we had.
242                 Readahead:          readahead,
243                 ExportClientStatus: true,
244         })
245 }
246
247 func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
248         testClientTransferSmallCache(t, true, 5)
249 }
250
251 func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
252         testClientTransferSmallCache(t, true, 15)
253 }
254
255 func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
256         testClientTransferSmallCache(t, false, -1)
257 }
258
259 func TestClientTransferVarious(t *testing.T) {
260         // Leecher storage
261         for _, ls := range []struct {
262                 name string
263                 f    storageFactory
264         }{
265                 {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
266                         Wrapper: fileCachePieceResourceStorage,
267                 })},
268                 {"Boltdb", storage.NewBoltDB},
269         } {
270                 t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {
271                         // Seeder storage
272                         for _, ss := range []struct {
273                                 name string
274                                 f    func(string) storage.ClientImpl
275                         }{
276                                 {"File", storage.NewFile},
277                                 {"Mmap", storage.NewMMap},
278                         } {
279                                 t.Run(fmt.Sprintf("%sSeederStorage", ss.name), func(t *testing.T) {
280                                         for _, responsive := range []bool{false, true} {
281                                                 t.Run(fmt.Sprintf("Responsive=%v", responsive), func(t *testing.T) {
282                                                         t.Run("NoReadahead", func(t *testing.T) {
283                                                                 testClientTransfer(t, testClientTransferParams{
284                                                                         Responsive:     responsive,
285                                                                         SeederStorage:  ss.f,
286                                                                         LeecherStorage: ls.f,
287                                                                 })
288                                                         })
289                                                         for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
290                                                                 t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
291                                                                         testClientTransfer(t, testClientTransferParams{
292                                                                                 SeederStorage:  ss.f,
293                                                                                 Responsive:     responsive,
294                                                                                 SetReadahead:   true,
295                                                                                 Readahead:      readahead,
296                                                                                 LeecherStorage: ls.f,
297                                                                         })
298                                                                 })
299                                                         }
300                                                 })
301                                         }
302                                 })
303                         }
304                 })
305         }
306 }
307
308 type testClientTransferParams struct {
309         Responsive                 bool
310         Readahead                  int64
311         SetReadahead               bool
312         ExportClientStatus         bool
313         LeecherStorage             func(string) storage.ClientImpl
314         SeederStorage              func(string) storage.ClientImpl
315         SeederUploadRateLimiter    *rate.Limiter
316         LeecherDownloadRateLimiter *rate.Limiter
317 }
318
319 // Creates a seeder and a leecher, and ensures the data transfers when a read
320 // is attempted on the leecher.
321 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
322         greetingTempDir, mi := testutil.GreetingTestTorrent()
323         defer os.RemoveAll(greetingTempDir)
324         // Create seeder and a Torrent.
325         cfg := TestingConfig()
326         cfg.Seed = true
327         if ps.SeederUploadRateLimiter != nil {
328                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
329         }
330         // cfg.ListenAddr = "localhost:4000"
331         if ps.SeederStorage != nil {
332                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
333                 defer cfg.DefaultStorage.Close()
334         } else {
335                 cfg.DataDir = greetingTempDir
336         }
337         seeder, err := NewClient(cfg)
338         require.NoError(t, err)
339         if ps.ExportClientStatus {
340                 defer testutil.ExportStatusWriter(seeder, "s")()
341         }
342         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
343         // Run a Stats right after Closing the Client. This will trigger the Stats
344         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
345         defer seederTorrent.Stats()
346         defer seeder.Close()
347         seederTorrent.VerifyData()
348         // Create leecher and a Torrent.
349         leecherDataDir, err := ioutil.TempDir("", "")
350         require.NoError(t, err)
351         defer os.RemoveAll(leecherDataDir)
352         cfg = TestingConfig()
353         if ps.LeecherStorage == nil {
354                 cfg.DataDir = leecherDataDir
355         } else {
356                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
357         }
358         if ps.LeecherDownloadRateLimiter != nil {
359                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
360         }
361         cfg.Seed = false
362         //cfg.Debug = true
363         leecher, err := NewClient(cfg)
364         require.NoError(t, err)
365         defer leecher.Close()
366         if ps.ExportClientStatus {
367                 defer testutil.ExportStatusWriter(leecher, "l")()
368         }
369         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
370                 ret = TorrentSpecFromMetaInfo(mi)
371                 ret.ChunkSize = 2
372                 return
373         }())
374         require.NoError(t, err)
375         assert.True(t, new)
376
377         //// This was used when observing coalescing of piece state changes.
378         //logPieceStateChanges(leecherTorrent)
379
380         // Now do some things with leecher and seeder.
381         leecherTorrent.AddClientPeer(seeder)
382         // The Torrent should not be interested in obtaining peers, so the one we
383         // just added should be the only one.
384         assert.False(t, leecherTorrent.Seeding())
385         assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
386         r := leecherTorrent.NewReader()
387         defer r.Close()
388         if ps.Responsive {
389                 r.SetResponsive()
390         }
391         if ps.SetReadahead {
392                 r.SetReadahead(ps.Readahead)
393         }
394         assertReadAllGreeting(t, r)
395
396         seederStats := seederTorrent.Stats()
397         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
398         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
399
400         leecherStats := leecherTorrent.Stats()
401         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
402         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
403
404         // Try reading through again for the cases where the torrent data size
405         // exceeds the size of the cache.
406         assertReadAllGreeting(t, r)
407 }
408
409 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
410         pos, err := r.Seek(0, io.SeekStart)
411         assert.NoError(t, err)
412         assert.EqualValues(t, 0, pos)
413         _greeting, err := ioutil.ReadAll(r)
414         assert.NoError(t, err)
415         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
416 }
417
418 // Check that after completing leeching, a leecher transitions to a seeding
419 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
420 func TestSeedAfterDownloading(t *testing.T) {
421         greetingTempDir, mi := testutil.GreetingTestTorrent()
422         defer os.RemoveAll(greetingTempDir)
423
424         cfg := TestingConfig()
425         cfg.Seed = true
426         cfg.DataDir = greetingTempDir
427         seeder, err := NewClient(cfg)
428         require.NoError(t, err)
429         defer seeder.Close()
430         defer testutil.ExportStatusWriter(seeder, "s")()
431         seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
432         require.NoError(t, err)
433         assert.True(t, ok)
434         seederTorrent.VerifyData()
435
436         cfg = TestingConfig()
437         cfg.Seed = true
438         cfg.DataDir, err = ioutil.TempDir("", "")
439         require.NoError(t, err)
440         defer os.RemoveAll(cfg.DataDir)
441         leecher, err := NewClient(cfg)
442         require.NoError(t, err)
443         defer leecher.Close()
444         defer testutil.ExportStatusWriter(leecher, "l")()
445
446         cfg = TestingConfig()
447         cfg.Seed = false
448         cfg.DataDir, err = ioutil.TempDir("", "")
449         require.NoError(t, err)
450         defer os.RemoveAll(cfg.DataDir)
451         leecherLeecher, _ := NewClient(cfg)
452         require.NoError(t, err)
453         defer leecherLeecher.Close()
454         defer testutil.ExportStatusWriter(leecherLeecher, "ll")()
455         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
456                 ret = TorrentSpecFromMetaInfo(mi)
457                 ret.ChunkSize = 2
458                 return
459         }())
460         require.NoError(t, err)
461         assert.True(t, ok)
462         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
463                 ret = TorrentSpecFromMetaInfo(mi)
464                 ret.ChunkSize = 3
465                 return
466         }())
467         require.NoError(t, err)
468         assert.True(t, ok)
469         // Simultaneously DownloadAll in Leecher, and read the contents
470         // consecutively in LeecherLeecher. This non-deterministically triggered a
471         // case where the leecher wouldn't unchoke the LeecherLeecher.
472         var wg sync.WaitGroup
473         wg.Add(1)
474         go func() {
475                 defer wg.Done()
476                 r := llg.NewReader()
477                 defer r.Close()
478                 b, err := ioutil.ReadAll(r)
479                 require.NoError(t, err)
480                 assert.EqualValues(t, testutil.GreetingFileContents, b)
481         }()
482         done := make(chan struct{})
483         defer close(done)
484         go leecherGreeting.AddClientPeer(seeder)
485         go leecherGreeting.AddClientPeer(leecherLeecher)
486         wg.Add(1)
487         go func() {
488                 defer wg.Done()
489                 leecherGreeting.DownloadAll()
490                 leecher.WaitAll()
491         }()
492         wg.Wait()
493 }
494
495 func TestMergingTrackersByAddingSpecs(t *testing.T) {
496         cl, err := NewClient(TestingConfig())
497         require.NoError(t, err)
498         defer cl.Close()
499         spec := TorrentSpec{}
500         T, new, _ := cl.AddTorrentSpec(&spec)
501         if !new {
502                 t.FailNow()
503         }
504         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
505         _, new, _ = cl.AddTorrentSpec(&spec)
506         assert.False(t, new)
507         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
508         // Because trackers are disabled in TestingConfig.
509         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
510 }
511
512 // We read from a piece which is marked completed, but is missing data.
513 func TestCompletedPieceWrongSize(t *testing.T) {
514         cfg := TestingConfig()
515         cfg.DefaultStorage = badStorage{}
516         cl, err := NewClient(cfg)
517         require.NoError(t, err)
518         defer cl.Close()
519         info := metainfo.Info{
520                 PieceLength: 15,
521                 Pieces:      make([]byte, 20),
522                 Files: []metainfo.FileInfo{
523                         {Path: []string{"greeting"}, Length: 13},
524                 },
525         }
526         b, err := bencode.Marshal(info)
527         require.NoError(t, err)
528         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
529                 InfoBytes: b,
530                 InfoHash:  metainfo.HashBytes(b),
531         })
532         require.NoError(t, err)
533         defer tt.Drop()
534         assert.True(t, new)
535         r := tt.NewReader()
536         defer r.Close()
537         b, err = ioutil.ReadAll(r)
538         assert.Len(t, b, 13)
539         assert.NoError(t, err)
540 }
541
542 func BenchmarkAddLargeTorrent(b *testing.B) {
543         cfg := TestingConfig()
544         cfg.DisableTCP = true
545         cfg.DisableUTP = true
546         cl, err := NewClient(cfg)
547         require.NoError(b, err)
548         defer cl.Close()
549         b.ReportAllocs()
550         for range iter.N(b.N) {
551                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
552                 if err != nil {
553                         b.Fatal(err)
554                 }
555                 t.Drop()
556         }
557 }
558
559 func TestResponsive(t *testing.T) {
560         seederDataDir, mi := testutil.GreetingTestTorrent()
561         defer os.RemoveAll(seederDataDir)
562         cfg := TestingConfig()
563         cfg.Seed = true
564         cfg.DataDir = seederDataDir
565         seeder, err := NewClient(cfg)
566         require.Nil(t, err)
567         defer seeder.Close()
568         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
569         seederTorrent.VerifyData()
570         leecherDataDir, err := ioutil.TempDir("", "")
571         require.Nil(t, err)
572         defer os.RemoveAll(leecherDataDir)
573         cfg = TestingConfig()
574         cfg.DataDir = leecherDataDir
575         leecher, err := NewClient(cfg)
576         require.Nil(t, err)
577         defer leecher.Close()
578         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
579                 ret = TorrentSpecFromMetaInfo(mi)
580                 ret.ChunkSize = 2
581                 return
582         }())
583         leecherTorrent.AddClientPeer(seeder)
584         reader := leecherTorrent.NewReader()
585         defer reader.Close()
586         reader.SetReadahead(0)
587         reader.SetResponsive()
588         b := make([]byte, 2)
589         _, err = reader.Seek(3, io.SeekStart)
590         require.NoError(t, err)
591         _, err = io.ReadFull(reader, b)
592         assert.Nil(t, err)
593         assert.EqualValues(t, "lo", string(b))
594         _, err = reader.Seek(11, io.SeekStart)
595         require.NoError(t, err)
596         n, err := io.ReadFull(reader, b)
597         assert.Nil(t, err)
598         assert.EqualValues(t, 2, n)
599         assert.EqualValues(t, "d\n", string(b))
600 }
601
602 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
603         seederDataDir, mi := testutil.GreetingTestTorrent()
604         defer os.RemoveAll(seederDataDir)
605         cfg := TestingConfig()
606         cfg.Seed = true
607         cfg.DataDir = seederDataDir
608         seeder, err := NewClient(cfg)
609         require.Nil(t, err)
610         defer seeder.Close()
611         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
612         seederTorrent.VerifyData()
613         leecherDataDir, err := ioutil.TempDir("", "")
614         require.Nil(t, err)
615         defer os.RemoveAll(leecherDataDir)
616         cfg = TestingConfig()
617         cfg.DataDir = leecherDataDir
618         leecher, err := NewClient(cfg)
619         require.Nil(t, err)
620         defer leecher.Close()
621         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
622                 ret = TorrentSpecFromMetaInfo(mi)
623                 ret.ChunkSize = 2
624                 return
625         }())
626         leecherTorrent.AddClientPeer(seeder)
627         reader := leecherTorrent.NewReader()
628         defer reader.Close()
629         reader.SetReadahead(0)
630         reader.SetResponsive()
631         b := make([]byte, 2)
632         _, err = reader.Seek(3, io.SeekStart)
633         require.NoError(t, err)
634         _, err = io.ReadFull(reader, b)
635         assert.Nil(t, err)
636         assert.EqualValues(t, "lo", string(b))
637         go leecherTorrent.Drop()
638         _, err = reader.Seek(11, io.SeekStart)
639         require.NoError(t, err)
640         n, err := reader.Read(b)
641         assert.EqualError(t, err, "torrent closed")
642         assert.EqualValues(t, 0, n)
643 }
644
645 func TestDHTInheritBlocklist(t *testing.T) {
646         ipl := iplist.New(nil)
647         require.NotNil(t, ipl)
648         cfg := TestingConfig()
649         cfg.IPBlocklist = ipl
650         cfg.NoDHT = false
651         cl, err := NewClient(cfg)
652         require.NoError(t, err)
653         defer cl.Close()
654         numServers := 0
655         cl.eachDhtServer(func(s *dht.Server) {
656                 assert.Equal(t, ipl, s.IPBlocklist())
657                 numServers++
658         })
659         assert.EqualValues(t, 2, numServers)
660 }
661
662 // Check that stuff is merged in subsequent AddTorrentSpec for the same
663 // infohash.
664 func TestAddTorrentSpecMerging(t *testing.T) {
665         cl, err := NewClient(TestingConfig())
666         require.NoError(t, err)
667         defer cl.Close()
668         dir, mi := testutil.GreetingTestTorrent()
669         defer os.RemoveAll(dir)
670         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
671                 InfoHash: mi.HashInfoBytes(),
672         })
673         require.NoError(t, err)
674         require.True(t, new)
675         require.Nil(t, tt.Info())
676         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
677         require.NoError(t, err)
678         require.False(t, new)
679         require.NotNil(t, tt.Info())
680 }
681
682 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
683         dir, mi := testutil.GreetingTestTorrent()
684         os.RemoveAll(dir)
685         cl, _ := NewClient(TestingConfig())
686         defer cl.Close()
687         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
688                 InfoHash: mi.HashInfoBytes(),
689         })
690         tt.Drop()
691         assert.EqualValues(t, 0, len(cl.Torrents()))
692         select {
693         case <-tt.GotInfo():
694                 t.FailNow()
695         default:
696         }
697 }
698
699 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
700         for i := range iter.N(info.NumPieces()) {
701                 p := info.Piece(i)
702                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
703         }
704 }
705
706 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
707         fileCacheDir, err := ioutil.TempDir("", "")
708         require.NoError(t, err)
709         defer os.RemoveAll(fileCacheDir)
710         fileCache, err := filecache.NewCache(fileCacheDir)
711         require.NoError(t, err)
712         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
713         defer os.RemoveAll(greetingDataTempDir)
714         filePieceStore := csf(fileCache)
715         defer filePieceStore.Close()
716         info, err := greetingMetainfo.UnmarshalInfo()
717         require.NoError(t, err)
718         ih := greetingMetainfo.HashInfoBytes()
719         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
720         require.NoError(t, err)
721         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
722         // require.Equal(t, len(testutil.GreetingFileContents), written)
723         // require.NoError(t, err)
724         for i := 0; i < info.NumPieces(); i++ {
725                 p := info.Piece(i)
726                 if alreadyCompleted {
727                         require.NoError(t, greetingData.Piece(p).MarkComplete())
728                 }
729         }
730         cfg := TestingConfig()
731         // TODO: Disable network option?
732         cfg.DisableTCP = true
733         cfg.DisableUTP = true
734         cfg.DefaultStorage = filePieceStore
735         cl, err := NewClient(cfg)
736         require.NoError(t, err)
737         defer cl.Close()
738         tt, err := cl.AddTorrent(greetingMetainfo)
739         require.NoError(t, err)
740         psrs := tt.PieceStateRuns()
741         assert.Len(t, psrs, 1)
742         assert.EqualValues(t, 3, psrs[0].Length)
743         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
744         if alreadyCompleted {
745                 r := tt.NewReader()
746                 b, err := ioutil.ReadAll(r)
747                 assert.NoError(t, err)
748                 assert.EqualValues(t, testutil.GreetingFileContents, b)
749         }
750 }
751
752 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
753         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
754 }
755
756 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
757         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
758 }
759
760 func TestAddMetainfoWithNodes(t *testing.T) {
761         cfg := TestingConfig()
762         cfg.ListenHost = func(string) string { return "" }
763         cfg.NoDHT = false
764         cfg.DhtStartingNodes = func() ([]dht.Addr, error) { return nil, nil }
765         // For now, we want to just jam the nodes into the table, without
766         // verifying them first. Also the DHT code doesn't support mixing secure
767         // and insecure nodes if security is enabled (yet).
768         // cfg.DHTConfig.NoSecurity = true
769         cl, err := NewClient(cfg)
770         require.NoError(t, err)
771         defer cl.Close()
772         sum := func() (ret int64) {
773                 cl.eachDhtServer(func(s *dht.Server) {
774                         ret += s.Stats().OutboundQueriesAttempted
775                 })
776                 return
777         }
778         assert.EqualValues(t, 0, sum())
779         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
780         require.NoError(t, err)
781         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
782         // check if the announce-list is here instead. TODO: Add nodes.
783         assert.Len(t, tt.metainfo.AnnounceList, 5)
784         // There are 6 nodes in the torrent file.
785         for sum() != int64(6*len(cl.dhtServers)) {
786                 time.Sleep(time.Millisecond)
787         }
788 }
789
// testDownloadCancelParams selects the scenario run by testDownloadCancel.
type testDownloadCancelParams struct {
	// SetLeecherStorageCapacity, when true, makes testDownloadCancel apply
	// LeecherStorageCapacity to the leecher's file cache.
	SetLeecherStorageCapacity bool
	// LeecherStorageCapacity is the value handed to the cache's SetCapacity.
	// It is only read when SetLeecherStorageCapacity is true.
	LeecherStorageCapacity int64
	// Cancel, when true, cancels all pieces right after they are requested;
	// the test then expects every piece to be reported as not complete.
	Cancel bool
}
795
796 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
797         greetingTempDir, mi := testutil.GreetingTestTorrent()
798         defer os.RemoveAll(greetingTempDir)
799         cfg := TestingConfig()
800         cfg.Seed = true
801         cfg.DataDir = greetingTempDir
802         seeder, err := NewClient(cfg)
803         require.NoError(t, err)
804         defer seeder.Close()
805         defer testutil.ExportStatusWriter(seeder, "s")()
806         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
807         seederTorrent.VerifyData()
808         leecherDataDir, err := ioutil.TempDir("", "")
809         require.NoError(t, err)
810         defer os.RemoveAll(leecherDataDir)
811         fc, err := filecache.NewCache(leecherDataDir)
812         require.NoError(t, err)
813         if ps.SetLeecherStorageCapacity {
814                 fc.SetCapacity(ps.LeecherStorageCapacity)
815         }
816         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
817         cfg.DataDir = leecherDataDir
818         leecher, err := NewClient(cfg)
819         require.NoError(t, err)
820         defer leecher.Close()
821         defer testutil.ExportStatusWriter(leecher, "l")()
822         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
823                 ret = TorrentSpecFromMetaInfo(mi)
824                 ret.ChunkSize = 2
825                 return
826         }())
827         require.NoError(t, err)
828         assert.True(t, new)
829         psc := leecherGreeting.SubscribePieceStateChanges()
830         defer psc.Close()
831
832         leecherGreeting.cl.lock()
833         leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
834         if ps.Cancel {
835                 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
836         }
837         leecherGreeting.cl.unlock()
838         done := make(chan struct{})
839         defer close(done)
840         go leecherGreeting.AddClientPeer(seeder)
841         completes := make(map[int]bool, 3)
842         expected := func() map[int]bool {
843                 if ps.Cancel {
844                         return map[int]bool{0: false, 1: false, 2: false}
845                 } else {
846                         return map[int]bool{0: true, 1: true, 2: true}
847                 }
848         }()
849         for !reflect.DeepEqual(completes, expected) {
850                 _v := <-psc.Values
851                 v := _v.(PieceStateChange)
852                 completes[v.Index] = v.Complete
853         }
854 }
855
856 func TestTorrentDownloadAll(t *testing.T) {
857         testDownloadCancel(t, testDownloadCancelParams{})
858 }
859
860 func TestTorrentDownloadAllThenCancel(t *testing.T) {
861         testDownloadCancel(t, testDownloadCancelParams{
862                 Cancel: true,
863         })
864 }
865
866 // Ensure that it's an error for a peer to send an invalid have message.
867 func TestPeerInvalidHave(t *testing.T) {
868         cl, err := NewClient(TestingConfig())
869         require.NoError(t, err)
870         defer cl.Close()
871         info := metainfo.Info{
872                 PieceLength: 1,
873                 Pieces:      make([]byte, 20),
874                 Files:       []metainfo.FileInfo{{Length: 1}},
875         }
876         infoBytes, err := bencode.Marshal(info)
877         require.NoError(t, err)
878         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
879                 InfoBytes: infoBytes,
880                 InfoHash:  metainfo.HashBytes(infoBytes),
881                 Storage:   badStorage{},
882         })
883         require.NoError(t, err)
884         assert.True(t, _new)
885         defer tt.Drop()
886         cn := &connection{
887                 t: tt,
888         }
889         assert.NoError(t, cn.peerSentHave(0))
890         assert.Error(t, cn.peerSentHave(1))
891 }
892
893 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
894         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
895         defer os.RemoveAll(greetingTempDir)
896         cfg := TestingConfig()
897         cfg.DataDir = greetingTempDir
898         seeder, err := NewClient(TestingConfig())
899         require.NoError(t, err)
900         seeder.AddTorrentSpec(&TorrentSpec{
901                 InfoBytes: greetingMetainfo.InfoBytes,
902         })
903 }
904
905 // Check that when the listen port is 0, all the protocols listened on have
906 // the same port, and it isn't zero.
907 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
908         cl, err := NewClient(TestingConfig())
909         require.NoError(t, err)
910         defer cl.Close()
911         port := cl.LocalPort()
912         assert.NotEqual(t, 0, port)
913         cl.eachListener(func(s listener) bool {
914                 assert.Equal(t, port, missinggo.AddrPort(s.Addr()))
915                 return true
916         })
917 }
918
919 func TestClientDynamicListenTCPOnly(t *testing.T) {
920         cfg := TestingConfig()
921         cfg.DisableUTP = true
922         cfg.DisableTCP = false
923         cl, err := NewClient(cfg)
924         require.NoError(t, err)
925         defer cl.Close()
926         assert.NotEqual(t, 0, cl.LocalPort())
927 }
928
929 func TestClientDynamicListenUTPOnly(t *testing.T) {
930         cfg := TestingConfig()
931         cfg.DisableTCP = true
932         cfg.DisableUTP = false
933         cl, err := NewClient(cfg)
934         require.NoError(t, err)
935         defer cl.Close()
936         assert.NotEqual(t, 0, cl.LocalPort())
937 }
938
939 func totalConns(tts []*Torrent) (ret int) {
940         for _, tt := range tts {
941                 tt.cl.lock()
942                 ret += len(tt.conns)
943                 tt.cl.unlock()
944         }
945         return
946 }
947
948 func TestSetMaxEstablishedConn(t *testing.T) {
949         var tts []*Torrent
950         ih := testutil.GreetingMetaInfo().HashInfoBytes()
951         cfg := TestingConfig()
952         cfg.DisableAcceptRateLimiting = true
953         cfg.dropDuplicatePeerIds = true
954         for i := range iter.N(3) {
955                 cl, err := NewClient(cfg)
956                 require.NoError(t, err)
957                 defer cl.Close()
958                 tt, _ := cl.AddTorrentInfoHash(ih)
959                 tt.SetMaxEstablishedConns(2)
960                 defer testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))()
961                 tts = append(tts, tt)
962         }
963         addPeers := func() {
964                 for _, tt := range tts {
965                         for _, _tt := range tts {
966                                 // if tt != _tt {
967                                 tt.AddClientPeer(_tt.cl)
968                                 // }
969                         }
970                 }
971         }
972         waitTotalConns := func(num int) {
973                 for totalConns(tts) != num {
974                         addPeers()
975                         time.Sleep(time.Millisecond)
976                 }
977         }
978         addPeers()
979         waitTotalConns(6)
980         tts[0].SetMaxEstablishedConns(1)
981         waitTotalConns(4)
982         tts[0].SetMaxEstablishedConns(0)
983         waitTotalConns(2)
984         tts[0].SetMaxEstablishedConns(1)
985         addPeers()
986         waitTotalConns(4)
987         tts[0].SetMaxEstablishedConns(2)
988         addPeers()
989         waitTotalConns(6)
990 }
991
992 // Creates a file containing its own name as data. Make a metainfo from that, adds it to the given
993 // client, and returns a magnet link.
994 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
995         os.MkdirAll(dir, 0770)
996         file, err := os.Create(filepath.Join(dir, name))
997         require.NoError(t, err)
998         file.Write([]byte(name))
999         file.Close()
1000         mi := metainfo.MetaInfo{}
1001         mi.SetDefaults()
1002         info := metainfo.Info{PieceLength: 256 * 1024}
1003         err = info.BuildFromFilePath(filepath.Join(dir, name))
1004         require.NoError(t, err)
1005         mi.InfoBytes, err = bencode.Marshal(info)
1006         require.NoError(t, err)
1007         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1008         tr, err := cl.AddTorrent(&mi)
1009         require.NoError(t, err)
1010         require.True(t, tr.Seeding())
1011         tr.VerifyData()
1012         return magnet
1013 }
1014
1015 // https://github.com/anacrolix/torrent/issues/114
1016 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1017         testSeederLeecherPair(
1018                 t,
1019                 func(cfg *ClientConfig) {
1020                         cfg.HeaderObfuscationPolicy.Preferred = true
1021                         cfg.HeaderObfuscationPolicy.RequirePreferred = true
1022                 },
1023                 func(cfg *ClientConfig) {
1024                         cfg.HeaderObfuscationPolicy.RequirePreferred = false
1025                 },
1026         )
1027 }
1028
1029 // Test that the leecher can download a torrent in its entirety from the seeder. Note that the
1030 // seeder config is done first.
1031 func testSeederLeecherPair(t *testing.T, seeder func(*ClientConfig), leecher func(*ClientConfig)) {
1032         cfg := TestingConfig()
1033         cfg.Seed = true
1034         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1035         os.Mkdir(cfg.DataDir, 0755)
1036         seeder(cfg)
1037         server, err := NewClient(cfg)
1038         require.NoError(t, err)
1039         defer server.Close()
1040         defer testutil.ExportStatusWriter(server, "s")()
1041         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1042         // Extra torrents are added to test the seeder having to match incoming obfuscated headers
1043         // against more than one torrent. See issue #114
1044         makeMagnet(t, server, cfg.DataDir, "test2")
1045         for i := 0; i < 100; i++ {
1046                 makeMagnet(t, server, cfg.DataDir, fmt.Sprintf("test%d", i+2))
1047         }
1048         cfg = TestingConfig()
1049         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1050         leecher(cfg)
1051         client, err := NewClient(cfg)
1052         require.NoError(t, err)
1053         defer client.Close()
1054         defer testutil.ExportStatusWriter(client, "c")()
1055         tr, err := client.AddMagnet(magnet1)
1056         require.NoError(t, err)
1057         tr.AddClientPeer(server)
1058         <-tr.GotInfo()
1059         tr.DownloadAll()
1060         client.WaitAll()
1061 }
1062
1063 // This appears to be the situation with the S3 BitTorrent client.
1064 func TestObfuscatedHeaderFallbackSeederDisallowsLeecherPrefers(t *testing.T) {
1065         // Leecher prefers obfuscation, but the seeder does not allow it.
1066         testSeederLeecherPair(
1067                 t,
1068                 func(cfg *ClientConfig) {
1069                         cfg.HeaderObfuscationPolicy.Preferred = false
1070                         cfg.HeaderObfuscationPolicy.RequirePreferred = true
1071                 },
1072                 func(cfg *ClientConfig) {
1073                         cfg.HeaderObfuscationPolicy.Preferred = true
1074                         cfg.HeaderObfuscationPolicy.RequirePreferred = false
1075                 },
1076         )
1077 }
1078
1079 func TestObfuscatedHeaderFallbackSeederRequiresLeecherPrefersNot(t *testing.T) {
1080         // Leecher prefers no obfuscation, but the seeder enforces it.
1081         testSeederLeecherPair(
1082                 t,
1083                 func(cfg *ClientConfig) {
1084                         cfg.HeaderObfuscationPolicy.Preferred = true
1085                         cfg.HeaderObfuscationPolicy.RequirePreferred = true
1086                 },
1087                 func(cfg *ClientConfig) {
1088                         cfg.HeaderObfuscationPolicy.Preferred = false
1089                         cfg.HeaderObfuscationPolicy.RequirePreferred = false
1090                 },
1091         )
1092 }
1093
1094 func TestClientAddressInUse(t *testing.T) {
1095         s, _ := NewUtpSocket("udp", ":50007", nil)
1096         if s != nil {
1097                 defer s.Close()
1098         }
1099         cfg := TestingConfig().SetListenAddr(":50007")
1100         cl, err := NewClient(cfg)
1101         require.Error(t, err)
1102         require.Nil(t, cl)
1103 }
1104
1105 func TestClientHasDhtServersWhenUtpDisabled(t *testing.T) {
1106         cc := TestingConfig()
1107         cc.DisableUTP = true
1108         cc.NoDHT = false
1109         cl, err := NewClient(cc)
1110         require.NoError(t, err)
1111         defer cl.Close()
1112         assert.NotEmpty(t, cl.DhtServers())
1113 }
1114
1115 func TestIssue335(t *testing.T) {
1116         dir, mi := testutil.GreetingTestTorrent()
1117         defer os.RemoveAll(dir)
1118         cfg := TestingConfig()
1119         cfg.Seed = false
1120         cfg.Debug = true
1121         cfg.DataDir = dir
1122         comp, err := storage.NewBoltPieceCompletion(dir)
1123         require.NoError(t, err)
1124         defer comp.Close()
1125         cfg.DefaultStorage = storage.NewMMapWithCompletion(dir, comp)
1126         cl, err := NewClient(cfg)
1127         require.NoError(t, err)
1128         defer cl.Close()
1129         tor, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1130         require.NoError(t, err)
1131         assert.True(t, new)
1132         require.True(t, cl.WaitAll())
1133         tor.Drop()
1134         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1135         require.NoError(t, err)
1136         assert.True(t, new)
1137         require.True(t, cl.WaitAll())
1138 }