]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Add very tentative UPnP NAT traversal
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "math/rand"
10         "net"
11         "os"
12         "path/filepath"
13         "strings"
14         "sync"
15         "testing"
16         "time"
17
18         _ "github.com/anacrolix/envpprof"
19         "github.com/anacrolix/missinggo"
20         "github.com/anacrolix/missinggo/filecache"
21         "github.com/anacrolix/missinggo/pubsub"
22         "github.com/bradfitz/iter"
23         "github.com/stretchr/testify/assert"
24         "github.com/stretchr/testify/require"
25         "golang.org/x/time/rate"
26
27         "github.com/anacrolix/torrent/bencode"
28         "github.com/anacrolix/torrent/internal/testutil"
29         "github.com/anacrolix/torrent/iplist"
30         "github.com/anacrolix/torrent/metainfo"
31         "github.com/anacrolix/torrent/storage"
32 )
33
34 func TestingConfig() *Config {
35         return &Config{
36                 ListenAddr:              "localhost:0",
37                 NoDHT:                   true,
38                 DataDir:                 tempDir(),
39                 DisableTrackers:         true,
40                 NoDefaultPortForwarding: true,
41                 // Debug:           true,
42         }
43 }
44
45 func TestClientDefault(t *testing.T) {
46         cl, err := NewClient(TestingConfig())
47         require.NoError(t, err)
48         cl.Close()
49 }
50
51 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
52         cfg := TestingConfig()
53         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
54         require.NoError(t, err)
55         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
56         defer ci.Close()
57         cfg.DefaultStorage = ci
58         cl, err := NewClient(cfg)
59         require.NoError(t, err)
60         cl.Close()
61         // And again, https://github.com/anacrolix/torrent/issues/158
62         cl, err = NewClient(cfg)
63         require.NoError(t, err)
64         cl.Close()
65 }
66
67 func TestAddDropTorrent(t *testing.T) {
68         cl, err := NewClient(TestingConfig())
69         require.NoError(t, err)
70         defer cl.Close()
71         dir, mi := testutil.GreetingTestTorrent()
72         defer os.RemoveAll(dir)
73         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
74         require.NoError(t, err)
75         assert.True(t, new)
76         tt.SetMaxEstablishedConns(0)
77         tt.SetMaxEstablishedConns(1)
78         tt.Drop()
79 }
80
81 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
82         // TODO?
83         t.SkipNow()
84 }
85
86 func TestAddTorrentNoUsableURLs(t *testing.T) {
87         // TODO?
88         t.SkipNow()
89 }
90
91 func TestAddPeersToUnknownTorrent(t *testing.T) {
92         // TODO?
93         t.SkipNow()
94 }
95
96 func TestPieceHashSize(t *testing.T) {
97         assert.Equal(t, 20, pieceHash.Size())
98 }
99
100 func TestTorrentInitialState(t *testing.T) {
101         dir, mi := testutil.GreetingTestTorrent()
102         defer os.RemoveAll(dir)
103         tor := &Torrent{
104                 infoHash:          mi.HashInfoBytes(),
105                 pieceStateChanges: pubsub.NewPubSub(),
106         }
107         tor.chunkSize = 2
108         tor.storageOpener = storage.NewClient(storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()))
109         // Needed to lock for asynchronous piece verification.
110         tor.cl = new(Client)
111         tor.cl.mu.Lock()
112         err := tor.setInfoBytes(mi.InfoBytes)
113         tor.cl.mu.Unlock()
114         require.NoError(t, err)
115         require.Len(t, tor.pieces, 3)
116         tor.pendAllChunkSpecs(0)
117         tor.cl.mu.Lock()
118         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
119         tor.cl.mu.Unlock()
120         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
121 }
122
123 func TestUnmarshalPEXMsg(t *testing.T) {
124         var m peerExchangeMessage
125         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
126                 t.Fatal(err)
127         }
128         if len(m.Added) != 2 {
129                 t.FailNow()
130         }
131         if m.Added[0].Port != 0x506 {
132                 t.FailNow()
133         }
134 }
135
136 func TestReducedDialTimeout(t *testing.T) {
137         cfg := &Config{}
138         cfg.setDefaults()
139         for _, _case := range []struct {
140                 Max             time.Duration
141                 HalfOpenLimit   int
142                 PendingPeers    int
143                 ExpectedReduced time.Duration
144         }{
145                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
146                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
147                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
148                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
149                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
150                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
151         } {
152                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
153                 expected := _case.ExpectedReduced
154                 if expected < cfg.MinDialTimeout {
155                         expected = cfg.MinDialTimeout
156                 }
157                 if reduced != expected {
158                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
159                 }
160         }
161 }
162
163 func TestUTPRawConn(t *testing.T) {
164         l, err := NewUtpSocket("udp", "")
165         require.NoError(t, err)
166         defer l.Close()
167         go func() {
168                 for {
169                         _, err := l.Accept()
170                         if err != nil {
171                                 break
172                         }
173                 }
174         }()
175         // Connect a UTP peer to see if the RawConn will still work.
176         s, err := NewUtpSocket("udp", "")
177         require.NoError(t, err)
178         defer s.Close()
179         utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
180         require.NoError(t, err)
181         defer utpPeer.Close()
182         peer, err := net.ListenPacket("udp", ":0")
183         require.NoError(t, err)
184         defer peer.Close()
185
186         msgsReceived := 0
187         // How many messages to send. I've set this to double the channel buffer
188         // size in the raw packetConn.
189         const N = 200
190         readerStopped := make(chan struct{})
191         // The reader goroutine.
192         go func() {
193                 defer close(readerStopped)
194                 b := make([]byte, 500)
195                 for i := 0; i < N; i++ {
196                         n, _, err := l.ReadFrom(b)
197                         require.NoError(t, err)
198                         msgsReceived++
199                         var d int
200                         fmt.Sscan(string(b[:n]), &d)
201                         assert.Equal(t, i, d)
202                 }
203         }()
204         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
205         require.NoError(t, err)
206         for i := 0; i < N; i++ {
207                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
208                 require.NoError(t, err)
209                 time.Sleep(time.Millisecond)
210         }
211         select {
212         case <-readerStopped:
213         case <-time.After(time.Second):
214                 t.Fatal("reader timed out")
215         }
216         if msgsReceived != N {
217                 t.Fatalf("messages received: %d", msgsReceived)
218         }
219 }
220
221 func TestTwoClientsArbitraryPorts(t *testing.T) {
222         for i := 0; i < 2; i++ {
223                 cl, err := NewClient(TestingConfig())
224                 if err != nil {
225                         t.Fatal(err)
226                 }
227                 defer cl.Close()
228         }
229 }
230
231 func TestAddDropManyTorrents(t *testing.T) {
232         cl, err := NewClient(TestingConfig())
233         require.NoError(t, err)
234         defer cl.Close()
235         for i := range iter.N(1000) {
236                 var spec TorrentSpec
237                 binary.PutVarint(spec.InfoHash[:], int64(i))
238                 tt, new, err := cl.AddTorrentSpec(&spec)
239                 assert.NoError(t, err)
240                 assert.True(t, new)
241                 defer tt.Drop()
242         }
243 }
244
245 type FileCacheClientStorageFactoryParams struct {
246         Capacity    int64
247         SetCapacity bool
248         Wrapper     func(*filecache.Cache) storage.ClientImpl
249 }
250
251 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
252         return func(dataDir string) storage.ClientImpl {
253                 fc, err := filecache.NewCache(dataDir)
254                 if err != nil {
255                         panic(err)
256                 }
257                 if ps.SetCapacity {
258                         fc.SetCapacity(ps.Capacity)
259                 }
260                 return ps.Wrapper(fc)
261         }
262 }
263
264 type storageFactory func(string) storage.ClientImpl
265
266 func TestClientTransferDefault(t *testing.T) {
267         testClientTransfer(t, testClientTransferParams{
268                 ExportClientStatus: true,
269                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
270                         Wrapper: fileCachePieceResourceStorage,
271                 }),
272         })
273 }
274
275 func TestClientTransferRateLimitedUpload(t *testing.T) {
276         started := time.Now()
277         testClientTransfer(t, testClientTransferParams{
278                 // We are uploading 13 bytes (the length of the greeting torrent). The
279                 // chunks are 2 bytes in length. Then the smallest burst we can run
280                 // with is 2. Time taken is (13-burst)/rate.
281                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
282         })
283         require.True(t, time.Since(started) > time.Second)
284 }
285
286 func TestClientTransferRateLimitedDownload(t *testing.T) {
287         testClientTransfer(t, testClientTransferParams{
288                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
289         })
290 }
291
292 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
293         return storage.NewResourcePieces(fc.AsResourceProvider())
294 }
295
296 func TestClientTransferSmallCache(t *testing.T) {
297         testClientTransfer(t, testClientTransferParams{
298                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
299                         SetCapacity: true,
300                         // Going below the piece length means it can't complete a piece so
301                         // that it can be hashed.
302                         Capacity: 5,
303                         Wrapper:  fileCachePieceResourceStorage,
304                 }),
305                 SetReadahead: true,
306                 // Can't readahead too far or the cache will thrash and drop data we
307                 // thought we had.
308                 Readahead:          0,
309                 ExportClientStatus: true,
310         })
311 }
312
313 func TestClientTransferVarious(t *testing.T) {
314         // Leecher storage
315         for _, ls := range []storageFactory{
316                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
317                         Wrapper: fileCachePieceResourceStorage,
318                 }),
319                 storage.NewBoltDB,
320         } {
321                 // Seeder storage
322                 for _, ss := range []func(string) storage.ClientImpl{
323                         storage.NewFile,
324                         storage.NewMMap,
325                 } {
326                         for _, responsive := range []bool{false, true} {
327                                 testClientTransfer(t, testClientTransferParams{
328                                         Responsive:     responsive,
329                                         SeederStorage:  ss,
330                                         LeecherStorage: ls,
331                                 })
332                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
333                                         testClientTransfer(t, testClientTransferParams{
334                                                 SeederStorage:  ss,
335                                                 Responsive:     responsive,
336                                                 SetReadahead:   true,
337                                                 Readahead:      readahead,
338                                                 LeecherStorage: ls,
339                                         })
340                                 }
341                         }
342                 }
343         }
344 }
345
346 type testClientTransferParams struct {
347         Responsive                 bool
348         Readahead                  int64
349         SetReadahead               bool
350         ExportClientStatus         bool
351         LeecherStorage             func(string) storage.ClientImpl
352         SeederStorage              func(string) storage.ClientImpl
353         SeederUploadRateLimiter    *rate.Limiter
354         LeecherDownloadRateLimiter *rate.Limiter
355 }
356
357 // Creates a seeder and a leecher, and ensures the data transfers when a read
358 // is attempted on the leecher.
359 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
360         greetingTempDir, mi := testutil.GreetingTestTorrent()
361         defer os.RemoveAll(greetingTempDir)
362         // Create seeder and a Torrent.
363         cfg := TestingConfig()
364         cfg.Seed = true
365         cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
366         // cfg.ListenAddr = "localhost:4000"
367         if ps.SeederStorage != nil {
368                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
369                 defer cfg.DefaultStorage.Close()
370         } else {
371                 cfg.DataDir = greetingTempDir
372         }
373         seeder, err := NewClient(cfg)
374         require.NoError(t, err)
375         if ps.ExportClientStatus {
376                 testutil.ExportStatusWriter(seeder, "s")
377         }
378         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
379         // Run a Stats right after Closing the Client. This will trigger the Stats
380         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
381         defer seederTorrent.Stats()
382         defer seeder.Close()
383         seederTorrent.VerifyData()
384         // Create leecher and a Torrent.
385         leecherDataDir, err := ioutil.TempDir("", "")
386         require.NoError(t, err)
387         defer os.RemoveAll(leecherDataDir)
388         if ps.LeecherStorage == nil {
389                 cfg.DataDir = leecherDataDir
390         } else {
391                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
392         }
393         cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
394         // cfg.ListenAddr = "localhost:4001"
395         leecher, err := NewClient(cfg)
396         require.NoError(t, err)
397         defer leecher.Close()
398         if ps.ExportClientStatus {
399                 testutil.ExportStatusWriter(leecher, "l")
400         }
401         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
402                 ret = TorrentSpecFromMetaInfo(mi)
403                 ret.ChunkSize = 2
404                 return
405         }())
406         require.NoError(t, err)
407         assert.True(t, new)
408         // Now do some things with leecher and seeder.
409         addClientPeer(leecherGreeting, seeder)
410         r := leecherGreeting.NewReader()
411         defer r.Close()
412         if ps.Responsive {
413                 r.SetResponsive()
414         }
415         if ps.SetReadahead {
416                 r.SetReadahead(ps.Readahead)
417         }
418         assertReadAllGreeting(t, r)
419         // After one read through, we can assume certain torrent statistics.
420         // These are not a strict requirement. It is however interesting to
421         // follow.
422         // t.Logf("%#v", seederTorrent.Stats())
423         // assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
424         // assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
425         // assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
426         // assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
427         // Read through again for the cases where the torrent data size exceeds
428         // the size of the cache.
429         assertReadAllGreeting(t, r)
430 }
431
432 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
433         pos, err := r.Seek(0, io.SeekStart)
434         assert.NoError(t, err)
435         assert.EqualValues(t, 0, pos)
436         _greeting, err := ioutil.ReadAll(r)
437         assert.NoError(t, err)
438         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
439 }
440
441 // Check that after completing leeching, a leecher transitions to a seeding
442 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
443 func TestSeedAfterDownloading(t *testing.T) {
444         greetingTempDir, mi := testutil.GreetingTestTorrent()
445         defer os.RemoveAll(greetingTempDir)
446         cfg := TestingConfig()
447         cfg.Seed = true
448         cfg.DataDir = greetingTempDir
449         seeder, err := NewClient(cfg)
450         require.NoError(t, err)
451         defer seeder.Close()
452         testutil.ExportStatusWriter(seeder, "s")
453         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
454         seederTorrent.VerifyData()
455         cfg.DataDir, err = ioutil.TempDir("", "")
456         require.NoError(t, err)
457         defer os.RemoveAll(cfg.DataDir)
458         leecher, err := NewClient(cfg)
459         require.NoError(t, err)
460         defer leecher.Close()
461         testutil.ExportStatusWriter(leecher, "l")
462         cfg.Seed = false
463         cfg.DataDir, err = ioutil.TempDir("", "")
464         require.NoError(t, err)
465         defer os.RemoveAll(cfg.DataDir)
466         leecherLeecher, _ := NewClient(cfg)
467         defer leecherLeecher.Close()
468         testutil.ExportStatusWriter(leecherLeecher, "ll")
469         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
470                 ret = TorrentSpecFromMetaInfo(mi)
471                 ret.ChunkSize = 2
472                 return
473         }())
474         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
475                 ret = TorrentSpecFromMetaInfo(mi)
476                 ret.ChunkSize = 3
477                 return
478         }())
479         // Simultaneously DownloadAll in Leecher, and read the contents
480         // consecutively in LeecherLeecher. This non-deterministically triggered a
481         // case where the leecher wouldn't unchoke the LeecherLeecher.
482         var wg sync.WaitGroup
483         wg.Add(1)
484         go func() {
485                 defer wg.Done()
486                 r := llg.NewReader()
487                 defer r.Close()
488                 b, err := ioutil.ReadAll(r)
489                 require.NoError(t, err)
490                 assert.EqualValues(t, testutil.GreetingFileContents, b)
491         }()
492         addClientPeer(leecherGreeting, seeder)
493         addClientPeer(leecherGreeting, leecherLeecher)
494         wg.Add(1)
495         go func() {
496                 defer wg.Done()
497                 leecherGreeting.DownloadAll()
498                 leecher.WaitAll()
499         }()
500         wg.Wait()
501 }
502
503 func TestMergingTrackersByAddingSpecs(t *testing.T) {
504         cl, err := NewClient(TestingConfig())
505         require.NoError(t, err)
506         defer cl.Close()
507         spec := TorrentSpec{}
508         T, new, _ := cl.AddTorrentSpec(&spec)
509         if !new {
510                 t.FailNow()
511         }
512         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
513         _, new, _ = cl.AddTorrentSpec(&spec)
514         assert.False(t, new)
515         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
516         // Because trackers are disabled in TestingConfig.
517         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
518 }
519
520 type badStorage struct{}
521
522 var _ storage.ClientImpl = badStorage{}
523
524 func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
525         return bs, nil
526 }
527
528 func (bs badStorage) Close() error {
529         return nil
530 }
531
532 func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {
533         return badStoragePiece{p}
534 }
535
536 type badStoragePiece struct {
537         p metainfo.Piece
538 }
539
540 var _ storage.PieceImpl = badStoragePiece{}
541
542 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
543         return 0, nil
544 }
545
546 func (p badStoragePiece) Completion() storage.Completion {
547         return storage.Completion{Complete: true, Ok: true}
548 }
549
550 func (p badStoragePiece) MarkComplete() error {
551         return errors.New("psyyyyyyyche")
552 }
553
554 func (p badStoragePiece) MarkNotComplete() error {
555         return errors.New("psyyyyyyyche")
556 }
557
558 func (p badStoragePiece) randomlyTruncatedDataString() string {
559         return "hello, world\n"[:rand.Intn(14)]
560 }
561
562 func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
563         r := strings.NewReader(p.randomlyTruncatedDataString())
564         return r.ReadAt(b, off+p.p.Offset())
565 }
566
567 // We read from a piece which is marked completed, but is missing data.
568 func TestCompletedPieceWrongSize(t *testing.T) {
569         cfg := TestingConfig()
570         cfg.DefaultStorage = badStorage{}
571         cl, err := NewClient(cfg)
572         require.NoError(t, err)
573         defer cl.Close()
574         info := metainfo.Info{
575                 PieceLength: 15,
576                 Pieces:      make([]byte, 20),
577                 Files: []metainfo.FileInfo{
578                         {Path: []string{"greeting"}, Length: 13},
579                 },
580         }
581         b, err := bencode.Marshal(info)
582         require.NoError(t, err)
583         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
584                 InfoBytes: b,
585                 InfoHash:  metainfo.HashBytes(b),
586         })
587         require.NoError(t, err)
588         defer tt.Drop()
589         assert.True(t, new)
590         r := tt.NewReader()
591         defer r.Close()
592         b, err = ioutil.ReadAll(r)
593         assert.Len(t, b, 13)
594         assert.NoError(t, err)
595 }
596
597 func BenchmarkAddLargeTorrent(b *testing.B) {
598         cfg := TestingConfig()
599         cfg.DisableTCP = true
600         cfg.DisableUTP = true
601         cfg.ListenAddr = "redonk"
602         cl, err := NewClient(cfg)
603         require.NoError(b, err)
604         defer cl.Close()
605         for range iter.N(b.N) {
606                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
607                 if err != nil {
608                         b.Fatal(err)
609                 }
610                 t.Drop()
611         }
612 }
613
614 func TestResponsive(t *testing.T) {
615         seederDataDir, mi := testutil.GreetingTestTorrent()
616         defer os.RemoveAll(seederDataDir)
617         cfg := TestingConfig()
618         cfg.Seed = true
619         cfg.DataDir = seederDataDir
620         seeder, err := NewClient(cfg)
621         require.Nil(t, err)
622         defer seeder.Close()
623         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
624         seederTorrent.VerifyData()
625         leecherDataDir, err := ioutil.TempDir("", "")
626         require.Nil(t, err)
627         defer os.RemoveAll(leecherDataDir)
628         cfg = TestingConfig()
629         cfg.DataDir = leecherDataDir
630         leecher, err := NewClient(cfg)
631         require.Nil(t, err)
632         defer leecher.Close()
633         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
634                 ret = TorrentSpecFromMetaInfo(mi)
635                 ret.ChunkSize = 2
636                 return
637         }())
638         addClientPeer(leecherTorrent, seeder)
639         reader := leecherTorrent.NewReader()
640         defer reader.Close()
641         reader.SetReadahead(0)
642         reader.SetResponsive()
643         b := make([]byte, 2)
644         _, err = reader.Seek(3, io.SeekStart)
645         require.NoError(t, err)
646         _, err = io.ReadFull(reader, b)
647         assert.Nil(t, err)
648         assert.EqualValues(t, "lo", string(b))
649         _, err = reader.Seek(11, io.SeekStart)
650         require.NoError(t, err)
651         n, err := io.ReadFull(reader, b)
652         assert.Nil(t, err)
653         assert.EqualValues(t, 2, n)
654         assert.EqualValues(t, "d\n", string(b))
655 }
656
657 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
658         seederDataDir, mi := testutil.GreetingTestTorrent()
659         defer os.RemoveAll(seederDataDir)
660         cfg := TestingConfig()
661         cfg.Seed = true
662         cfg.DataDir = seederDataDir
663         seeder, err := NewClient(cfg)
664         require.Nil(t, err)
665         defer seeder.Close()
666         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
667         seederTorrent.VerifyData()
668         leecherDataDir, err := ioutil.TempDir("", "")
669         require.Nil(t, err)
670         defer os.RemoveAll(leecherDataDir)
671         cfg = TestingConfig()
672         cfg.DataDir = leecherDataDir
673         leecher, err := NewClient(cfg)
674         require.Nil(t, err)
675         defer leecher.Close()
676         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
677                 ret = TorrentSpecFromMetaInfo(mi)
678                 ret.ChunkSize = 2
679                 return
680         }())
681         addClientPeer(leecherTorrent, seeder)
682         reader := leecherTorrent.NewReader()
683         defer reader.Close()
684         reader.SetReadahead(0)
685         reader.SetResponsive()
686         b := make([]byte, 2)
687         _, err = reader.Seek(3, io.SeekStart)
688         require.NoError(t, err)
689         _, err = io.ReadFull(reader, b)
690         assert.Nil(t, err)
691         assert.EqualValues(t, "lo", string(b))
692         go leecherTorrent.Drop()
693         _, err = reader.Seek(11, io.SeekStart)
694         require.NoError(t, err)
695         n, err := reader.Read(b)
696         assert.EqualError(t, err, "torrent closed")
697         assert.EqualValues(t, 0, n)
698 }
699
700 func TestDHTInheritBlocklist(t *testing.T) {
701         ipl := iplist.New(nil)
702         require.NotNil(t, ipl)
703         cfg := TestingConfig()
704         cfg.IPBlocklist = ipl
705         cfg.NoDHT = false
706         cl, err := NewClient(cfg)
707         require.NoError(t, err)
708         defer cl.Close()
709         require.Equal(t, ipl, cl.DHT().IPBlocklist())
710 }
711
712 // Check that stuff is merged in subsequent AddTorrentSpec for the same
713 // infohash.
714 func TestAddTorrentSpecMerging(t *testing.T) {
715         cl, err := NewClient(TestingConfig())
716         require.NoError(t, err)
717         defer cl.Close()
718         dir, mi := testutil.GreetingTestTorrent()
719         defer os.RemoveAll(dir)
720         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
721                 InfoHash: mi.HashInfoBytes(),
722         })
723         require.NoError(t, err)
724         require.True(t, new)
725         require.Nil(t, tt.Info())
726         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
727         require.NoError(t, err)
728         require.False(t, new)
729         require.NotNil(t, tt.Info())
730 }
731
732 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
733         dir, mi := testutil.GreetingTestTorrent()
734         os.RemoveAll(dir)
735         cl, _ := NewClient(TestingConfig())
736         defer cl.Close()
737         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
738                 InfoHash: mi.HashInfoBytes(),
739         })
740         tt.Drop()
741         assert.EqualValues(t, 0, len(cl.Torrents()))
742         select {
743         case <-tt.GotInfo():
744                 t.FailNow()
745         default:
746         }
747 }
748
749 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
750         for i := range iter.N(info.NumPieces()) {
751                 p := info.Piece(i)
752                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
753         }
754 }
755
756 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
757         fileCacheDir, err := ioutil.TempDir("", "")
758         require.NoError(t, err)
759         defer os.RemoveAll(fileCacheDir)
760         fileCache, err := filecache.NewCache(fileCacheDir)
761         require.NoError(t, err)
762         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
763         defer os.RemoveAll(greetingDataTempDir)
764         filePieceStore := csf(fileCache)
765         defer filePieceStore.Close()
766         info, err := greetingMetainfo.UnmarshalInfo()
767         require.NoError(t, err)
768         ih := greetingMetainfo.HashInfoBytes()
769         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
770         require.NoError(t, err)
771         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
772         // require.Equal(t, len(testutil.GreetingFileContents), written)
773         // require.NoError(t, err)
774         for i := 0; i < info.NumPieces(); i++ {
775                 p := info.Piece(i)
776                 if alreadyCompleted {
777                         require.NoError(t, greetingData.Piece(p).MarkComplete())
778                 }
779         }
780         cfg := TestingConfig()
781         // TODO: Disable network option?
782         cfg.DisableTCP = true
783         cfg.DisableUTP = true
784         cfg.DefaultStorage = filePieceStore
785         cl, err := NewClient(cfg)
786         require.NoError(t, err)
787         defer cl.Close()
788         tt, err := cl.AddTorrent(greetingMetainfo)
789         require.NoError(t, err)
790         psrs := tt.PieceStateRuns()
791         assert.Len(t, psrs, 1)
792         assert.EqualValues(t, 3, psrs[0].Length)
793         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
794         if alreadyCompleted {
795                 r := tt.NewReader()
796                 b, err := ioutil.ReadAll(r)
797                 assert.NoError(t, err)
798                 assert.EqualValues(t, testutil.GreetingFileContents, b)
799         }
800 }
801
802 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
803         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
804 }
805
806 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
807         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
808 }
809
810 func TestAddMetainfoWithNodes(t *testing.T) {
811         cfg := TestingConfig()
812         cfg.ListenAddr = ":0"
813         cfg.NoDHT = false
814         // For now, we want to just jam the nodes into the table, without
815         // verifying them first. Also the DHT code doesn't support mixing secure
816         // and insecure nodes if security is enabled (yet).
817         cfg.DHTConfig.NoSecurity = true
818         cl, err := NewClient(cfg)
819         require.NoError(t, err)
820         defer cl.Close()
821         assert.EqualValues(t, 0, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
822         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
823         require.NoError(t, err)
824         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
825         // check if the announce-list is here instead. TODO: Add nodes.
826         assert.Len(t, tt.metainfo.AnnounceList, 5)
827         // There are 6 nodes in the torrent file.
828         assert.EqualValues(t, 6, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
829 }
830
831 type testDownloadCancelParams struct {
832         ExportClientStatus        bool
833         SetLeecherStorageCapacity bool
834         LeecherStorageCapacity    int64
835         Cancel                    bool
836 }
837
838 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
839         greetingTempDir, mi := testutil.GreetingTestTorrent()
840         defer os.RemoveAll(greetingTempDir)
841         cfg := TestingConfig()
842         cfg.Seed = true
843         cfg.DataDir = greetingTempDir
844         seeder, err := NewClient(cfg)
845         require.NoError(t, err)
846         defer seeder.Close()
847         if ps.ExportClientStatus {
848                 testutil.ExportStatusWriter(seeder, "s")
849         }
850         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
851         seederTorrent.VerifyData()
852         leecherDataDir, err := ioutil.TempDir("", "")
853         require.NoError(t, err)
854         defer os.RemoveAll(leecherDataDir)
855         fc, err := filecache.NewCache(leecherDataDir)
856         require.NoError(t, err)
857         if ps.SetLeecherStorageCapacity {
858                 fc.SetCapacity(ps.LeecherStorageCapacity)
859         }
860         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
861         cfg.DataDir = leecherDataDir
862         leecher, _ := NewClient(cfg)
863         defer leecher.Close()
864         if ps.ExportClientStatus {
865                 testutil.ExportStatusWriter(leecher, "l")
866         }
867         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
868                 ret = TorrentSpecFromMetaInfo(mi)
869                 ret.ChunkSize = 2
870                 return
871         }())
872         require.NoError(t, err)
873         assert.True(t, new)
874         psc := leecherGreeting.SubscribePieceStateChanges()
875         defer psc.Close()
876         leecherGreeting.DownloadAll()
877         if ps.Cancel {
878                 leecherGreeting.CancelPieces(0, leecherGreeting.NumPieces())
879         }
880         addClientPeer(leecherGreeting, seeder)
881         completes := make(map[int]bool, 3)
882 values:
883         for {
884                 // started := time.Now()
885                 select {
886                 case _v := <-psc.Values:
887                         // log.Print(time.Since(started))
888                         v := _v.(PieceStateChange)
889                         completes[v.Index] = v.Complete
890                 case <-time.After(100 * time.Millisecond):
891                         break values
892                 }
893         }
894         if ps.Cancel {
895                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
896         } else {
897                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
898         }
899
900 }
901
902 func TestTorrentDownloadAll(t *testing.T) {
903         testDownloadCancel(t, testDownloadCancelParams{})
904 }
905
906 func TestTorrentDownloadAllThenCancel(t *testing.T) {
907         testDownloadCancel(t, testDownloadCancelParams{
908                 Cancel: true,
909         })
910 }
911
912 // Ensure that it's an error for a peer to send an invalid have message.
913 func TestPeerInvalidHave(t *testing.T) {
914         cl, err := NewClient(TestingConfig())
915         require.NoError(t, err)
916         defer cl.Close()
917         info := metainfo.Info{
918                 PieceLength: 1,
919                 Pieces:      make([]byte, 20),
920                 Files:       []metainfo.FileInfo{{Length: 1}},
921         }
922         infoBytes, err := bencode.Marshal(info)
923         require.NoError(t, err)
924         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
925                 InfoBytes: infoBytes,
926                 InfoHash:  metainfo.HashBytes(infoBytes),
927                 Storage:   badStorage{},
928         })
929         require.NoError(t, err)
930         assert.True(t, _new)
931         defer tt.Drop()
932         cn := &connection{
933                 t: tt,
934         }
935         assert.NoError(t, cn.peerSentHave(0))
936         assert.Error(t, cn.peerSentHave(1))
937 }
938
939 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
940         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
941         defer os.RemoveAll(greetingTempDir)
942         cfg := TestingConfig()
943         cfg.DataDir = greetingTempDir
944         seeder, err := NewClient(TestingConfig())
945         require.NoError(t, err)
946         seeder.AddTorrentSpec(&TorrentSpec{
947                 InfoBytes: greetingMetainfo.InfoBytes,
948         })
949 }
950
951 func TestPrepareTrackerAnnounce(t *testing.T) {
952         cl := &Client{}
953         blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
954         require.NoError(t, err)
955         assert.False(t, blocked)
956         assert.EqualValues(t, "localhost:1234", host)
957         assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
958 }
959
960 // Check that when the listen port is 0, all the protocols listened on have
961 // the same port, and it isn't zero.
962 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
963         cl, err := NewClient(TestingConfig())
964         require.NoError(t, err)
965         defer cl.Close()
966         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
967         assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
968 }
969
970 func TestClientDynamicListenTCPOnly(t *testing.T) {
971         cfg := TestingConfig()
972         cfg.DisableUTP = true
973         cl, err := NewClient(cfg)
974         require.NoError(t, err)
975         defer cl.Close()
976         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
977         assert.Nil(t, cl.utpSock)
978 }
979
980 func TestClientDynamicListenUTPOnly(t *testing.T) {
981         cfg := TestingConfig()
982         cfg.DisableTCP = true
983         cl, err := NewClient(cfg)
984         require.NoError(t, err)
985         defer cl.Close()
986         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
987         assert.Nil(t, cl.tcpListener)
988 }
989
990 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
991         cfg := TestingConfig()
992         cfg.DisableTCP = true
993         cfg.DisableUTP = true
994         cl, err := NewClient(cfg)
995         require.NoError(t, err)
996         defer cl.Close()
997         assert.Nil(t, cl.ListenAddr())
998 }
999
1000 func addClientPeer(t *Torrent, cl *Client) {
1001         t.AddPeers([]Peer{
1002                 {
1003                         IP:   missinggo.AddrIP(cl.ListenAddr()),
1004                         Port: missinggo.AddrPort(cl.ListenAddr()),
1005                 },
1006         })
1007 }
1008
1009 func totalConns(tts []*Torrent) (ret int) {
1010         for _, tt := range tts {
1011                 tt.cl.mu.Lock()
1012                 ret += len(tt.conns)
1013                 tt.cl.mu.Unlock()
1014         }
1015         return
1016 }
1017
1018 func TestSetMaxEstablishedConn(t *testing.T) {
1019         var tts []*Torrent
1020         ih := testutil.GreetingMetaInfo().HashInfoBytes()
1021         for i := range iter.N(3) {
1022                 cl, err := NewClient(TestingConfig())
1023                 require.NoError(t, err)
1024                 defer cl.Close()
1025                 tt, _ := cl.AddTorrentInfoHash(ih)
1026                 tt.SetMaxEstablishedConns(2)
1027                 testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
1028                 tts = append(tts, tt)
1029         }
1030         addPeers := func() {
1031                 for i, tt := range tts {
1032                         for _, _tt := range tts[:i] {
1033                                 addClientPeer(tt, _tt.cl)
1034                         }
1035                 }
1036         }
1037         waitTotalConns := func(num int) {
1038                 for totalConns(tts) != num {
1039                         time.Sleep(time.Millisecond)
1040                 }
1041         }
1042         addPeers()
1043         waitTotalConns(6)
1044         tts[0].SetMaxEstablishedConns(1)
1045         waitTotalConns(4)
1046         tts[0].SetMaxEstablishedConns(0)
1047         waitTotalConns(2)
1048         tts[0].SetMaxEstablishedConns(1)
1049         addPeers()
1050         waitTotalConns(4)
1051         tts[0].SetMaxEstablishedConns(2)
1052         addPeers()
1053         waitTotalConns(6)
1054 }
1055
1056 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1057         os.MkdirAll(dir, 0770)
1058         file, err := os.Create(filepath.Join(dir, name))
1059         require.NoError(t, err)
1060         file.Write([]byte(name))
1061         file.Close()
1062         mi := metainfo.MetaInfo{}
1063         mi.SetDefaults()
1064         info := metainfo.Info{PieceLength: 256 * 1024}
1065         err = info.BuildFromFilePath(filepath.Join(dir, name))
1066         require.NoError(t, err)
1067         mi.InfoBytes, err = bencode.Marshal(info)
1068         require.NoError(t, err)
1069         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1070         tr, err := cl.AddTorrent(&mi)
1071         require.NoError(t, err)
1072         require.True(t, tr.Seeding())
1073         tr.VerifyData()
1074         return magnet
1075 }
1076
1077 // https://github.com/anacrolix/torrent/issues/114
1078 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1079         cfg := TestingConfig()
1080         cfg.DisableUTP = true
1081         cfg.Seed = true
1082         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1083         cfg.Debug = true
1084         cfg.ForceEncryption = true
1085         os.Mkdir(cfg.DataDir, 0755)
1086         server, err := NewClient(cfg)
1087         require.NoError(t, err)
1088         defer server.Close()
1089         testutil.ExportStatusWriter(server, "s")
1090         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1091         makeMagnet(t, server, cfg.DataDir, "test2")
1092         cfg = TestingConfig()
1093         cfg.DisableUTP = true
1094         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1095         cfg.Debug = true
1096         cfg.ForceEncryption = true
1097         client, err := NewClient(cfg)
1098         require.NoError(t, err)
1099         defer client.Close()
1100         testutil.ExportStatusWriter(client, "c")
1101         tr, err := client.AddMagnet(magnet1)
1102         require.NoError(t, err)
1103         tr.AddPeers([]Peer{{
1104                 IP:   missinggo.AddrIP(server.ListenAddr()),
1105                 Port: missinggo.AddrPort(server.ListenAddr()),
1106         }})
1107         <-tr.GotInfo()
1108         tr.DownloadAll()
1109         client.WaitAll()
1110 }
1111
1112 func TestClientAddressInUse(t *testing.T) {
1113         s, _ := NewUtpSocket("udp", ":50007")
1114         if s != nil {
1115                 defer s.Close()
1116         }
1117         cfg := TestingConfig()
1118         cfg.ListenAddr = ":50007"
1119         cl, err := NewClient(cfg)
1120         require.Error(t, err)
1121         require.Nil(t, cl)
1122 }