]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Add connection trust flag, and more tests with small caches
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "log"
9         "os"
10         "path/filepath"
11         "reflect"
12         "sync"
13         "testing"
14         "time"
15
16         "github.com/bradfitz/iter"
17         "github.com/stretchr/testify/assert"
18         "github.com/stretchr/testify/require"
19         "golang.org/x/time/rate"
20
21         "github.com/anacrolix/dht/v2"
22         _ "github.com/anacrolix/envpprof"
23         "github.com/anacrolix/missinggo"
24         "github.com/anacrolix/missinggo/filecache"
25
26         "github.com/anacrolix/torrent/bencode"
27         "github.com/anacrolix/torrent/internal/testutil"
28         "github.com/anacrolix/torrent/iplist"
29         "github.com/anacrolix/torrent/metainfo"
30         "github.com/anacrolix/torrent/storage"
31 )
32
33 func TestingConfig() *ClientConfig {
34         cfg := NewDefaultClientConfig()
35         cfg.ListenHost = LoopbackListenHost
36         cfg.NoDHT = true
37         cfg.DataDir = tempDir()
38         cfg.DisableTrackers = true
39         cfg.NoDefaultPortForwarding = true
40         cfg.DisableAcceptRateLimiting = true
41         cfg.ListenPort = 0
42         //cfg.Debug = true
43         //cfg.Logger = cfg.Logger.WithText(func(m log.Msg) string {
44         //      t := m.Text()
45         //      m.Values(func(i interface{}) bool {
46         //              t += fmt.Sprintf("\n%[1]T: %[1]v", i)
47         //              return true
48         //      })
49         //      return t
50         //})
51         return cfg
52 }
53
54 func TestClientDefault(t *testing.T) {
55         cl, err := NewClient(TestingConfig())
56         require.NoError(t, err)
57         cl.Close()
58 }
59
60 func TestClientNilConfig(t *testing.T) {
61         cl, err := NewClient(nil)
62         require.NoError(t, err)
63         cl.Close()
64 }
65
66 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
67         cfg := TestingConfig()
68         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
69         require.NoError(t, err)
70         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
71         defer ci.Close()
72         cfg.DefaultStorage = ci
73         cl, err := NewClient(cfg)
74         require.NoError(t, err)
75         cl.Close()
76         // And again, https://github.com/anacrolix/torrent/issues/158
77         cl, err = NewClient(cfg)
78         require.NoError(t, err)
79         cl.Close()
80 }
81
82 func TestAddDropTorrent(t *testing.T) {
83         cl, err := NewClient(TestingConfig())
84         require.NoError(t, err)
85         defer cl.Close()
86         dir, mi := testutil.GreetingTestTorrent()
87         defer os.RemoveAll(dir)
88         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
89         require.NoError(t, err)
90         assert.True(t, new)
91         tt.SetMaxEstablishedConns(0)
92         tt.SetMaxEstablishedConns(1)
93         tt.Drop()
94 }
95
96 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
97         // TODO?
98         t.SkipNow()
99 }
100
101 func TestAddTorrentNoUsableURLs(t *testing.T) {
102         // TODO?
103         t.SkipNow()
104 }
105
106 func TestAddPeersToUnknownTorrent(t *testing.T) {
107         // TODO?
108         t.SkipNow()
109 }
110
111 func TestPieceHashSize(t *testing.T) {
112         assert.Equal(t, 20, pieceHash.Size())
113 }
114
115 func TestTorrentInitialState(t *testing.T) {
116         dir, mi := testutil.GreetingTestTorrent()
117         defer os.RemoveAll(dir)
118         cl := &Client{
119                 config: TestingConfig(),
120         }
121         cl.initLogger()
122         tor := cl.newTorrent(
123                 mi.HashInfoBytes(),
124                 storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
125         )
126         tor.setChunkSize(2)
127         tor.cl.lock()
128         err := tor.setInfoBytes(mi.InfoBytes)
129         tor.cl.unlock()
130         require.NoError(t, err)
131         require.Len(t, tor.pieces, 3)
132         tor.pendAllChunkSpecs(0)
133         tor.cl.lock()
134         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
135         tor.cl.unlock()
136         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
137 }
138
139 func TestReducedDialTimeout(t *testing.T) {
140         cfg := NewDefaultClientConfig()
141         for _, _case := range []struct {
142                 Max             time.Duration
143                 HalfOpenLimit   int
144                 PendingPeers    int
145                 ExpectedReduced time.Duration
146         }{
147                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
148                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
149                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
150                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
151                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
152                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
153         } {
154                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
155                 expected := _case.ExpectedReduced
156                 if expected < cfg.MinDialTimeout {
157                         expected = cfg.MinDialTimeout
158                 }
159                 if reduced != expected {
160                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
161                 }
162         }
163 }
164
165 func TestAddDropManyTorrents(t *testing.T) {
166         cl, err := NewClient(TestingConfig())
167         require.NoError(t, err)
168         defer cl.Close()
169         for i := range iter.N(1000) {
170                 var spec TorrentSpec
171                 binary.PutVarint(spec.InfoHash[:], int64(i))
172                 tt, new, err := cl.AddTorrentSpec(&spec)
173                 assert.NoError(t, err)
174                 assert.True(t, new)
175                 defer tt.Drop()
176         }
177 }
178
179 type FileCacheClientStorageFactoryParams struct {
180         Capacity    int64
181         SetCapacity bool
182         Wrapper     func(*filecache.Cache) storage.ClientImpl
183 }
184
185 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
186         return func(dataDir string) storage.ClientImpl {
187                 fc, err := filecache.NewCache(dataDir)
188                 if err != nil {
189                         panic(err)
190                 }
191                 if ps.SetCapacity {
192                         fc.SetCapacity(ps.Capacity)
193                 }
194                 return ps.Wrapper(fc)
195         }
196 }
197
198 type storageFactory func(string) storage.ClientImpl
199
200 func TestClientTransferDefault(t *testing.T) {
201         testClientTransfer(t, testClientTransferParams{
202                 ExportClientStatus: true,
203                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
204                         Wrapper: fileCachePieceResourceStorage,
205                 }),
206         })
207 }
208
209 func TestClientTransferRateLimitedUpload(t *testing.T) {
210         started := time.Now()
211         testClientTransfer(t, testClientTransferParams{
212                 // We are uploading 13 bytes (the length of the greeting torrent). The
213                 // chunks are 2 bytes in length. Then the smallest burst we can run
214                 // with is 2. Time taken is (13-burst)/rate.
215                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
216                 ExportClientStatus:      true,
217         })
218         require.True(t, time.Since(started) > time.Second)
219 }
220
221 func TestClientTransferRateLimitedDownload(t *testing.T) {
222         testClientTransfer(t, testClientTransferParams{
223                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
224         })
225 }
226
227 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
228         return storage.NewResourcePieces(fc.AsResourceProvider())
229 }
230
231 func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
232         testClientTransfer(t, testClientTransferParams{
233                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
234                         SetCapacity: true,
235                         // Going below the piece length means it can't complete a piece so
236                         // that it can be hashed.
237                         Capacity: 5,
238                         Wrapper:  fileCachePieceResourceStorage,
239                 }),
240                 SetReadahead: setReadahead,
241                 // Can't readahead too far or the cache will thrash and drop data we
242                 // thought we had.
243                 Readahead:          readahead,
244                 ExportClientStatus: true,
245         })
246 }
247
248 func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
249         testClientTransferSmallCache(t, true, 5)
250 }
251
252 func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
253         testClientTransferSmallCache(t, true, 15)
254 }
255
256 func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
257         testClientTransferSmallCache(t, false, -1)
258 }
259
260 func TestClientTransferVarious(t *testing.T) {
261         // Leecher storage
262         for _, ls := range []storageFactory{
263                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
264                         Wrapper: fileCachePieceResourceStorage,
265                 }),
266                 storage.NewBoltDB,
267         } {
268                 // Seeder storage
269                 for _, ss := range []func(string) storage.ClientImpl{
270                         storage.NewFile,
271                         storage.NewMMap,
272                 } {
273                         for _, responsive := range []bool{false, true} {
274                                 testClientTransfer(t, testClientTransferParams{
275                                         Responsive:     responsive,
276                                         SeederStorage:  ss,
277                                         LeecherStorage: ls,
278                                 })
279                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
280                                         testClientTransfer(t, testClientTransferParams{
281                                                 SeederStorage:  ss,
282                                                 Responsive:     responsive,
283                                                 SetReadahead:   true,
284                                                 Readahead:      readahead,
285                                                 LeecherStorage: ls,
286                                         })
287                                 }
288                         }
289                 }
290         }
291 }
292
293 type testClientTransferParams struct {
294         Responsive                 bool
295         Readahead                  int64
296         SetReadahead               bool
297         ExportClientStatus         bool
298         LeecherStorage             func(string) storage.ClientImpl
299         SeederStorage              func(string) storage.ClientImpl
300         SeederUploadRateLimiter    *rate.Limiter
301         LeecherDownloadRateLimiter *rate.Limiter
302 }
303
304 func logPieceStateChanges(t *Torrent) {
305         sub := t.SubscribePieceStateChanges()
306         go func() {
307                 defer sub.Close()
308                 for e := range sub.Values {
309                         log.Printf("%p %#v", t, e)
310                 }
311         }()
312 }
313
314 // Creates a seeder and a leecher, and ensures the data transfers when a read
315 // is attempted on the leecher.
316 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
317         greetingTempDir, mi := testutil.GreetingTestTorrent()
318         defer os.RemoveAll(greetingTempDir)
319         // Create seeder and a Torrent.
320         cfg := TestingConfig()
321         cfg.Seed = true
322         if ps.SeederUploadRateLimiter != nil {
323                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
324         }
325         // cfg.ListenAddr = "localhost:4000"
326         if ps.SeederStorage != nil {
327                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
328                 defer cfg.DefaultStorage.Close()
329         } else {
330                 cfg.DataDir = greetingTempDir
331         }
332         seeder, err := NewClient(cfg)
333         require.NoError(t, err)
334         if ps.ExportClientStatus {
335                 defer testutil.ExportStatusWriter(seeder, "s")()
336         }
337         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
338         // Run a Stats right after Closing the Client. This will trigger the Stats
339         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
340         defer seederTorrent.Stats()
341         defer seeder.Close()
342         seederTorrent.VerifyData()
343         // Create leecher and a Torrent.
344         leecherDataDir, err := ioutil.TempDir("", "")
345         require.NoError(t, err)
346         defer os.RemoveAll(leecherDataDir)
347         cfg = TestingConfig()
348         if ps.LeecherStorage == nil {
349                 cfg.DataDir = leecherDataDir
350         } else {
351                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
352         }
353         if ps.LeecherDownloadRateLimiter != nil {
354                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
355         }
356         cfg.Seed = false
357         //cfg.Debug = true
358         leecher, err := NewClient(cfg)
359         require.NoError(t, err)
360         defer leecher.Close()
361         if ps.ExportClientStatus {
362                 defer testutil.ExportStatusWriter(leecher, "l")()
363         }
364         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
365                 ret = TorrentSpecFromMetaInfo(mi)
366                 ret.ChunkSize = 2
367                 return
368         }())
369         require.NoError(t, err)
370         assert.True(t, new)
371
372         //// This was used when observing coalescing of piece state changes.
373         //logPieceStateChanges(leecherTorrent)
374
375         // Now do some things with leecher and seeder.
376         leecherTorrent.AddClientPeer(seeder)
377         // The Torrent should not be interested in obtaining peers, so the one we
378         // just added should be the only one.
379         assert.False(t, leecherTorrent.Seeding())
380         assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
381         r := leecherTorrent.NewReader()
382         defer r.Close()
383         if ps.Responsive {
384                 r.SetResponsive()
385         }
386         if ps.SetReadahead {
387                 r.SetReadahead(ps.Readahead)
388         }
389         assertReadAllGreeting(t, r)
390
391         seederStats := seederTorrent.Stats()
392         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
393         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
394
395         leecherStats := leecherTorrent.Stats()
396         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
397         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
398
399         // Try reading through again for the cases where the torrent data size
400         // exceeds the size of the cache.
401         assertReadAllGreeting(t, r)
402 }
403
404 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
405         pos, err := r.Seek(0, io.SeekStart)
406         assert.NoError(t, err)
407         assert.EqualValues(t, 0, pos)
408         _greeting, err := ioutil.ReadAll(r)
409         assert.NoError(t, err)
410         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
411 }
412
413 // Check that after completing leeching, a leecher transitions to a seeding
414 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
415 func TestSeedAfterDownloading(t *testing.T) {
416         greetingTempDir, mi := testutil.GreetingTestTorrent()
417         defer os.RemoveAll(greetingTempDir)
418
419         cfg := TestingConfig()
420         cfg.Seed = true
421         cfg.DataDir = greetingTempDir
422         seeder, err := NewClient(cfg)
423         require.NoError(t, err)
424         defer seeder.Close()
425         defer testutil.ExportStatusWriter(seeder, "s")()
426         seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
427         require.NoError(t, err)
428         assert.True(t, ok)
429         seederTorrent.VerifyData()
430
431         cfg = TestingConfig()
432         cfg.Seed = true
433         cfg.DataDir, err = ioutil.TempDir("", "")
434         require.NoError(t, err)
435         defer os.RemoveAll(cfg.DataDir)
436         leecher, err := NewClient(cfg)
437         require.NoError(t, err)
438         defer leecher.Close()
439         defer testutil.ExportStatusWriter(leecher, "l")()
440
441         cfg = TestingConfig()
442         cfg.Seed = false
443         cfg.DataDir, err = ioutil.TempDir("", "")
444         require.NoError(t, err)
445         defer os.RemoveAll(cfg.DataDir)
446         leecherLeecher, _ := NewClient(cfg)
447         require.NoError(t, err)
448         defer leecherLeecher.Close()
449         defer testutil.ExportStatusWriter(leecherLeecher, "ll")()
450         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
451                 ret = TorrentSpecFromMetaInfo(mi)
452                 ret.ChunkSize = 2
453                 return
454         }())
455         require.NoError(t, err)
456         assert.True(t, ok)
457         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
458                 ret = TorrentSpecFromMetaInfo(mi)
459                 ret.ChunkSize = 3
460                 return
461         }())
462         require.NoError(t, err)
463         assert.True(t, ok)
464         // Simultaneously DownloadAll in Leecher, and read the contents
465         // consecutively in LeecherLeecher. This non-deterministically triggered a
466         // case where the leecher wouldn't unchoke the LeecherLeecher.
467         var wg sync.WaitGroup
468         wg.Add(1)
469         go func() {
470                 defer wg.Done()
471                 r := llg.NewReader()
472                 defer r.Close()
473                 b, err := ioutil.ReadAll(r)
474                 require.NoError(t, err)
475                 assert.EqualValues(t, testutil.GreetingFileContents, b)
476         }()
477         done := make(chan struct{})
478         defer close(done)
479         go leecherGreeting.AddClientPeer(seeder)
480         go leecherGreeting.AddClientPeer(leecherLeecher)
481         wg.Add(1)
482         go func() {
483                 defer wg.Done()
484                 leecherGreeting.DownloadAll()
485                 leecher.WaitAll()
486         }()
487         wg.Wait()
488 }
489
490 func TestMergingTrackersByAddingSpecs(t *testing.T) {
491         cl, err := NewClient(TestingConfig())
492         require.NoError(t, err)
493         defer cl.Close()
494         spec := TorrentSpec{}
495         T, new, _ := cl.AddTorrentSpec(&spec)
496         if !new {
497                 t.FailNow()
498         }
499         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
500         _, new, _ = cl.AddTorrentSpec(&spec)
501         assert.False(t, new)
502         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
503         // Because trackers are disabled in TestingConfig.
504         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
505 }
506
507 // We read from a piece which is marked completed, but is missing data.
508 func TestCompletedPieceWrongSize(t *testing.T) {
509         cfg := TestingConfig()
510         cfg.DefaultStorage = badStorage{}
511         cl, err := NewClient(cfg)
512         require.NoError(t, err)
513         defer cl.Close()
514         info := metainfo.Info{
515                 PieceLength: 15,
516                 Pieces:      make([]byte, 20),
517                 Files: []metainfo.FileInfo{
518                         {Path: []string{"greeting"}, Length: 13},
519                 },
520         }
521         b, err := bencode.Marshal(info)
522         require.NoError(t, err)
523         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
524                 InfoBytes: b,
525                 InfoHash:  metainfo.HashBytes(b),
526         })
527         require.NoError(t, err)
528         defer tt.Drop()
529         assert.True(t, new)
530         r := tt.NewReader()
531         defer r.Close()
532         b, err = ioutil.ReadAll(r)
533         assert.Len(t, b, 13)
534         assert.NoError(t, err)
535 }
536
537 func BenchmarkAddLargeTorrent(b *testing.B) {
538         cfg := TestingConfig()
539         cfg.DisableTCP = true
540         cfg.DisableUTP = true
541         cl, err := NewClient(cfg)
542         require.NoError(b, err)
543         defer cl.Close()
544         b.ReportAllocs()
545         for range iter.N(b.N) {
546                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
547                 if err != nil {
548                         b.Fatal(err)
549                 }
550                 t.Drop()
551         }
552 }
553
554 func TestResponsive(t *testing.T) {
555         seederDataDir, mi := testutil.GreetingTestTorrent()
556         defer os.RemoveAll(seederDataDir)
557         cfg := TestingConfig()
558         cfg.Seed = true
559         cfg.DataDir = seederDataDir
560         seeder, err := NewClient(cfg)
561         require.Nil(t, err)
562         defer seeder.Close()
563         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
564         seederTorrent.VerifyData()
565         leecherDataDir, err := ioutil.TempDir("", "")
566         require.Nil(t, err)
567         defer os.RemoveAll(leecherDataDir)
568         cfg = TestingConfig()
569         cfg.DataDir = leecherDataDir
570         leecher, err := NewClient(cfg)
571         require.Nil(t, err)
572         defer leecher.Close()
573         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
574                 ret = TorrentSpecFromMetaInfo(mi)
575                 ret.ChunkSize = 2
576                 return
577         }())
578         leecherTorrent.AddClientPeer(seeder)
579         reader := leecherTorrent.NewReader()
580         defer reader.Close()
581         reader.SetReadahead(0)
582         reader.SetResponsive()
583         b := make([]byte, 2)
584         _, err = reader.Seek(3, io.SeekStart)
585         require.NoError(t, err)
586         _, err = io.ReadFull(reader, b)
587         assert.Nil(t, err)
588         assert.EqualValues(t, "lo", string(b))
589         _, err = reader.Seek(11, io.SeekStart)
590         require.NoError(t, err)
591         n, err := io.ReadFull(reader, b)
592         assert.Nil(t, err)
593         assert.EqualValues(t, 2, n)
594         assert.EqualValues(t, "d\n", string(b))
595 }
596
597 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
598         seederDataDir, mi := testutil.GreetingTestTorrent()
599         defer os.RemoveAll(seederDataDir)
600         cfg := TestingConfig()
601         cfg.Seed = true
602         cfg.DataDir = seederDataDir
603         seeder, err := NewClient(cfg)
604         require.Nil(t, err)
605         defer seeder.Close()
606         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
607         seederTorrent.VerifyData()
608         leecherDataDir, err := ioutil.TempDir("", "")
609         require.Nil(t, err)
610         defer os.RemoveAll(leecherDataDir)
611         cfg = TestingConfig()
612         cfg.DataDir = leecherDataDir
613         leecher, err := NewClient(cfg)
614         require.Nil(t, err)
615         defer leecher.Close()
616         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
617                 ret = TorrentSpecFromMetaInfo(mi)
618                 ret.ChunkSize = 2
619                 return
620         }())
621         leecherTorrent.AddClientPeer(seeder)
622         reader := leecherTorrent.NewReader()
623         defer reader.Close()
624         reader.SetReadahead(0)
625         reader.SetResponsive()
626         b := make([]byte, 2)
627         _, err = reader.Seek(3, io.SeekStart)
628         require.NoError(t, err)
629         _, err = io.ReadFull(reader, b)
630         assert.Nil(t, err)
631         assert.EqualValues(t, "lo", string(b))
632         go leecherTorrent.Drop()
633         _, err = reader.Seek(11, io.SeekStart)
634         require.NoError(t, err)
635         n, err := reader.Read(b)
636         assert.EqualError(t, err, "torrent closed")
637         assert.EqualValues(t, 0, n)
638 }
639
640 func TestDHTInheritBlocklist(t *testing.T) {
641         ipl := iplist.New(nil)
642         require.NotNil(t, ipl)
643         cfg := TestingConfig()
644         cfg.IPBlocklist = ipl
645         cfg.NoDHT = false
646         cl, err := NewClient(cfg)
647         require.NoError(t, err)
648         defer cl.Close()
649         numServers := 0
650         cl.eachDhtServer(func(s *dht.Server) {
651                 assert.Equal(t, ipl, s.IPBlocklist())
652                 numServers++
653         })
654         assert.EqualValues(t, 2, numServers)
655 }
656
657 // Check that stuff is merged in subsequent AddTorrentSpec for the same
658 // infohash.
659 func TestAddTorrentSpecMerging(t *testing.T) {
660         cl, err := NewClient(TestingConfig())
661         require.NoError(t, err)
662         defer cl.Close()
663         dir, mi := testutil.GreetingTestTorrent()
664         defer os.RemoveAll(dir)
665         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
666                 InfoHash: mi.HashInfoBytes(),
667         })
668         require.NoError(t, err)
669         require.True(t, new)
670         require.Nil(t, tt.Info())
671         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
672         require.NoError(t, err)
673         require.False(t, new)
674         require.NotNil(t, tt.Info())
675 }
676
677 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
678         dir, mi := testutil.GreetingTestTorrent()
679         os.RemoveAll(dir)
680         cl, _ := NewClient(TestingConfig())
681         defer cl.Close()
682         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
683                 InfoHash: mi.HashInfoBytes(),
684         })
685         tt.Drop()
686         assert.EqualValues(t, 0, len(cl.Torrents()))
687         select {
688         case <-tt.GotInfo():
689                 t.FailNow()
690         default:
691         }
692 }
693
694 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
695         for i := range iter.N(info.NumPieces()) {
696                 p := info.Piece(i)
697                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
698         }
699 }
700
701 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
702         fileCacheDir, err := ioutil.TempDir("", "")
703         require.NoError(t, err)
704         defer os.RemoveAll(fileCacheDir)
705         fileCache, err := filecache.NewCache(fileCacheDir)
706         require.NoError(t, err)
707         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
708         defer os.RemoveAll(greetingDataTempDir)
709         filePieceStore := csf(fileCache)
710         defer filePieceStore.Close()
711         info, err := greetingMetainfo.UnmarshalInfo()
712         require.NoError(t, err)
713         ih := greetingMetainfo.HashInfoBytes()
714         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
715         require.NoError(t, err)
716         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
717         // require.Equal(t, len(testutil.GreetingFileContents), written)
718         // require.NoError(t, err)
719         for i := 0; i < info.NumPieces(); i++ {
720                 p := info.Piece(i)
721                 if alreadyCompleted {
722                         require.NoError(t, greetingData.Piece(p).MarkComplete())
723                 }
724         }
725         cfg := TestingConfig()
726         // TODO: Disable network option?
727         cfg.DisableTCP = true
728         cfg.DisableUTP = true
729         cfg.DefaultStorage = filePieceStore
730         cl, err := NewClient(cfg)
731         require.NoError(t, err)
732         defer cl.Close()
733         tt, err := cl.AddTorrent(greetingMetainfo)
734         require.NoError(t, err)
735         psrs := tt.PieceStateRuns()
736         assert.Len(t, psrs, 1)
737         assert.EqualValues(t, 3, psrs[0].Length)
738         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
739         if alreadyCompleted {
740                 r := tt.NewReader()
741                 b, err := ioutil.ReadAll(r)
742                 assert.NoError(t, err)
743                 assert.EqualValues(t, testutil.GreetingFileContents, b)
744         }
745 }
746
747 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
748         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
749 }
750
751 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
752         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
753 }
754
755 func TestAddMetainfoWithNodes(t *testing.T) {
756         cfg := TestingConfig()
757         cfg.ListenHost = func(string) string { return "" }
758         cfg.NoDHT = false
759         cfg.DhtStartingNodes = func() ([]dht.Addr, error) { return nil, nil }
760         // For now, we want to just jam the nodes into the table, without
761         // verifying them first. Also the DHT code doesn't support mixing secure
762         // and insecure nodes if security is enabled (yet).
763         // cfg.DHTConfig.NoSecurity = true
764         cl, err := NewClient(cfg)
765         require.NoError(t, err)
766         defer cl.Close()
767         sum := func() (ret int64) {
768                 cl.eachDhtServer(func(s *dht.Server) {
769                         ret += s.Stats().OutboundQueriesAttempted
770                 })
771                 return
772         }
773         assert.EqualValues(t, 0, sum())
774         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
775         require.NoError(t, err)
776         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
777         // check if the announce-list is here instead. TODO: Add nodes.
778         assert.Len(t, tt.metainfo.AnnounceList, 5)
779         // There are 6 nodes in the torrent file.
780         for sum() != int64(6*len(cl.dhtServers)) {
781                 time.Sleep(time.Millisecond)
782         }
783 }
784
785 type testDownloadCancelParams struct {
786         SetLeecherStorageCapacity bool
787         LeecherStorageCapacity    int64
788         Cancel                    bool
789 }
790
791 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
792         greetingTempDir, mi := testutil.GreetingTestTorrent()
793         defer os.RemoveAll(greetingTempDir)
794         cfg := TestingConfig()
795         cfg.Seed = true
796         cfg.DataDir = greetingTempDir
797         seeder, err := NewClient(cfg)
798         require.NoError(t, err)
799         defer seeder.Close()
800         defer testutil.ExportStatusWriter(seeder, "s")()
801         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
802         seederTorrent.VerifyData()
803         leecherDataDir, err := ioutil.TempDir("", "")
804         require.NoError(t, err)
805         defer os.RemoveAll(leecherDataDir)
806         fc, err := filecache.NewCache(leecherDataDir)
807         require.NoError(t, err)
808         if ps.SetLeecherStorageCapacity {
809                 fc.SetCapacity(ps.LeecherStorageCapacity)
810         }
811         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
812         cfg.DataDir = leecherDataDir
813         leecher, err := NewClient(cfg)
814         require.NoError(t, err)
815         defer leecher.Close()
816         defer testutil.ExportStatusWriter(leecher, "l")()
817         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
818                 ret = TorrentSpecFromMetaInfo(mi)
819                 ret.ChunkSize = 2
820                 return
821         }())
822         require.NoError(t, err)
823         assert.True(t, new)
824         psc := leecherGreeting.SubscribePieceStateChanges()
825         defer psc.Close()
826
827         leecherGreeting.cl.lock()
828         leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
829         if ps.Cancel {
830                 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
831         }
832         leecherGreeting.cl.unlock()
833         done := make(chan struct{})
834         defer close(done)
835         go leecherGreeting.AddClientPeer(seeder)
836         completes := make(map[int]bool, 3)
837         expected := func() map[int]bool {
838                 if ps.Cancel {
839                         return map[int]bool{0: false, 1: false, 2: false}
840                 } else {
841                         return map[int]bool{0: true, 1: true, 2: true}
842                 }
843         }()
844         for !reflect.DeepEqual(completes, expected) {
845                 _v := <-psc.Values
846                 v := _v.(PieceStateChange)
847                 completes[v.Index] = v.Complete
848         }
849 }
850
851 func TestTorrentDownloadAll(t *testing.T) {
852         testDownloadCancel(t, testDownloadCancelParams{})
853 }
854
855 func TestTorrentDownloadAllThenCancel(t *testing.T) {
856         testDownloadCancel(t, testDownloadCancelParams{
857                 Cancel: true,
858         })
859 }
860
861 // Ensure that it's an error for a peer to send an invalid have message.
862 func TestPeerInvalidHave(t *testing.T) {
863         cl, err := NewClient(TestingConfig())
864         require.NoError(t, err)
865         defer cl.Close()
866         info := metainfo.Info{
867                 PieceLength: 1,
868                 Pieces:      make([]byte, 20),
869                 Files:       []metainfo.FileInfo{{Length: 1}},
870         }
871         infoBytes, err := bencode.Marshal(info)
872         require.NoError(t, err)
873         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
874                 InfoBytes: infoBytes,
875                 InfoHash:  metainfo.HashBytes(infoBytes),
876                 Storage:   badStorage{},
877         })
878         require.NoError(t, err)
879         assert.True(t, _new)
880         defer tt.Drop()
881         cn := &connection{
882                 t: tt,
883         }
884         assert.NoError(t, cn.peerSentHave(0))
885         assert.Error(t, cn.peerSentHave(1))
886 }
887
888 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
889         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
890         defer os.RemoveAll(greetingTempDir)
891         cfg := TestingConfig()
892         cfg.DataDir = greetingTempDir
893         seeder, err := NewClient(TestingConfig())
894         require.NoError(t, err)
895         seeder.AddTorrentSpec(&TorrentSpec{
896                 InfoBytes: greetingMetainfo.InfoBytes,
897         })
898 }
899
900 // Check that when the listen port is 0, all the protocols listened on have
901 // the same port, and it isn't zero.
902 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
903         cl, err := NewClient(TestingConfig())
904         require.NoError(t, err)
905         defer cl.Close()
906         port := cl.LocalPort()
907         assert.NotEqual(t, 0, port)
908         cl.eachListener(func(s socket) bool {
909                 assert.Equal(t, port, missinggo.AddrPort(s.Addr()))
910                 return true
911         })
912 }
913
914 func TestClientDynamicListenTCPOnly(t *testing.T) {
915         cfg := TestingConfig()
916         cfg.DisableUTP = true
917         cfg.DisableTCP = false
918         cl, err := NewClient(cfg)
919         require.NoError(t, err)
920         defer cl.Close()
921         assert.NotEqual(t, 0, cl.LocalPort())
922 }
923
924 func TestClientDynamicListenUTPOnly(t *testing.T) {
925         cfg := TestingConfig()
926         cfg.DisableTCP = true
927         cfg.DisableUTP = false
928         cl, err := NewClient(cfg)
929         require.NoError(t, err)
930         defer cl.Close()
931         assert.NotEqual(t, 0, cl.LocalPort())
932 }
933
934 func totalConns(tts []*Torrent) (ret int) {
935         for _, tt := range tts {
936                 tt.cl.lock()
937                 ret += len(tt.conns)
938                 tt.cl.unlock()
939         }
940         return
941 }
942
943 func TestSetMaxEstablishedConn(t *testing.T) {
944         var tts []*Torrent
945         ih := testutil.GreetingMetaInfo().HashInfoBytes()
946         cfg := TestingConfig()
947         cfg.DisableAcceptRateLimiting = true
948         cfg.dropDuplicatePeerIds = true
949         for i := range iter.N(3) {
950                 cl, err := NewClient(cfg)
951                 require.NoError(t, err)
952                 defer cl.Close()
953                 tt, _ := cl.AddTorrentInfoHash(ih)
954                 tt.SetMaxEstablishedConns(2)
955                 defer testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))()
956                 tts = append(tts, tt)
957         }
958         addPeers := func() {
959                 for _, tt := range tts {
960                         for _, _tt := range tts {
961                                 // if tt != _tt {
962                                 tt.AddClientPeer(_tt.cl)
963                                 // }
964                         }
965                 }
966         }
967         waitTotalConns := func(num int) {
968                 for totalConns(tts) != num {
969                         addPeers()
970                         time.Sleep(time.Millisecond)
971                 }
972         }
973         addPeers()
974         waitTotalConns(6)
975         tts[0].SetMaxEstablishedConns(1)
976         waitTotalConns(4)
977         tts[0].SetMaxEstablishedConns(0)
978         waitTotalConns(2)
979         tts[0].SetMaxEstablishedConns(1)
980         addPeers()
981         waitTotalConns(4)
982         tts[0].SetMaxEstablishedConns(2)
983         addPeers()
984         waitTotalConns(6)
985 }
986
987 // Creates a file containing its own name as data. Make a metainfo from that, adds it to the given
988 // client, and returns a magnet link.
989 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
990         os.MkdirAll(dir, 0770)
991         file, err := os.Create(filepath.Join(dir, name))
992         require.NoError(t, err)
993         file.Write([]byte(name))
994         file.Close()
995         mi := metainfo.MetaInfo{}
996         mi.SetDefaults()
997         info := metainfo.Info{PieceLength: 256 * 1024}
998         err = info.BuildFromFilePath(filepath.Join(dir, name))
999         require.NoError(t, err)
1000         mi.InfoBytes, err = bencode.Marshal(info)
1001         require.NoError(t, err)
1002         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1003         tr, err := cl.AddTorrent(&mi)
1004         require.NoError(t, err)
1005         require.True(t, tr.Seeding())
1006         tr.VerifyData()
1007         return magnet
1008 }
1009
1010 // https://github.com/anacrolix/torrent/issues/114
1011 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1012         testSeederLeecherPair(
1013                 t,
1014                 func(cfg *ClientConfig) {
1015                         cfg.HeaderObfuscationPolicy.Preferred = true
1016                         cfg.HeaderObfuscationPolicy.RequirePreferred = true
1017                 },
1018                 func(cfg *ClientConfig) {
1019                         cfg.HeaderObfuscationPolicy.RequirePreferred = false
1020                 },
1021         )
1022 }
1023
1024 // Test that the leecher can download a torrent in its entirety from the seeder. Note that the
1025 // seeder config is done first.
1026 func testSeederLeecherPair(t *testing.T, seeder func(*ClientConfig), leecher func(*ClientConfig)) {
1027         cfg := TestingConfig()
1028         cfg.Seed = true
1029         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1030         os.Mkdir(cfg.DataDir, 0755)
1031         seeder(cfg)
1032         server, err := NewClient(cfg)
1033         require.NoError(t, err)
1034         defer server.Close()
1035         defer testutil.ExportStatusWriter(server, "s")()
1036         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1037         // Extra torrents are added to test the seeder having to match incoming obfuscated headers
1038         // against more than one torrent. See issue #114
1039         makeMagnet(t, server, cfg.DataDir, "test2")
1040         for i := 0; i < 100; i++ {
1041                 makeMagnet(t, server, cfg.DataDir, fmt.Sprintf("test%d", i+2))
1042         }
1043         cfg = TestingConfig()
1044         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1045         leecher(cfg)
1046         client, err := NewClient(cfg)
1047         require.NoError(t, err)
1048         defer client.Close()
1049         defer testutil.ExportStatusWriter(client, "c")()
1050         tr, err := client.AddMagnet(magnet1)
1051         require.NoError(t, err)
1052         tr.AddClientPeer(server)
1053         <-tr.GotInfo()
1054         tr.DownloadAll()
1055         client.WaitAll()
1056 }
1057
1058 // This appears to be the situation with the S3 BitTorrent client.
1059 func TestObfuscatedHeaderFallbackSeederDisallowsLeecherPrefers(t *testing.T) {
1060         // Leecher prefers obfuscation, but the seeder does not allow it.
1061         testSeederLeecherPair(
1062                 t,
1063                 func(cfg *ClientConfig) {
1064                         cfg.HeaderObfuscationPolicy.Preferred = false
1065                         cfg.HeaderObfuscationPolicy.RequirePreferred = true
1066                 },
1067                 func(cfg *ClientConfig) {
1068                         cfg.HeaderObfuscationPolicy.Preferred = true
1069                         cfg.HeaderObfuscationPolicy.RequirePreferred = false
1070                 },
1071         )
1072 }
1073
1074 func TestObfuscatedHeaderFallbackSeederRequiresLeecherPrefersNot(t *testing.T) {
1075         // Leecher prefers no obfuscation, but the seeder enforces it.
1076         testSeederLeecherPair(
1077                 t,
1078                 func(cfg *ClientConfig) {
1079                         cfg.HeaderObfuscationPolicy.Preferred = true
1080                         cfg.HeaderObfuscationPolicy.RequirePreferred = true
1081                 },
1082                 func(cfg *ClientConfig) {
1083                         cfg.HeaderObfuscationPolicy.Preferred = false
1084                         cfg.HeaderObfuscationPolicy.RequirePreferred = false
1085                 },
1086         )
1087 }
1088
1089 func TestClientAddressInUse(t *testing.T) {
1090         s, _ := NewUtpSocket("udp", ":50007", nil)
1091         if s != nil {
1092                 defer s.Close()
1093         }
1094         cfg := TestingConfig().SetListenAddr(":50007")
1095         cl, err := NewClient(cfg)
1096         require.Error(t, err)
1097         require.Nil(t, cl)
1098 }
1099
1100 func TestClientHasDhtServersWhenUtpDisabled(t *testing.T) {
1101         cc := TestingConfig()
1102         cc.DisableUTP = true
1103         cc.NoDHT = false
1104         cl, err := NewClient(cc)
1105         require.NoError(t, err)
1106         defer cl.Close()
1107         assert.NotEmpty(t, cl.DhtServers())
1108 }
1109
1110 func TestIssue335(t *testing.T) {
1111         dir, mi := testutil.GreetingTestTorrent()
1112         defer os.RemoveAll(dir)
1113         cfg := TestingConfig()
1114         cfg.Seed = false
1115         cfg.Debug = true
1116         cfg.DataDir = dir
1117         comp, err := storage.NewBoltPieceCompletion(dir)
1118         require.NoError(t, err)
1119         defer comp.Close()
1120         cfg.DefaultStorage = storage.NewMMapWithCompletion(dir, comp)
1121         cl, err := NewClient(cfg)
1122         require.NoError(t, err)
1123         defer cl.Close()
1124         tor, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1125         require.NoError(t, err)
1126         assert.True(t, new)
1127         require.True(t, cl.WaitAll())
1128         tor.Drop()
1129         tor, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1130         require.NoError(t, err)
1131         assert.True(t, new)
1132         require.True(t, cl.WaitAll())
1133 }