]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
TestUTPRawConn: More tidying, and slow down the rate
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "log"
10         "math/rand"
11         "net"
12         "os"
13         "path/filepath"
14         "strings"
15         "sync"
16         "testing"
17         "time"
18
19         _ "github.com/anacrolix/envpprof"
20         "github.com/anacrolix/missinggo"
21         "github.com/anacrolix/missinggo/filecache"
22         "github.com/anacrolix/missinggo/pubsub"
23         "github.com/bradfitz/iter"
24         "github.com/stretchr/testify/assert"
25         "github.com/stretchr/testify/require"
26         "golang.org/x/time/rate"
27
28         "github.com/anacrolix/torrent/bencode"
29         "github.com/anacrolix/torrent/internal/testutil"
30         "github.com/anacrolix/torrent/iplist"
31         "github.com/anacrolix/torrent/metainfo"
32         "github.com/anacrolix/torrent/storage"
33 )
34
35 func init() {
36         log.SetFlags(log.LstdFlags | log.Llongfile)
37 }
38
39 func TestingConfig() *Config {
40         return &Config{
41                 ListenAddr:      "localhost:0",
42                 NoDHT:           true,
43                 DisableTrackers: true,
44                 DataDir: func() string {
45                         ret, err := ioutil.TempDir("", "")
46                         if err != nil {
47                                 panic(err)
48                         }
49                         return ret
50                 }(),
51                 Debug: true,
52         }
53 }
54
55 func TestClientDefault(t *testing.T) {
56         cl, err := NewClient(TestingConfig())
57         require.NoError(t, err)
58         cl.Close()
59 }
60
61 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
62         cfg := TestingConfig()
63         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
64         require.NoError(t, err)
65         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
66         defer ci.Close()
67         cfg.DefaultStorage = ci
68         cl, err := NewClient(cfg)
69         require.NoError(t, err)
70         cl.Close()
71         // And again, https://github.com/anacrolix/torrent/issues/158
72         cl, err = NewClient(cfg)
73         require.NoError(t, err)
74         cl.Close()
75 }
76
77 func TestAddDropTorrent(t *testing.T) {
78         cl, err := NewClient(TestingConfig())
79         require.NoError(t, err)
80         defer cl.Close()
81         dir, mi := testutil.GreetingTestTorrent()
82         defer os.RemoveAll(dir)
83         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
84         require.NoError(t, err)
85         assert.True(t, new)
86         tt.SetMaxEstablishedConns(0)
87         tt.SetMaxEstablishedConns(1)
88         tt.Drop()
89 }
90
91 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
92         t.SkipNow()
93 }
94
95 func TestAddTorrentNoUsableURLs(t *testing.T) {
96         t.SkipNow()
97 }
98
99 func TestAddPeersToUnknownTorrent(t *testing.T) {
100         t.SkipNow()
101 }
102
103 func TestPieceHashSize(t *testing.T) {
104         if pieceHash.Size() != 20 {
105                 t.FailNow()
106         }
107 }
108
109 func TestTorrentInitialState(t *testing.T) {
110         dir, mi := testutil.GreetingTestTorrent()
111         defer os.RemoveAll(dir)
112         tor := &Torrent{
113                 infoHash:          mi.HashInfoBytes(),
114                 pieceStateChanges: pubsub.NewPubSub(),
115         }
116         tor.chunkSize = 2
117         tor.storageOpener = storage.NewClient(storage.NewFileWithCompletion("/dev/null", storage.NewMapPieceCompletion()))
118         // Needed to lock for asynchronous piece verification.
119         tor.cl = new(Client)
120         err := tor.setInfoBytes(mi.InfoBytes)
121         require.NoError(t, err)
122         require.Len(t, tor.pieces, 3)
123         tor.pendAllChunkSpecs(0)
124         tor.cl.mu.Lock()
125         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
126         tor.cl.mu.Unlock()
127         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
128 }
129
130 func TestUnmarshalPEXMsg(t *testing.T) {
131         var m peerExchangeMessage
132         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
133                 t.Fatal(err)
134         }
135         if len(m.Added) != 2 {
136                 t.FailNow()
137         }
138         if m.Added[0].Port != 0x506 {
139                 t.FailNow()
140         }
141 }
142
143 func TestReducedDialTimeout(t *testing.T) {
144         for _, _case := range []struct {
145                 Max             time.Duration
146                 HalfOpenLimit   int
147                 PendingPeers    int
148                 ExpectedReduced time.Duration
149         }{
150                 {nominalDialTimeout, 40, 0, nominalDialTimeout},
151                 {nominalDialTimeout, 40, 1, nominalDialTimeout},
152                 {nominalDialTimeout, 40, 39, nominalDialTimeout},
153                 {nominalDialTimeout, 40, 40, nominalDialTimeout / 2},
154                 {nominalDialTimeout, 40, 80, nominalDialTimeout / 3},
155                 {nominalDialTimeout, 40, 4000, nominalDialTimeout / 101},
156         } {
157                 reduced := reducedDialTimeout(_case.Max, _case.HalfOpenLimit, _case.PendingPeers)
158                 expected := _case.ExpectedReduced
159                 if expected < minDialTimeout {
160                         expected = minDialTimeout
161                 }
162                 if reduced != expected {
163                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
164                 }
165         }
166 }
167
168 func TestUTPRawConn(t *testing.T) {
169         l, err := NewUtpSocket("udp", "")
170         require.NoError(t, err)
171         defer l.Close()
172         go func() {
173                 for {
174                         _, err := l.Accept()
175                         if err != nil {
176                                 break
177                         }
178                 }
179         }()
180         // Connect a UTP peer to see if the RawConn will still work.
181         s, err := NewUtpSocket("udp", "")
182         require.NoError(t, err)
183         defer s.Close()
184         utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
185         require.NoError(t, err)
186         defer utpPeer.Close()
187         peer, err := net.ListenPacket("udp", ":0")
188         require.NoError(t, err)
189         defer peer.Close()
190
191         msgsReceived := 0
192         // How many messages to send. I've set this to double the channel buffer
193         // size in the raw packetConn.
194         const N = 200
195         readerStopped := make(chan struct{})
196         // The reader goroutine.
197         go func() {
198                 defer close(readerStopped)
199                 b := make([]byte, 500)
200                 for i := 0; i < N; i++ {
201                         n, _, err := l.ReadFrom(b)
202                         require.NoError(t, err)
203                         msgsReceived++
204                         var d int
205                         fmt.Sscan(string(b[:n]), &d)
206                         assert.Equal(t, i, d)
207                 }
208         }()
209         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
210         require.NoError(t, err)
211         for i := 0; i < N; i++ {
212                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
213                 require.NoError(t, err)
214                 time.Sleep(time.Millisecond)
215         }
216         select {
217         case <-readerStopped:
218         case <-time.After(time.Second):
219                 t.Fatal("reader timed out")
220         }
221         if msgsReceived != N {
222                 t.Fatalf("messages received: %d", msgsReceived)
223         }
224 }
225
226 func TestTwoClientsArbitraryPorts(t *testing.T) {
227         for i := 0; i < 2; i++ {
228                 cl, err := NewClient(TestingConfig())
229                 if err != nil {
230                         t.Fatal(err)
231                 }
232                 defer cl.Close()
233         }
234 }
235
236 func TestAddDropManyTorrents(t *testing.T) {
237         cl, err := NewClient(TestingConfig())
238         require.NoError(t, err)
239         defer cl.Close()
240         for i := range iter.N(1000) {
241                 var spec TorrentSpec
242                 binary.PutVarint(spec.InfoHash[:], int64(i))
243                 tt, new, err := cl.AddTorrentSpec(&spec)
244                 assert.NoError(t, err)
245                 assert.True(t, new)
246                 defer tt.Drop()
247         }
248 }
249
250 type FileCacheClientStorageFactoryParams struct {
251         Capacity    int64
252         SetCapacity bool
253         Wrapper     func(*filecache.Cache) storage.ClientImpl
254 }
255
256 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
257         return func(dataDir string) storage.ClientImpl {
258                 fc, err := filecache.NewCache(dataDir)
259                 if err != nil {
260                         panic(err)
261                 }
262                 if ps.SetCapacity {
263                         fc.SetCapacity(ps.Capacity)
264                 }
265                 return ps.Wrapper(fc)
266         }
267 }
268
269 type storageFactory func(string) storage.ClientImpl
270
271 func TestClientTransferDefault(t *testing.T) {
272         testClientTransfer(t, testClientTransferParams{
273                 ExportClientStatus: true,
274                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
275                         Wrapper: fileCachePieceResourceStorage,
276                 }),
277         })
278 }
279
280 func TestClientTransferRateLimitedUpload(t *testing.T) {
281         started := time.Now()
282         testClientTransfer(t, testClientTransferParams{
283                 // We are uploading 13 bytes (the length of the greeting torrent). The
284                 // chunks are 2 bytes in length. Then the smallest burst we can run
285                 // with is 2. Time taken is (13-burst)/rate.
286                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
287         })
288         require.True(t, time.Since(started) > time.Second)
289 }
290
291 func TestClientTransferRateLimitedDownload(t *testing.T) {
292         testClientTransfer(t, testClientTransferParams{
293                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
294         })
295 }
296
297 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
298         return storage.NewResourcePieces(fc.AsResourceProvider())
299 }
300
301 func TestClientTransferSmallCache(t *testing.T) {
302         testClientTransfer(t, testClientTransferParams{
303                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
304                         SetCapacity: true,
305                         // Going below the piece length means it can't complete a piece so
306                         // that it can be hashed.
307                         Capacity: 5,
308                         Wrapper:  fileCachePieceResourceStorage,
309                 }),
310                 SetReadahead: true,
311                 // Can't readahead too far or the cache will thrash and drop data we
312                 // thought we had.
313                 Readahead:          0,
314                 ExportClientStatus: true,
315         })
316 }
317
318 func TestClientTransferVarious(t *testing.T) {
319         // Leecher storage
320         for _, ls := range []storageFactory{
321                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
322                         Wrapper: fileCachePieceResourceStorage,
323                 }),
324                 storage.NewBoltDB,
325         } {
326                 // Seeder storage
327                 for _, ss := range []func(string) storage.ClientImpl{
328                         storage.NewFile,
329                         storage.NewMMap,
330                 } {
331                         for _, responsive := range []bool{false, true} {
332                                 testClientTransfer(t, testClientTransferParams{
333                                         Responsive:     responsive,
334                                         SeederStorage:  ss,
335                                         LeecherStorage: ls,
336                                 })
337                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
338                                         testClientTransfer(t, testClientTransferParams{
339                                                 SeederStorage:  ss,
340                                                 Responsive:     responsive,
341                                                 SetReadahead:   true,
342                                                 Readahead:      readahead,
343                                                 LeecherStorage: ls,
344                                         })
345                                 }
346                         }
347                 }
348         }
349 }
350
351 type testClientTransferParams struct {
352         Responsive                 bool
353         Readahead                  int64
354         SetReadahead               bool
355         ExportClientStatus         bool
356         LeecherStorage             func(string) storage.ClientImpl
357         SeederStorage              func(string) storage.ClientImpl
358         SeederUploadRateLimiter    *rate.Limiter
359         LeecherDownloadRateLimiter *rate.Limiter
360 }
361
362 // Creates a seeder and a leecher, and ensures the data transfers when a read
363 // is attempted on the leecher.
364 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
365         greetingTempDir, mi := testutil.GreetingTestTorrent()
366         defer os.RemoveAll(greetingTempDir)
367         // Create seeder and a Torrent.
368         cfg := TestingConfig()
369         cfg.Seed = true
370         cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
371         // cfg.ListenAddr = "localhost:4000"
372         if ps.SeederStorage != nil {
373                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
374                 defer cfg.DefaultStorage.Close()
375         } else {
376                 cfg.DataDir = greetingTempDir
377         }
378         seeder, err := NewClient(cfg)
379         require.NoError(t, err)
380         defer seeder.Close()
381         if ps.ExportClientStatus {
382                 testutil.ExportStatusWriter(seeder, "s")
383         }
384         // seederTorrent, new, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
385         _, new, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
386         require.NoError(t, err)
387         assert.True(t, new)
388         // Create leecher and a Torrent.
389         leecherDataDir, err := ioutil.TempDir("", "")
390         require.NoError(t, err)
391         defer os.RemoveAll(leecherDataDir)
392         if ps.LeecherStorage == nil {
393                 cfg.DataDir = leecherDataDir
394         } else {
395                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
396         }
397         cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
398         // cfg.ListenAddr = "localhost:4001"
399         leecher, err := NewClient(cfg)
400         require.NoError(t, err)
401         defer leecher.Close()
402         if ps.ExportClientStatus {
403                 testutil.ExportStatusWriter(leecher, "l")
404         }
405         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
406                 ret = TorrentSpecFromMetaInfo(mi)
407                 ret.ChunkSize = 2
408                 return
409         }())
410         require.NoError(t, err)
411         assert.True(t, new)
412         // Now do some things with leecher and seeder.
413         addClientPeer(leecherGreeting, seeder)
414         r := leecherGreeting.NewReader()
415         defer r.Close()
416         if ps.Responsive {
417                 r.SetResponsive()
418         }
419         if ps.SetReadahead {
420                 r.SetReadahead(ps.Readahead)
421         }
422         assertReadAllGreeting(t, r)
423         // After one read through, we can assume certain torrent statistics.
424         // These are not a strict requirement. It is however interesting to
425         // follow.
426         // t.Logf("%#v", seederTorrent.Stats())
427         // assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
428         // assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
429         // assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
430         // assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
431         // Read through again for the cases where the torrent data size exceeds
432         // the size of the cache.
433         assertReadAllGreeting(t, r)
434 }
435
436 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
437         pos, err := r.Seek(0, os.SEEK_SET)
438         assert.NoError(t, err)
439         assert.EqualValues(t, 0, pos)
440         _greeting, err := ioutil.ReadAll(r)
441         assert.NoError(t, err)
442         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
443 }
444
445 // Check that after completing leeching, a leecher transitions to a seeding
446 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
447 func TestSeedAfterDownloading(t *testing.T) {
448         greetingTempDir, mi := testutil.GreetingTestTorrent()
449         defer os.RemoveAll(greetingTempDir)
450         cfg := TestingConfig()
451         cfg.Seed = true
452         cfg.DataDir = greetingTempDir
453         seeder, err := NewClient(cfg)
454         require.NoError(t, err)
455         defer seeder.Close()
456         testutil.ExportStatusWriter(seeder, "s")
457         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
458         cfg.DataDir, err = ioutil.TempDir("", "")
459         require.NoError(t, err)
460         defer os.RemoveAll(cfg.DataDir)
461         leecher, err := NewClient(cfg)
462         require.NoError(t, err)
463         defer leecher.Close()
464         testutil.ExportStatusWriter(leecher, "l")
465         cfg.Seed = false
466         // cfg.TorrentDataOpener = nil
467         cfg.DataDir, err = ioutil.TempDir("", "")
468         require.NoError(t, err)
469         defer os.RemoveAll(cfg.DataDir)
470         leecherLeecher, _ := NewClient(cfg)
471         defer leecherLeecher.Close()
472         testutil.ExportStatusWriter(leecherLeecher, "ll")
473         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
474                 ret = TorrentSpecFromMetaInfo(mi)
475                 ret.ChunkSize = 2
476                 return
477         }())
478         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
479                 ret = TorrentSpecFromMetaInfo(mi)
480                 ret.ChunkSize = 3
481                 return
482         }())
483         // Simultaneously DownloadAll in Leecher, and read the contents
484         // consecutively in LeecherLeecher. This non-deterministically triggered a
485         // case where the leecher wouldn't unchoke the LeecherLeecher.
486         var wg sync.WaitGroup
487         wg.Add(1)
488         go func() {
489                 defer wg.Done()
490                 r := llg.NewReader()
491                 defer r.Close()
492                 b, err := ioutil.ReadAll(r)
493                 require.NoError(t, err)
494                 assert.EqualValues(t, testutil.GreetingFileContents, b)
495         }()
496         addClientPeer(leecherGreeting, seeder)
497         addClientPeer(leecherGreeting, leecherLeecher)
498         wg.Add(1)
499         go func() {
500                 defer wg.Done()
501                 leecherGreeting.DownloadAll()
502                 leecher.WaitAll()
503         }()
504         wg.Wait()
505 }
506
507 func TestMergingTrackersByAddingSpecs(t *testing.T) {
508         cl, err := NewClient(TestingConfig())
509         require.NoError(t, err)
510         defer cl.Close()
511         spec := TorrentSpec{}
512         T, new, _ := cl.AddTorrentSpec(&spec)
513         if !new {
514                 t.FailNow()
515         }
516         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
517         _, new, _ = cl.AddTorrentSpec(&spec)
518         assert.False(t, new)
519         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
520         // Because trackers are disabled in TestingConfig.
521         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
522 }
523
524 type badStorage struct{}
525
526 func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
527         return bs, nil
528 }
529
530 func (bs badStorage) Close() error {
531         return nil
532 }
533
534 func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {
535         return badStoragePiece{p}
536 }
537
538 type badStoragePiece struct {
539         p metainfo.Piece
540 }
541
542 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
543         return 0, nil
544 }
545
546 func (p badStoragePiece) GetIsComplete() bool {
547         return true
548 }
549
550 func (p badStoragePiece) MarkComplete() error {
551         return errors.New("psyyyyyyyche")
552 }
553
554 func (p badStoragePiece) MarkNotComplete() error {
555         return errors.New("psyyyyyyyche")
556 }
557
558 func (p badStoragePiece) randomlyTruncatedDataString() string {
559         return "hello, world\n"[:rand.Intn(14)]
560 }
561
562 func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
563         r := strings.NewReader(p.randomlyTruncatedDataString())
564         return r.ReadAt(b, off+p.p.Offset())
565 }
566
567 // We read from a piece which is marked completed, but is missing data.
568 func TestCompletedPieceWrongSize(t *testing.T) {
569         cfg := TestingConfig()
570         cfg.DefaultStorage = badStorage{}
571         cl, err := NewClient(cfg)
572         require.NoError(t, err)
573         defer cl.Close()
574         info := metainfo.Info{
575                 PieceLength: 15,
576                 Pieces:      make([]byte, 20),
577                 Files: []metainfo.FileInfo{
578                         {Path: []string{"greeting"}, Length: 13},
579                 },
580         }
581         b, err := bencode.Marshal(info)
582         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
583                 InfoBytes: b,
584                 InfoHash:  metainfo.HashBytes(b),
585         })
586         require.NoError(t, err)
587         defer tt.Drop()
588         assert.True(t, new)
589         r := tt.NewReader()
590         defer r.Close()
591         b, err = ioutil.ReadAll(r)
592         assert.Len(t, b, 13)
593         assert.NoError(t, err)
594 }
595
596 func BenchmarkAddLargeTorrent(b *testing.B) {
597         cfg := TestingConfig()
598         cfg.DisableTCP = true
599         cfg.DisableUTP = true
600         cfg.ListenAddr = "redonk"
601         cl, err := NewClient(cfg)
602         require.NoError(b, err)
603         defer cl.Close()
604         for range iter.N(b.N) {
605                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
606                 if err != nil {
607                         b.Fatal(err)
608                 }
609                 t.Drop()
610         }
611 }
612
613 func TestResponsive(t *testing.T) {
614         seederDataDir, mi := testutil.GreetingTestTorrent()
615         defer os.RemoveAll(seederDataDir)
616         cfg := TestingConfig()
617         cfg.Seed = true
618         cfg.DataDir = seederDataDir
619         seeder, err := NewClient(cfg)
620         require.Nil(t, err)
621         defer seeder.Close()
622         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
623         leecherDataDir, err := ioutil.TempDir("", "")
624         require.Nil(t, err)
625         defer os.RemoveAll(leecherDataDir)
626         cfg = TestingConfig()
627         cfg.DataDir = leecherDataDir
628         leecher, err := NewClient(cfg)
629         require.Nil(t, err)
630         defer leecher.Close()
631         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
632                 ret = TorrentSpecFromMetaInfo(mi)
633                 ret.ChunkSize = 2
634                 return
635         }())
636         addClientPeer(leecherTorrent, seeder)
637         reader := leecherTorrent.NewReader()
638         defer reader.Close()
639         reader.SetReadahead(0)
640         reader.SetResponsive()
641         b := make([]byte, 2)
642         _, err = reader.Seek(3, os.SEEK_SET)
643         require.NoError(t, err)
644         _, err = io.ReadFull(reader, b)
645         assert.Nil(t, err)
646         assert.EqualValues(t, "lo", string(b))
647         _, err = reader.Seek(11, os.SEEK_SET)
648         require.NoError(t, err)
649         n, err := io.ReadFull(reader, b)
650         assert.Nil(t, err)
651         assert.EqualValues(t, 2, n)
652         assert.EqualValues(t, "d\n", string(b))
653 }
654
655 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
656         seederDataDir, mi := testutil.GreetingTestTorrent()
657         defer os.RemoveAll(seederDataDir)
658         cfg := TestingConfig()
659         cfg.Seed = true
660         cfg.DataDir = seederDataDir
661         seeder, err := NewClient(cfg)
662         require.Nil(t, err)
663         defer seeder.Close()
664         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
665         leecherDataDir, err := ioutil.TempDir("", "")
666         require.Nil(t, err)
667         defer os.RemoveAll(leecherDataDir)
668         cfg = TestingConfig()
669         cfg.DataDir = leecherDataDir
670         leecher, err := NewClient(cfg)
671         require.Nil(t, err)
672         defer leecher.Close()
673         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
674                 ret = TorrentSpecFromMetaInfo(mi)
675                 ret.ChunkSize = 2
676                 return
677         }())
678         addClientPeer(leecherTorrent, seeder)
679         reader := leecherTorrent.NewReader()
680         defer reader.Close()
681         reader.SetReadahead(0)
682         reader.SetResponsive()
683         b := make([]byte, 2)
684         _, err = reader.Seek(3, os.SEEK_SET)
685         require.NoError(t, err)
686         _, err = io.ReadFull(reader, b)
687         assert.Nil(t, err)
688         assert.EqualValues(t, "lo", string(b))
689         go leecherTorrent.Drop()
690         _, err = reader.Seek(11, os.SEEK_SET)
691         require.NoError(t, err)
692         n, err := reader.Read(b)
693         assert.EqualError(t, err, "torrent closed")
694         assert.EqualValues(t, 0, n)
695 }
696
697 func TestDHTInheritBlocklist(t *testing.T) {
698         ipl := iplist.New(nil)
699         require.NotNil(t, ipl)
700         cfg := TestingConfig()
701         cfg.IPBlocklist = ipl
702         cfg.NoDHT = false
703         cl, err := NewClient(cfg)
704         require.NoError(t, err)
705         defer cl.Close()
706         require.Equal(t, ipl, cl.DHT().IPBlocklist())
707 }
708
709 // Check that stuff is merged in subsequent AddTorrentSpec for the same
710 // infohash.
711 func TestAddTorrentSpecMerging(t *testing.T) {
712         cl, err := NewClient(TestingConfig())
713         require.NoError(t, err)
714         defer cl.Close()
715         dir, mi := testutil.GreetingTestTorrent()
716         defer os.RemoveAll(dir)
717         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
718                 InfoHash: mi.HashInfoBytes(),
719         })
720         require.NoError(t, err)
721         require.True(t, new)
722         require.Nil(t, tt.Info())
723         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
724         require.NoError(t, err)
725         require.False(t, new)
726         require.NotNil(t, tt.Info())
727 }
728
729 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
730         dir, mi := testutil.GreetingTestTorrent()
731         os.RemoveAll(dir)
732         cl, _ := NewClient(TestingConfig())
733         defer cl.Close()
734         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
735                 InfoHash: mi.HashInfoBytes(),
736         })
737         tt.Drop()
738         assert.EqualValues(t, 0, len(cl.Torrents()))
739         select {
740         case <-tt.GotInfo():
741                 t.FailNow()
742         default:
743         }
744 }
745
746 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
747         for i := range iter.N(info.NumPieces()) {
748                 p := info.Piece(i)
749                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
750         }
751 }
752
753 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
754         fileCacheDir, err := ioutil.TempDir("", "")
755         require.NoError(t, err)
756         defer os.RemoveAll(fileCacheDir)
757         fileCache, err := filecache.NewCache(fileCacheDir)
758         require.NoError(t, err)
759         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
760         defer os.RemoveAll(greetingDataTempDir)
761         filePieceStore := csf(fileCache)
762         info, err := greetingMetainfo.UnmarshalInfo()
763         require.NoError(t, err)
764         ih := greetingMetainfo.HashInfoBytes()
765         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
766         require.NoError(t, err)
767         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
768         // require.Equal(t, len(testutil.GreetingFileContents), written)
769         // require.NoError(t, err)
770         for i := 0; i < info.NumPieces(); i++ {
771                 p := info.Piece(i)
772                 if alreadyCompleted {
773                         err := greetingData.Piece(p).MarkComplete()
774                         assert.NoError(t, err)
775                 }
776         }
777         cfg := TestingConfig()
778         // TODO: Disable network option?
779         cfg.DisableTCP = true
780         cfg.DisableUTP = true
781         cfg.DefaultStorage = filePieceStore
782         cl, err := NewClient(cfg)
783         require.NoError(t, err)
784         defer cl.Close()
785         tt, err := cl.AddTorrent(greetingMetainfo)
786         require.NoError(t, err)
787         psrs := tt.PieceStateRuns()
788         assert.Len(t, psrs, 1)
789         assert.EqualValues(t, 3, psrs[0].Length)
790         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
791         if alreadyCompleted {
792                 r := tt.NewReader()
793                 b, err := ioutil.ReadAll(r)
794                 assert.NoError(t, err)
795                 assert.EqualValues(t, testutil.GreetingFileContents, b)
796         }
797 }
798
799 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
800         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
801 }
802
803 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
804         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
805 }
806
807 func TestAddMetainfoWithNodes(t *testing.T) {
808         cfg := TestingConfig()
809         cfg.ListenAddr = ":0"
810         cfg.NoDHT = false
811         // For now, we want to just jam the nodes into the table, without
812         // verifying them first. Also the DHT code doesn't support mixing secure
813         // and insecure nodes if security is enabled (yet).
814         cfg.DHTConfig.NoSecurity = true
815         cl, err := NewClient(cfg)
816         require.NoError(t, err)
817         defer cl.Close()
818         assert.EqualValues(t, 0, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
819         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
820         require.NoError(t, err)
821         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
822         // check if the announce-list is here instead. TODO: Add nodes.
823         assert.Len(t, tt.metainfo.AnnounceList, 5)
824         // There are 6 nodes in the torrent file.
825         assert.EqualValues(t, 6, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
826 }
827
828 type testDownloadCancelParams struct {
829         ExportClientStatus        bool
830         SetLeecherStorageCapacity bool
831         LeecherStorageCapacity    int64
832         Cancel                    bool
833 }
834
835 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
836         greetingTempDir, mi := testutil.GreetingTestTorrent()
837         defer os.RemoveAll(greetingTempDir)
838         cfg := TestingConfig()
839         cfg.Seed = true
840         cfg.DataDir = greetingTempDir
841         seeder, err := NewClient(cfg)
842         require.NoError(t, err)
843         defer seeder.Close()
844         if ps.ExportClientStatus {
845                 testutil.ExportStatusWriter(seeder, "s")
846         }
847         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
848         leecherDataDir, err := ioutil.TempDir("", "")
849         require.NoError(t, err)
850         defer os.RemoveAll(leecherDataDir)
851         fc, err := filecache.NewCache(leecherDataDir)
852         require.NoError(t, err)
853         if ps.SetLeecherStorageCapacity {
854                 fc.SetCapacity(ps.LeecherStorageCapacity)
855         }
856         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
857         cfg.DataDir = leecherDataDir
858         leecher, _ := NewClient(cfg)
859         defer leecher.Close()
860         if ps.ExportClientStatus {
861                 testutil.ExportStatusWriter(leecher, "l")
862         }
863         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
864                 ret = TorrentSpecFromMetaInfo(mi)
865                 ret.ChunkSize = 2
866                 return
867         }())
868         require.NoError(t, err)
869         assert.True(t, new)
870         psc := leecherGreeting.SubscribePieceStateChanges()
871         defer psc.Close()
872         leecherGreeting.DownloadAll()
873         if ps.Cancel {
874                 leecherGreeting.CancelPieces(0, leecherGreeting.NumPieces())
875         }
876         addClientPeer(leecherGreeting, seeder)
877         completes := make(map[int]bool, 3)
878 values:
879         for {
880                 // started := time.Now()
881                 select {
882                 case _v := <-psc.Values:
883                         // log.Print(time.Since(started))
884                         v := _v.(PieceStateChange)
885                         completes[v.Index] = v.Complete
886                 case <-time.After(100 * time.Millisecond):
887                         break values
888                 }
889         }
890         if ps.Cancel {
891                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
892         } else {
893                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
894         }
895
896 }
897
898 func TestTorrentDownloadAll(t *testing.T) {
899         testDownloadCancel(t, testDownloadCancelParams{})
900 }
901
902 func TestTorrentDownloadAllThenCancel(t *testing.T) {
903         testDownloadCancel(t, testDownloadCancelParams{
904                 Cancel: true,
905         })
906 }
907
908 // Ensure that it's an error for a peer to send an invalid have message.
909 func TestPeerInvalidHave(t *testing.T) {
910         cl, err := NewClient(TestingConfig())
911         require.NoError(t, err)
912         defer cl.Close()
913         info := metainfo.Info{
914                 PieceLength: 1,
915                 Pieces:      make([]byte, 20),
916                 Files:       []metainfo.FileInfo{{Length: 1}},
917         }
918         infoBytes, err := bencode.Marshal(info)
919         require.NoError(t, err)
920         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
921                 InfoBytes: infoBytes,
922                 InfoHash:  metainfo.HashBytes(infoBytes),
923         })
924         require.NoError(t, err)
925         assert.True(t, _new)
926         defer tt.Drop()
927         cn := &connection{
928                 t: tt,
929         }
930         assert.NoError(t, cn.peerSentHave(0))
931         assert.Error(t, cn.peerSentHave(1))
932 }
933
934 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
935         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
936         defer os.RemoveAll(greetingTempDir)
937         cfg := TestingConfig()
938         cfg.DataDir = greetingTempDir
939         seeder, err := NewClient(TestingConfig())
940         require.NoError(t, err)
941         seeder.AddTorrentSpec(&TorrentSpec{
942                 InfoBytes: greetingMetainfo.InfoBytes,
943         })
944 }
945
946 func TestPrepareTrackerAnnounce(t *testing.T) {
947         cl := &Client{}
948         blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
949         require.NoError(t, err)
950         assert.False(t, blocked)
951         assert.EqualValues(t, "localhost:1234", host)
952         assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
953 }
954
955 // Check that when the listen port is 0, all the protocols listened on have
956 // the same port, and it isn't zero.
957 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
958         cl, err := NewClient(TestingConfig())
959         require.NoError(t, err)
960         defer cl.Close()
961         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
962         assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
963 }
964
965 func TestClientDynamicListenTCPOnly(t *testing.T) {
966         cfg := TestingConfig()
967         cfg.DisableUTP = true
968         cl, err := NewClient(cfg)
969         require.NoError(t, err)
970         defer cl.Close()
971         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
972         assert.Nil(t, cl.utpSock)
973 }
974
975 func TestClientDynamicListenUTPOnly(t *testing.T) {
976         cfg := TestingConfig()
977         cfg.DisableTCP = true
978         cl, err := NewClient(cfg)
979         require.NoError(t, err)
980         defer cl.Close()
981         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
982         assert.Nil(t, cl.tcpListener)
983 }
984
985 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
986         cfg := TestingConfig()
987         cfg.DisableTCP = true
988         cfg.DisableUTP = true
989         cl, err := NewClient(cfg)
990         require.NoError(t, err)
991         defer cl.Close()
992         assert.Nil(t, cl.ListenAddr())
993 }
994
995 func addClientPeer(t *Torrent, cl *Client) {
996         t.AddPeers([]Peer{
997                 {
998                         IP:   missinggo.AddrIP(cl.ListenAddr()),
999                         Port: missinggo.AddrPort(cl.ListenAddr()),
1000                 },
1001         })
1002 }
1003
1004 func totalConns(tts []*Torrent) (ret int) {
1005         for _, tt := range tts {
1006                 tt.cl.mu.Lock()
1007                 ret += len(tt.conns)
1008                 tt.cl.mu.Unlock()
1009         }
1010         return
1011 }
1012
1013 func TestSetMaxEstablishedConn(t *testing.T) {
1014         var tts []*Torrent
1015         ih := testutil.GreetingMetaInfo().HashInfoBytes()
1016         for i := range iter.N(3) {
1017                 cl, err := NewClient(TestingConfig())
1018                 require.NoError(t, err)
1019                 defer cl.Close()
1020                 tt, _ := cl.AddTorrentInfoHash(ih)
1021                 tt.SetMaxEstablishedConns(2)
1022                 testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
1023                 tts = append(tts, tt)
1024         }
1025         addPeers := func() {
1026                 for i, tt := range tts {
1027                         for _, _tt := range tts[:i] {
1028                                 addClientPeer(tt, _tt.cl)
1029                         }
1030                 }
1031         }
1032         waitTotalConns := func(num int) {
1033                 for totalConns(tts) != num {
1034                         time.Sleep(time.Millisecond)
1035                 }
1036         }
1037         addPeers()
1038         waitTotalConns(6)
1039         tts[0].SetMaxEstablishedConns(1)
1040         waitTotalConns(4)
1041         tts[0].SetMaxEstablishedConns(0)
1042         waitTotalConns(2)
1043         tts[0].SetMaxEstablishedConns(1)
1044         addPeers()
1045         waitTotalConns(4)
1046         tts[0].SetMaxEstablishedConns(2)
1047         addPeers()
1048         waitTotalConns(6)
1049 }
1050
1051 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1052         os.MkdirAll(dir, 0770)
1053         file, err := os.Create(filepath.Join(dir, name))
1054         require.NoError(t, err)
1055         file.Write([]byte(name))
1056         file.Close()
1057         mi := metainfo.MetaInfo{}
1058         mi.SetDefaults()
1059         info := metainfo.Info{PieceLength: 256 * 1024}
1060         err = info.BuildFromFilePath(filepath.Join(dir, name))
1061         require.NoError(t, err)
1062         mi.InfoBytes, err = bencode.Marshal(info)
1063         require.NoError(t, err)
1064         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1065         tr, err := cl.AddTorrent(&mi)
1066         require.NoError(t, err)
1067         assert.True(t, tr.Seeding())
1068         return magnet
1069 }
1070
1071 // https://github.com/anacrolix/torrent/issues/114
1072 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1073         cfg := TestingConfig()
1074         cfg.DisableUTP = true
1075         cfg.Seed = true
1076         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1077         cfg.Debug = true
1078         cfg.ForceEncryption = true
1079         os.Mkdir(cfg.DataDir, 0755)
1080         server, err := NewClient(cfg)
1081         require.NoError(t, err)
1082         defer server.Close()
1083         testutil.ExportStatusWriter(server, "s")
1084         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1085         makeMagnet(t, server, cfg.DataDir, "test2")
1086         cfg = TestingConfig()
1087         cfg.DisableUTP = true
1088         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1089         cfg.Debug = true
1090         cfg.ForceEncryption = true
1091         client, err := NewClient(cfg)
1092         require.NoError(t, err)
1093         defer client.Close()
1094         testutil.ExportStatusWriter(client, "c")
1095         tr, err := client.AddMagnet(magnet1)
1096         require.NoError(t, err)
1097         tr.AddPeers([]Peer{{
1098                 IP:   missinggo.AddrIP(server.ListenAddr()),
1099                 Port: missinggo.AddrPort(server.ListenAddr()),
1100         }})
1101         <-tr.GotInfo()
1102         tr.DownloadAll()
1103         client.WaitAll()
1104 }