]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
56390b3d55af7986164451061d0e13ce954fc904
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "math/rand"
10         "net"
11         "os"
12         "path/filepath"
13         "strings"
14         "sync"
15         "testing"
16         "time"
17
18         _ "github.com/anacrolix/envpprof"
19         "github.com/anacrolix/missinggo"
20         "github.com/anacrolix/missinggo/filecache"
21         "github.com/anacrolix/missinggo/pubsub"
22         "github.com/bradfitz/iter"
23         "github.com/stretchr/testify/assert"
24         "github.com/stretchr/testify/require"
25         "golang.org/x/time/rate"
26
27         "github.com/anacrolix/torrent/bencode"
28         "github.com/anacrolix/torrent/internal/testutil"
29         "github.com/anacrolix/torrent/iplist"
30         "github.com/anacrolix/torrent/metainfo"
31         "github.com/anacrolix/torrent/storage"
32 )
33
34 func TestingConfig() *Config {
35         return &Config{
36                 ListenAddr:              "localhost:0",
37                 NoDHT:                   true,
38                 DataDir:                 tempDir(),
39                 DisableTrackers:         true,
40                 NoDefaultPortForwarding: true,
41                 // Debug:           true,
42         }
43 }
44
45 func TestClientDefault(t *testing.T) {
46         cl, err := NewClient(TestingConfig())
47         require.NoError(t, err)
48         cl.Close()
49 }
50
51 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
52         cfg := TestingConfig()
53         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
54         require.NoError(t, err)
55         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
56         defer ci.Close()
57         cfg.DefaultStorage = ci
58         cl, err := NewClient(cfg)
59         require.NoError(t, err)
60         cl.Close()
61         // And again, https://github.com/anacrolix/torrent/issues/158
62         cl, err = NewClient(cfg)
63         require.NoError(t, err)
64         cl.Close()
65 }
66
67 func TestAddDropTorrent(t *testing.T) {
68         cl, err := NewClient(TestingConfig())
69         require.NoError(t, err)
70         defer cl.Close()
71         dir, mi := testutil.GreetingTestTorrent()
72         defer os.RemoveAll(dir)
73         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
74         require.NoError(t, err)
75         assert.True(t, new)
76         tt.SetMaxEstablishedConns(0)
77         tt.SetMaxEstablishedConns(1)
78         tt.Drop()
79 }
80
81 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
82         // TODO?
83         t.SkipNow()
84 }
85
86 func TestAddTorrentNoUsableURLs(t *testing.T) {
87         // TODO?
88         t.SkipNow()
89 }
90
91 func TestAddPeersToUnknownTorrent(t *testing.T) {
92         // TODO?
93         t.SkipNow()
94 }
95
96 func TestPieceHashSize(t *testing.T) {
97         assert.Equal(t, 20, pieceHash.Size())
98 }
99
100 func TestTorrentInitialState(t *testing.T) {
101         dir, mi := testutil.GreetingTestTorrent()
102         defer os.RemoveAll(dir)
103         tor := &Torrent{
104                 infoHash:          mi.HashInfoBytes(),
105                 pieceStateChanges: pubsub.NewPubSub(),
106         }
107         tor.chunkSize = 2
108         tor.storageOpener = storage.NewClient(storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()))
109         // Needed to lock for asynchronous piece verification.
110         tor.cl = new(Client)
111         tor.cl.mu.Lock()
112         err := tor.setInfoBytes(mi.InfoBytes)
113         tor.cl.mu.Unlock()
114         require.NoError(t, err)
115         require.Len(t, tor.pieces, 3)
116         tor.pendAllChunkSpecs(0)
117         tor.cl.mu.Lock()
118         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
119         tor.cl.mu.Unlock()
120         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
121 }
122
123 func TestUnmarshalPEXMsg(t *testing.T) {
124         var m peerExchangeMessage
125         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
126                 t.Fatal(err)
127         }
128         if len(m.Added) != 2 {
129                 t.FailNow()
130         }
131         if m.Added[0].Port != 0x506 {
132                 t.FailNow()
133         }
134 }
135
136 func TestReducedDialTimeout(t *testing.T) {
137         cfg := &Config{}
138         cfg.setDefaults()
139         for _, _case := range []struct {
140                 Max             time.Duration
141                 HalfOpenLimit   int
142                 PendingPeers    int
143                 ExpectedReduced time.Duration
144         }{
145                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
146                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
147                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
148                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
149                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
150                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
151         } {
152                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
153                 expected := _case.ExpectedReduced
154                 if expected < cfg.MinDialTimeout {
155                         expected = cfg.MinDialTimeout
156                 }
157                 if reduced != expected {
158                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
159                 }
160         }
161 }
162
163 func TestUTPRawConn(t *testing.T) {
164         l, err := NewUtpSocket("udp", "")
165         require.NoError(t, err)
166         defer l.Close()
167         go func() {
168                 for {
169                         _, err := l.Accept()
170                         if err != nil {
171                                 break
172                         }
173                 }
174         }()
175         // Connect a UTP peer to see if the RawConn will still work.
176         s, err := NewUtpSocket("udp", "")
177         require.NoError(t, err)
178         defer s.Close()
179         utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
180         require.NoError(t, err)
181         defer utpPeer.Close()
182         peer, err := net.ListenPacket("udp", ":0")
183         require.NoError(t, err)
184         defer peer.Close()
185
186         msgsReceived := 0
187         // How many messages to send. I've set this to double the channel buffer
188         // size in the raw packetConn.
189         const N = 200
190         readerStopped := make(chan struct{})
191         // The reader goroutine.
192         go func() {
193                 defer close(readerStopped)
194                 b := make([]byte, 500)
195                 for i := 0; i < N; i++ {
196                         n, _, err := l.ReadFrom(b)
197                         require.NoError(t, err)
198                         msgsReceived++
199                         var d int
200                         fmt.Sscan(string(b[:n]), &d)
201                         assert.Equal(t, i, d)
202                 }
203         }()
204         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
205         require.NoError(t, err)
206         for i := 0; i < N; i++ {
207                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
208                 require.NoError(t, err)
209                 time.Sleep(time.Millisecond)
210         }
211         select {
212         case <-readerStopped:
213         case <-time.After(time.Second):
214                 t.Fatal("reader timed out")
215         }
216         if msgsReceived != N {
217                 t.Fatalf("messages received: %d", msgsReceived)
218         }
219 }
220
221 func TestTwoClientsArbitraryPorts(t *testing.T) {
222         for i := 0; i < 2; i++ {
223                 cl, err := NewClient(TestingConfig())
224                 if err != nil {
225                         t.Fatal(err)
226                 }
227                 defer cl.Close()
228         }
229 }
230
231 func TestAddDropManyTorrents(t *testing.T) {
232         cl, err := NewClient(TestingConfig())
233         require.NoError(t, err)
234         defer cl.Close()
235         for i := range iter.N(1000) {
236                 var spec TorrentSpec
237                 binary.PutVarint(spec.InfoHash[:], int64(i))
238                 tt, new, err := cl.AddTorrentSpec(&spec)
239                 assert.NoError(t, err)
240                 assert.True(t, new)
241                 defer tt.Drop()
242         }
243 }
244
245 type FileCacheClientStorageFactoryParams struct {
246         Capacity    int64
247         SetCapacity bool
248         Wrapper     func(*filecache.Cache) storage.ClientImpl
249 }
250
251 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
252         return func(dataDir string) storage.ClientImpl {
253                 fc, err := filecache.NewCache(dataDir)
254                 if err != nil {
255                         panic(err)
256                 }
257                 if ps.SetCapacity {
258                         fc.SetCapacity(ps.Capacity)
259                 }
260                 return ps.Wrapper(fc)
261         }
262 }
263
264 type storageFactory func(string) storage.ClientImpl
265
266 func TestClientTransferDefault(t *testing.T) {
267         testClientTransfer(t, testClientTransferParams{
268                 ExportClientStatus: true,
269                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
270                         Wrapper: fileCachePieceResourceStorage,
271                 }),
272         })
273 }
274
275 func TestClientTransferRateLimitedUpload(t *testing.T) {
276         started := time.Now()
277         testClientTransfer(t, testClientTransferParams{
278                 // We are uploading 13 bytes (the length of the greeting torrent). The
279                 // chunks are 2 bytes in length. Then the smallest burst we can run
280                 // with is 2. Time taken is (13-burst)/rate.
281                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
282         })
283         require.True(t, time.Since(started) > time.Second)
284 }
285
286 func TestClientTransferRateLimitedDownload(t *testing.T) {
287         testClientTransfer(t, testClientTransferParams{
288                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
289         })
290 }
291
292 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
293         return storage.NewResourcePieces(fc.AsResourceProvider())
294 }
295
296 func TestClientTransferSmallCache(t *testing.T) {
297         testClientTransfer(t, testClientTransferParams{
298                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
299                         SetCapacity: true,
300                         // Going below the piece length means it can't complete a piece so
301                         // that it can be hashed.
302                         Capacity: 5,
303                         Wrapper:  fileCachePieceResourceStorage,
304                 }),
305                 SetReadahead: true,
306                 // Can't readahead too far or the cache will thrash and drop data we
307                 // thought we had.
308                 Readahead:          0,
309                 ExportClientStatus: true,
310         })
311 }
312
313 func TestClientTransferVarious(t *testing.T) {
314         // Leecher storage
315         for _, ls := range []storageFactory{
316                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
317                         Wrapper: fileCachePieceResourceStorage,
318                 }),
319                 storage.NewBoltDB,
320         } {
321                 // Seeder storage
322                 for _, ss := range []func(string) storage.ClientImpl{
323                         storage.NewFile,
324                         storage.NewMMap,
325                 } {
326                         for _, responsive := range []bool{false, true} {
327                                 testClientTransfer(t, testClientTransferParams{
328                                         Responsive:     responsive,
329                                         SeederStorage:  ss,
330                                         LeecherStorage: ls,
331                                 })
332                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
333                                         testClientTransfer(t, testClientTransferParams{
334                                                 SeederStorage:  ss,
335                                                 Responsive:     responsive,
336                                                 SetReadahead:   true,
337                                                 Readahead:      readahead,
338                                                 LeecherStorage: ls,
339                                         })
340                                 }
341                         }
342                 }
343         }
344 }
345
346 type testClientTransferParams struct {
347         Responsive                 bool
348         Readahead                  int64
349         SetReadahead               bool
350         ExportClientStatus         bool
351         LeecherStorage             func(string) storage.ClientImpl
352         SeederStorage              func(string) storage.ClientImpl
353         SeederUploadRateLimiter    *rate.Limiter
354         LeecherDownloadRateLimiter *rate.Limiter
355 }
356
357 // Creates a seeder and a leecher, and ensures the data transfers when a read
358 // is attempted on the leecher.
359 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
360         greetingTempDir, mi := testutil.GreetingTestTorrent()
361         defer os.RemoveAll(greetingTempDir)
362         // Create seeder and a Torrent.
363         cfg := TestingConfig()
364         cfg.Seed = true
365         cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
366         // cfg.ListenAddr = "localhost:4000"
367         if ps.SeederStorage != nil {
368                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
369                 defer cfg.DefaultStorage.Close()
370         } else {
371                 cfg.DataDir = greetingTempDir
372         }
373         seeder, err := NewClient(cfg)
374         require.NoError(t, err)
375         if ps.ExportClientStatus {
376                 testutil.ExportStatusWriter(seeder, "s")
377         }
378         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
379         // Run a Stats right after Closing the Client. This will trigger the Stats
380         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
381         defer seederTorrent.Stats()
382         defer seeder.Close()
383         seederTorrent.VerifyData()
384         // Create leecher and a Torrent.
385         leecherDataDir, err := ioutil.TempDir("", "")
386         require.NoError(t, err)
387         defer os.RemoveAll(leecherDataDir)
388         if ps.LeecherStorage == nil {
389                 cfg.DataDir = leecherDataDir
390         } else {
391                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
392         }
393         cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
394         // cfg.ListenAddr = "localhost:4001"
395         leecher, err := NewClient(cfg)
396         require.NoError(t, err)
397         defer leecher.Close()
398         if ps.ExportClientStatus {
399                 testutil.ExportStatusWriter(leecher, "l")
400         }
401         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
402                 ret = TorrentSpecFromMetaInfo(mi)
403                 ret.ChunkSize = 2
404                 return
405         }())
406         require.NoError(t, err)
407         assert.True(t, new)
408         // Now do some things with leecher and seeder.
409         addClientPeer(leecherGreeting, seeder)
410         r := leecherGreeting.NewReader()
411         defer r.Close()
412         if ps.Responsive {
413                 r.SetResponsive()
414         }
415         if ps.SetReadahead {
416                 r.SetReadahead(ps.Readahead)
417         }
418         assertReadAllGreeting(t, r)
419         // After one read through, we can assume certain torrent statistics.
420         // These are not a strict requirement. It is however interesting to
421         // follow.
422         // t.Logf("%#v", seederTorrent.Stats())
423         // assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
424         // assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
425         // assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
426         // assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
427         // Read through again for the cases where the torrent data size exceeds
428         // the size of the cache.
429         assertReadAllGreeting(t, r)
430 }
431
432 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
433         pos, err := r.Seek(0, io.SeekStart)
434         assert.NoError(t, err)
435         assert.EqualValues(t, 0, pos)
436         _greeting, err := ioutil.ReadAll(r)
437         assert.NoError(t, err)
438         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
439 }
440
441 // Check that after completing leeching, a leecher transitions to a seeding
442 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
443 func TestSeedAfterDownloading(t *testing.T) {
444         greetingTempDir, mi := testutil.GreetingTestTorrent()
445         defer os.RemoveAll(greetingTempDir)
446         cfg := TestingConfig()
447         cfg.Seed = true
448         cfg.DataDir = greetingTempDir
449         seeder, err := NewClient(cfg)
450         require.NoError(t, err)
451         defer seeder.Close()
452         testutil.ExportStatusWriter(seeder, "s")
453         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
454         seederTorrent.VerifyData()
455         cfg.DataDir, err = ioutil.TempDir("", "")
456         require.NoError(t, err)
457         defer os.RemoveAll(cfg.DataDir)
458         leecher, err := NewClient(cfg)
459         require.NoError(t, err)
460         defer leecher.Close()
461         testutil.ExportStatusWriter(leecher, "l")
462         cfg.Seed = false
463         cfg.DataDir, err = ioutil.TempDir("", "")
464         require.NoError(t, err)
465         defer os.RemoveAll(cfg.DataDir)
466         leecherLeecher, _ := NewClient(cfg)
467         defer leecherLeecher.Close()
468         testutil.ExportStatusWriter(leecherLeecher, "ll")
469         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
470                 ret = TorrentSpecFromMetaInfo(mi)
471                 ret.ChunkSize = 2
472                 return
473         }())
474         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
475                 ret = TorrentSpecFromMetaInfo(mi)
476                 ret.ChunkSize = 3
477                 return
478         }())
479         // Simultaneously DownloadAll in Leecher, and read the contents
480         // consecutively in LeecherLeecher. This non-deterministically triggered a
481         // case where the leecher wouldn't unchoke the LeecherLeecher.
482         var wg sync.WaitGroup
483         wg.Add(1)
484         go func() {
485                 defer wg.Done()
486                 r := llg.NewReader()
487                 defer r.Close()
488                 b, err := ioutil.ReadAll(r)
489                 require.NoError(t, err)
490                 assert.EqualValues(t, testutil.GreetingFileContents, b)
491         }()
492         addClientPeer(leecherGreeting, seeder)
493         addClientPeer(leecherGreeting, leecherLeecher)
494         wg.Add(1)
495         go func() {
496                 defer wg.Done()
497                 leecherGreeting.DownloadAll()
498                 leecher.WaitAll()
499         }()
500         wg.Wait()
501 }
502
503 func TestMergingTrackersByAddingSpecs(t *testing.T) {
504         cl, err := NewClient(TestingConfig())
505         require.NoError(t, err)
506         defer cl.Close()
507         spec := TorrentSpec{}
508         T, new, _ := cl.AddTorrentSpec(&spec)
509         if !new {
510                 t.FailNow()
511         }
512         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
513         _, new, _ = cl.AddTorrentSpec(&spec)
514         assert.False(t, new)
515         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
516         // Because trackers are disabled in TestingConfig.
517         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
518 }
519
520 type badStorage struct{}
521
522 var _ storage.ClientImpl = badStorage{}
523
524 func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
525         return bs, nil
526 }
527
528 func (bs badStorage) Close() error {
529         return nil
530 }
531
532 func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {
533         return badStoragePiece{p}
534 }
535
536 type badStoragePiece struct {
537         p metainfo.Piece
538 }
539
540 var _ storage.PieceImpl = badStoragePiece{}
541
542 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
543         return 0, nil
544 }
545
546 func (p badStoragePiece) Completion() storage.Completion {
547         return storage.Completion{Complete: true, Ok: true}
548 }
549
550 func (p badStoragePiece) MarkComplete() error {
551         return errors.New("psyyyyyyyche")
552 }
553
554 func (p badStoragePiece) MarkNotComplete() error {
555         return errors.New("psyyyyyyyche")
556 }
557
558 func (p badStoragePiece) randomlyTruncatedDataString() string {
559         return "hello, world\n"[:rand.Intn(14)]
560 }
561
562 func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
563         r := strings.NewReader(p.randomlyTruncatedDataString())
564         return r.ReadAt(b, off+p.p.Offset())
565 }
566
567 // We read from a piece which is marked completed, but is missing data.
568 func TestCompletedPieceWrongSize(t *testing.T) {
569         cfg := TestingConfig()
570         cfg.DefaultStorage = badStorage{}
571         cl, err := NewClient(cfg)
572         require.NoError(t, err)
573         defer cl.Close()
574         info := metainfo.Info{
575                 PieceLength: 15,
576                 Pieces:      make([]byte, 20),
577                 Files: []metainfo.FileInfo{
578                         {Path: []string{"greeting"}, Length: 13},
579                 },
580         }
581         b, err := bencode.Marshal(info)
582         require.NoError(t, err)
583         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
584                 InfoBytes: b,
585                 InfoHash:  metainfo.HashBytes(b),
586         })
587         require.NoError(t, err)
588         defer tt.Drop()
589         assert.True(t, new)
590         r := tt.NewReader()
591         defer r.Close()
592         b, err = ioutil.ReadAll(r)
593         assert.Len(t, b, 13)
594         assert.NoError(t, err)
595 }
596
597 func BenchmarkAddLargeTorrent(b *testing.B) {
598         cfg := TestingConfig()
599         cfg.DisableTCP = true
600         cfg.DisableUTP = true
601         cfg.ListenAddr = "redonk"
602         cl, err := NewClient(cfg)
603         require.NoError(b, err)
604         defer cl.Close()
605         for range iter.N(b.N) {
606                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
607                 if err != nil {
608                         b.Fatal(err)
609                 }
610                 t.Drop()
611         }
612 }
613
614 func TestResponsive(t *testing.T) {
615         seederDataDir, mi := testutil.GreetingTestTorrent()
616         defer os.RemoveAll(seederDataDir)
617         cfg := TestingConfig()
618         cfg.Seed = true
619         cfg.DataDir = seederDataDir
620         seeder, err := NewClient(cfg)
621         require.Nil(t, err)
622         defer seeder.Close()
623         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
624         seederTorrent.VerifyData()
625         leecherDataDir, err := ioutil.TempDir("", "")
626         require.Nil(t, err)
627         defer os.RemoveAll(leecherDataDir)
628         cfg = TestingConfig()
629         cfg.DataDir = leecherDataDir
630         leecher, err := NewClient(cfg)
631         require.Nil(t, err)
632         defer leecher.Close()
633         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
634                 ret = TorrentSpecFromMetaInfo(mi)
635                 ret.ChunkSize = 2
636                 return
637         }())
638         addClientPeer(leecherTorrent, seeder)
639         reader := leecherTorrent.NewReader()
640         defer reader.Close()
641         reader.SetReadahead(0)
642         reader.SetResponsive()
643         b := make([]byte, 2)
644         _, err = reader.Seek(3, io.SeekStart)
645         require.NoError(t, err)
646         _, err = io.ReadFull(reader, b)
647         assert.Nil(t, err)
648         assert.EqualValues(t, "lo", string(b))
649         _, err = reader.Seek(11, io.SeekStart)
650         require.NoError(t, err)
651         n, err := io.ReadFull(reader, b)
652         assert.Nil(t, err)
653         assert.EqualValues(t, 2, n)
654         assert.EqualValues(t, "d\n", string(b))
655 }
656
657 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
658         seederDataDir, mi := testutil.GreetingTestTorrent()
659         defer os.RemoveAll(seederDataDir)
660         cfg := TestingConfig()
661         cfg.Seed = true
662         cfg.DataDir = seederDataDir
663         seeder, err := NewClient(cfg)
664         require.Nil(t, err)
665         defer seeder.Close()
666         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
667         seederTorrent.VerifyData()
668         leecherDataDir, err := ioutil.TempDir("", "")
669         require.Nil(t, err)
670         defer os.RemoveAll(leecherDataDir)
671         cfg = TestingConfig()
672         cfg.DataDir = leecherDataDir
673         leecher, err := NewClient(cfg)
674         require.Nil(t, err)
675         defer leecher.Close()
676         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
677                 ret = TorrentSpecFromMetaInfo(mi)
678                 ret.ChunkSize = 2
679                 return
680         }())
681         addClientPeer(leecherTorrent, seeder)
682         reader := leecherTorrent.NewReader()
683         defer reader.Close()
684         reader.SetReadahead(0)
685         reader.SetResponsive()
686         b := make([]byte, 2)
687         _, err = reader.Seek(3, io.SeekStart)
688         require.NoError(t, err)
689         _, err = io.ReadFull(reader, b)
690         assert.Nil(t, err)
691         assert.EqualValues(t, "lo", string(b))
692         go leecherTorrent.Drop()
693         _, err = reader.Seek(11, io.SeekStart)
694         require.NoError(t, err)
695         n, err := reader.Read(b)
696         assert.EqualError(t, err, "torrent closed")
697         assert.EqualValues(t, 0, n)
698 }
699
700 func TestDHTInheritBlocklist(t *testing.T) {
701         ipl := iplist.New(nil)
702         require.NotNil(t, ipl)
703         cfg := TestingConfig()
704         cfg.IPBlocklist = ipl
705         cfg.NoDHT = false
706         cl, err := NewClient(cfg)
707         require.NoError(t, err)
708         defer cl.Close()
709         require.Equal(t, ipl, cl.DHT().IPBlocklist())
710 }
711
712 // Check that stuff is merged in subsequent AddTorrentSpec for the same
713 // infohash.
714 func TestAddTorrentSpecMerging(t *testing.T) {
715         cl, err := NewClient(TestingConfig())
716         require.NoError(t, err)
717         defer cl.Close()
718         dir, mi := testutil.GreetingTestTorrent()
719         defer os.RemoveAll(dir)
720         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
721                 InfoHash: mi.HashInfoBytes(),
722         })
723         require.NoError(t, err)
724         require.True(t, new)
725         require.Nil(t, tt.Info())
726         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
727         require.NoError(t, err)
728         require.False(t, new)
729         require.NotNil(t, tt.Info())
730 }
731
732 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
733         dir, mi := testutil.GreetingTestTorrent()
734         os.RemoveAll(dir)
735         cl, _ := NewClient(TestingConfig())
736         defer cl.Close()
737         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
738                 InfoHash: mi.HashInfoBytes(),
739         })
740         tt.Drop()
741         assert.EqualValues(t, 0, len(cl.Torrents()))
742         select {
743         case <-tt.GotInfo():
744                 t.FailNow()
745         default:
746         }
747 }
748
749 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
750         for i := range iter.N(info.NumPieces()) {
751                 p := info.Piece(i)
752                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
753         }
754 }
755
756 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
757         fileCacheDir, err := ioutil.TempDir("", "")
758         require.NoError(t, err)
759         defer os.RemoveAll(fileCacheDir)
760         fileCache, err := filecache.NewCache(fileCacheDir)
761         require.NoError(t, err)
762         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
763         defer os.RemoveAll(greetingDataTempDir)
764         filePieceStore := csf(fileCache)
765         defer filePieceStore.Close()
766         info, err := greetingMetainfo.UnmarshalInfo()
767         require.NoError(t, err)
768         ih := greetingMetainfo.HashInfoBytes()
769         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
770         require.NoError(t, err)
771         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
772         // require.Equal(t, len(testutil.GreetingFileContents), written)
773         // require.NoError(t, err)
774         for i := 0; i < info.NumPieces(); i++ {
775                 p := info.Piece(i)
776                 if alreadyCompleted {
777                         require.NoError(t, greetingData.Piece(p).MarkComplete())
778                 }
779         }
780         cfg := TestingConfig()
781         // TODO: Disable network option?
782         cfg.DisableTCP = true
783         cfg.DisableUTP = true
784         cfg.DefaultStorage = filePieceStore
785         cl, err := NewClient(cfg)
786         require.NoError(t, err)
787         defer cl.Close()
788         tt, err := cl.AddTorrent(greetingMetainfo)
789         require.NoError(t, err)
790         psrs := tt.PieceStateRuns()
791         assert.Len(t, psrs, 1)
792         assert.EqualValues(t, 3, psrs[0].Length)
793         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
794         if alreadyCompleted {
795                 r := tt.NewReader()
796                 b, err := ioutil.ReadAll(r)
797                 assert.NoError(t, err)
798                 assert.EqualValues(t, testutil.GreetingFileContents, b)
799         }
800 }
801
802 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
803         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
804 }
805
806 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
807         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
808 }
809
810 func TestAddMetainfoWithNodes(t *testing.T) {
811         cfg := TestingConfig()
812         cfg.ListenAddr = ":0"
813         cfg.NoDHT = false
814         // For now, we want to just jam the nodes into the table, without
815         // verifying them first. Also the DHT code doesn't support mixing secure
816         // and insecure nodes if security is enabled (yet).
817         cfg.DHTConfig.NoSecurity = true
818         cl, err := NewClient(cfg)
819         require.NoError(t, err)
820         defer cl.Close()
821         assert.EqualValues(t, 0, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
822         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
823         require.NoError(t, err)
824         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
825         // check if the announce-list is here instead. TODO: Add nodes.
826         assert.Len(t, tt.metainfo.AnnounceList, 5)
827         // There are 6 nodes in the torrent file.
828         assert.EqualValues(t, 6, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
829 }
830
831 type testDownloadCancelParams struct {
832         ExportClientStatus        bool
833         SetLeecherStorageCapacity bool
834         LeecherStorageCapacity    int64
835         Cancel                    bool
836 }
837
838 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
839         greetingTempDir, mi := testutil.GreetingTestTorrent()
840         defer os.RemoveAll(greetingTempDir)
841         cfg := TestingConfig()
842         cfg.Seed = true
843         cfg.DataDir = greetingTempDir
844         seeder, err := NewClient(cfg)
845         require.NoError(t, err)
846         defer seeder.Close()
847         if ps.ExportClientStatus {
848                 testutil.ExportStatusWriter(seeder, "s")
849         }
850         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
851         seederTorrent.VerifyData()
852         leecherDataDir, err := ioutil.TempDir("", "")
853         require.NoError(t, err)
854         defer os.RemoveAll(leecherDataDir)
855         fc, err := filecache.NewCache(leecherDataDir)
856         require.NoError(t, err)
857         if ps.SetLeecherStorageCapacity {
858                 fc.SetCapacity(ps.LeecherStorageCapacity)
859         }
860         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
861         cfg.DataDir = leecherDataDir
862         leecher, _ := NewClient(cfg)
863         defer leecher.Close()
864         if ps.ExportClientStatus {
865                 testutil.ExportStatusWriter(leecher, "l")
866         }
867         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
868                 ret = TorrentSpecFromMetaInfo(mi)
869                 ret.ChunkSize = 2
870                 return
871         }())
872         require.NoError(t, err)
873         assert.True(t, new)
874         psc := leecherGreeting.SubscribePieceStateChanges()
875         defer psc.Close()
876
877         leecherGreeting.cl.mu.Lock()
878         leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
879         if ps.Cancel {
880                 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
881         }
882         leecherGreeting.cl.mu.Unlock()
883
884         addClientPeer(leecherGreeting, seeder)
885         completes := make(map[int]bool, 3)
886 values:
887         for {
888                 // started := time.Now()
889                 select {
890                 case _v := <-psc.Values:
891                         // log.Print(time.Since(started))
892                         v := _v.(PieceStateChange)
893                         completes[v.Index] = v.Complete
894                 case <-time.After(100 * time.Millisecond):
895                         break values
896                 }
897         }
898         if ps.Cancel {
899                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
900         } else {
901                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
902         }
903
904 }
905
906 func TestTorrentDownloadAll(t *testing.T) {
907         testDownloadCancel(t, testDownloadCancelParams{})
908 }
909
910 func TestTorrentDownloadAllThenCancel(t *testing.T) {
911         testDownloadCancel(t, testDownloadCancelParams{
912                 Cancel: true,
913         })
914 }
915
916 // Ensure that it's an error for a peer to send an invalid have message.
917 func TestPeerInvalidHave(t *testing.T) {
918         cl, err := NewClient(TestingConfig())
919         require.NoError(t, err)
920         defer cl.Close()
921         info := metainfo.Info{
922                 PieceLength: 1,
923                 Pieces:      make([]byte, 20),
924                 Files:       []metainfo.FileInfo{{Length: 1}},
925         }
926         infoBytes, err := bencode.Marshal(info)
927         require.NoError(t, err)
928         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
929                 InfoBytes: infoBytes,
930                 InfoHash:  metainfo.HashBytes(infoBytes),
931                 Storage:   badStorage{},
932         })
933         require.NoError(t, err)
934         assert.True(t, _new)
935         defer tt.Drop()
936         cn := &connection{
937                 t: tt,
938         }
939         assert.NoError(t, cn.peerSentHave(0))
940         assert.Error(t, cn.peerSentHave(1))
941 }
942
943 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
944         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
945         defer os.RemoveAll(greetingTempDir)
946         cfg := TestingConfig()
947         cfg.DataDir = greetingTempDir
948         seeder, err := NewClient(TestingConfig())
949         require.NoError(t, err)
950         seeder.AddTorrentSpec(&TorrentSpec{
951                 InfoBytes: greetingMetainfo.InfoBytes,
952         })
953 }
954
955 func TestPrepareTrackerAnnounce(t *testing.T) {
956         cl := &Client{}
957         blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
958         require.NoError(t, err)
959         assert.False(t, blocked)
960         assert.EqualValues(t, "localhost:1234", host)
961         assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
962 }
963
964 // Check that when the listen port is 0, all the protocols listened on have
965 // the same port, and it isn't zero.
966 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
967         cl, err := NewClient(TestingConfig())
968         require.NoError(t, err)
969         defer cl.Close()
970         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
971         assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
972 }
973
974 func TestClientDynamicListenTCPOnly(t *testing.T) {
975         cfg := TestingConfig()
976         cfg.DisableUTP = true
977         cl, err := NewClient(cfg)
978         require.NoError(t, err)
979         defer cl.Close()
980         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
981         assert.Nil(t, cl.utpSock)
982 }
983
984 func TestClientDynamicListenUTPOnly(t *testing.T) {
985         cfg := TestingConfig()
986         cfg.DisableTCP = true
987         cl, err := NewClient(cfg)
988         require.NoError(t, err)
989         defer cl.Close()
990         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
991         assert.Nil(t, cl.tcpListener)
992 }
993
994 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
995         cfg := TestingConfig()
996         cfg.DisableTCP = true
997         cfg.DisableUTP = true
998         cl, err := NewClient(cfg)
999         require.NoError(t, err)
1000         defer cl.Close()
1001         assert.Nil(t, cl.ListenAddr())
1002 }
1003
1004 func addClientPeer(t *Torrent, cl *Client) {
1005         t.AddPeers([]Peer{
1006                 {
1007                         IP:   missinggo.AddrIP(cl.ListenAddr()),
1008                         Port: missinggo.AddrPort(cl.ListenAddr()),
1009                 },
1010         })
1011 }
1012
1013 func totalConns(tts []*Torrent) (ret int) {
1014         for _, tt := range tts {
1015                 tt.cl.mu.Lock()
1016                 ret += len(tt.conns)
1017                 tt.cl.mu.Unlock()
1018         }
1019         return
1020 }
1021
1022 func TestSetMaxEstablishedConn(t *testing.T) {
1023         var tts []*Torrent
1024         ih := testutil.GreetingMetaInfo().HashInfoBytes()
1025         for i := range iter.N(3) {
1026                 cl, err := NewClient(TestingConfig())
1027                 require.NoError(t, err)
1028                 defer cl.Close()
1029                 tt, _ := cl.AddTorrentInfoHash(ih)
1030                 tt.SetMaxEstablishedConns(2)
1031                 testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
1032                 tts = append(tts, tt)
1033         }
1034         addPeers := func() {
1035                 for i, tt := range tts {
1036                         for _, _tt := range tts[:i] {
1037                                 addClientPeer(tt, _tt.cl)
1038                         }
1039                 }
1040         }
1041         waitTotalConns := func(num int) {
1042                 for totalConns(tts) != num {
1043                         time.Sleep(time.Millisecond)
1044                 }
1045         }
1046         addPeers()
1047         waitTotalConns(6)
1048         tts[0].SetMaxEstablishedConns(1)
1049         waitTotalConns(4)
1050         tts[0].SetMaxEstablishedConns(0)
1051         waitTotalConns(2)
1052         tts[0].SetMaxEstablishedConns(1)
1053         addPeers()
1054         waitTotalConns(4)
1055         tts[0].SetMaxEstablishedConns(2)
1056         addPeers()
1057         waitTotalConns(6)
1058 }
1059
1060 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1061         os.MkdirAll(dir, 0770)
1062         file, err := os.Create(filepath.Join(dir, name))
1063         require.NoError(t, err)
1064         file.Write([]byte(name))
1065         file.Close()
1066         mi := metainfo.MetaInfo{}
1067         mi.SetDefaults()
1068         info := metainfo.Info{PieceLength: 256 * 1024}
1069         err = info.BuildFromFilePath(filepath.Join(dir, name))
1070         require.NoError(t, err)
1071         mi.InfoBytes, err = bencode.Marshal(info)
1072         require.NoError(t, err)
1073         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1074         tr, err := cl.AddTorrent(&mi)
1075         require.NoError(t, err)
1076         require.True(t, tr.Seeding())
1077         tr.VerifyData()
1078         return magnet
1079 }
1080
1081 // https://github.com/anacrolix/torrent/issues/114
1082 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1083         cfg := TestingConfig()
1084         cfg.DisableUTP = true
1085         cfg.Seed = true
1086         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1087         cfg.Debug = true
1088         cfg.ForceEncryption = true
1089         os.Mkdir(cfg.DataDir, 0755)
1090         server, err := NewClient(cfg)
1091         require.NoError(t, err)
1092         defer server.Close()
1093         testutil.ExportStatusWriter(server, "s")
1094         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1095         makeMagnet(t, server, cfg.DataDir, "test2")
1096         cfg = TestingConfig()
1097         cfg.DisableUTP = true
1098         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1099         cfg.Debug = true
1100         cfg.ForceEncryption = true
1101         client, err := NewClient(cfg)
1102         require.NoError(t, err)
1103         defer client.Close()
1104         testutil.ExportStatusWriter(client, "c")
1105         tr, err := client.AddMagnet(magnet1)
1106         require.NoError(t, err)
1107         tr.AddPeers([]Peer{{
1108                 IP:   missinggo.AddrIP(server.ListenAddr()),
1109                 Port: missinggo.AddrPort(server.ListenAddr()),
1110         }})
1111         <-tr.GotInfo()
1112         tr.DownloadAll()
1113         client.WaitAll()
1114 }
1115
1116 func TestClientAddressInUse(t *testing.T) {
1117         s, _ := NewUtpSocket("udp", ":50007")
1118         if s != nil {
1119                 defer s.Close()
1120         }
1121         cfg := TestingConfig()
1122         cfg.ListenAddr = ":50007"
1123         cl, err := NewClient(cfg)
1124         require.Error(t, err)
1125         require.Nil(t, cl)
1126 }