]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Changes for dht-cleanup
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "log"
10         "math/rand"
11         "net"
12         "os"
13         "path/filepath"
14         "strings"
15         "sync"
16         "testing"
17         "time"
18
19         _ "github.com/anacrolix/envpprof"
20         "github.com/anacrolix/missinggo"
21         "github.com/anacrolix/missinggo/filecache"
22         "github.com/anacrolix/missinggo/pubsub"
23         "github.com/bradfitz/iter"
24         "github.com/stretchr/testify/assert"
25         "github.com/stretchr/testify/require"
26         "golang.org/x/time/rate"
27
28         "github.com/anacrolix/torrent/bencode"
29         "github.com/anacrolix/torrent/internal/testutil"
30         "github.com/anacrolix/torrent/iplist"
31         "github.com/anacrolix/torrent/metainfo"
32         "github.com/anacrolix/torrent/storage"
33 )
34
35 func init() {
36         log.SetFlags(log.LstdFlags | log.Llongfile)
37 }
38
39 func TestingConfig() *Config {
40         return &Config{
41                 ListenAddr:      "localhost:0",
42                 NoDHT:           true,
43                 DisableTrackers: true,
44                 DataDir: func() string {
45                         ret, err := ioutil.TempDir("", "")
46                         if err != nil {
47                                 panic(err)
48                         }
49                         return ret
50                 }(),
51                 Debug: true,
52         }
53 }
54
55 func TestClientDefault(t *testing.T) {
56         cl, err := NewClient(TestingConfig())
57         require.NoError(t, err)
58         cl.Close()
59 }
60
61 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
62         cfg := TestingConfig()
63         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
64         require.NoError(t, err)
65         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
66         defer ci.Close()
67         cfg.DefaultStorage = ci
68         cl, err := NewClient(cfg)
69         require.NoError(t, err)
70         cl.Close()
71         // And again, https://github.com/anacrolix/torrent/issues/158
72         cl, err = NewClient(cfg)
73         require.NoError(t, err)
74         cl.Close()
75 }
76
77 func TestAddDropTorrent(t *testing.T) {
78         cl, err := NewClient(TestingConfig())
79         require.NoError(t, err)
80         defer cl.Close()
81         dir, mi := testutil.GreetingTestTorrent()
82         defer os.RemoveAll(dir)
83         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
84         require.NoError(t, err)
85         assert.True(t, new)
86         tt.SetMaxEstablishedConns(0)
87         tt.SetMaxEstablishedConns(1)
88         tt.Drop()
89 }
90
91 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
92         t.SkipNow()
93 }
94
95 func TestAddTorrentNoUsableURLs(t *testing.T) {
96         t.SkipNow()
97 }
98
99 func TestAddPeersToUnknownTorrent(t *testing.T) {
100         t.SkipNow()
101 }
102
103 func TestPieceHashSize(t *testing.T) {
104         if pieceHash.Size() != 20 {
105                 t.FailNow()
106         }
107 }
108
109 func TestTorrentInitialState(t *testing.T) {
110         dir, mi := testutil.GreetingTestTorrent()
111         defer os.RemoveAll(dir)
112         tor := &Torrent{
113                 infoHash:          mi.HashInfoBytes(),
114                 pieceStateChanges: pubsub.NewPubSub(),
115         }
116         tor.chunkSize = 2
117         tor.storageOpener = storage.NewClient(storage.NewFileWithCompletion("/dev/null", storage.NewMapPieceCompletion()))
118         // Needed to lock for asynchronous piece verification.
119         tor.cl = new(Client)
120         err := tor.setInfoBytes(mi.InfoBytes)
121         require.NoError(t, err)
122         require.Len(t, tor.pieces, 3)
123         tor.pendAllChunkSpecs(0)
124         tor.cl.mu.Lock()
125         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
126         tor.cl.mu.Unlock()
127         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
128 }
129
130 func TestUnmarshalPEXMsg(t *testing.T) {
131         var m peerExchangeMessage
132         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
133                 t.Fatal(err)
134         }
135         if len(m.Added) != 2 {
136                 t.FailNow()
137         }
138         if m.Added[0].Port != 0x506 {
139                 t.FailNow()
140         }
141 }
142
143 func TestReducedDialTimeout(t *testing.T) {
144         for _, _case := range []struct {
145                 Max             time.Duration
146                 HalfOpenLimit   int
147                 PendingPeers    int
148                 ExpectedReduced time.Duration
149         }{
150                 {nominalDialTimeout, 40, 0, nominalDialTimeout},
151                 {nominalDialTimeout, 40, 1, nominalDialTimeout},
152                 {nominalDialTimeout, 40, 39, nominalDialTimeout},
153                 {nominalDialTimeout, 40, 40, nominalDialTimeout / 2},
154                 {nominalDialTimeout, 40, 80, nominalDialTimeout / 3},
155                 {nominalDialTimeout, 40, 4000, nominalDialTimeout / 101},
156         } {
157                 reduced := reducedDialTimeout(_case.Max, _case.HalfOpenLimit, _case.PendingPeers)
158                 expected := _case.ExpectedReduced
159                 if expected < minDialTimeout {
160                         expected = minDialTimeout
161                 }
162                 if reduced != expected {
163                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
164                 }
165         }
166 }
167
168 func TestUTPRawConn(t *testing.T) {
169         l, err := NewUtpSocket("udp", "")
170         if err != nil {
171                 t.Fatal(err)
172         }
173         defer l.Close()
174         go func() {
175                 for {
176                         _, err := l.Accept()
177                         if err != nil {
178                                 break
179                         }
180                 }
181         }()
182         // Connect a UTP peer to see if the RawConn will still work.
183         s, _ := NewUtpSocket("udp", "")
184         defer s.Close()
185         utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
186         if err != nil {
187                 t.Fatalf("error dialing utp listener: %s", err)
188         }
189         defer utpPeer.Close()
190         peer, err := net.ListenPacket("udp", ":0")
191         if err != nil {
192                 t.Fatal(err)
193         }
194         defer peer.Close()
195
196         msgsReceived := 0
197         // How many messages to send. I've set this to double the channel buffer
198         // size in the raw packetConn.
199         const N = 200
200         readerStopped := make(chan struct{})
201         // The reader goroutine.
202         go func() {
203                 defer close(readerStopped)
204                 b := make([]byte, 500)
205                 for i := 0; i < N; i++ {
206                         n, _, err := l.ReadFrom(b)
207                         if err != nil {
208                                 t.Fatalf("error reading from raw conn: %s", err)
209                         }
210                         msgsReceived++
211                         var d int
212                         fmt.Sscan(string(b[:n]), &d)
213                         if d != i {
214                                 log.Printf("got wrong number: expected %d, got %d", i, d)
215                         }
216                 }
217         }()
218         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
219         if err != nil {
220                 t.Fatal(err)
221         }
222         for i := 0; i < N; i++ {
223                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
224                 if err != nil {
225                         t.Fatal(err)
226                 }
227                 time.Sleep(time.Microsecond)
228         }
229         select {
230         case <-readerStopped:
231         case <-time.After(time.Second):
232                 t.Fatal("reader timed out")
233         }
234         if msgsReceived != N {
235                 t.Fatalf("messages received: %d", msgsReceived)
236         }
237 }
238
239 func TestTwoClientsArbitraryPorts(t *testing.T) {
240         for i := 0; i < 2; i++ {
241                 cl, err := NewClient(TestingConfig())
242                 if err != nil {
243                         t.Fatal(err)
244                 }
245                 defer cl.Close()
246         }
247 }
248
249 func TestAddDropManyTorrents(t *testing.T) {
250         cl, err := NewClient(TestingConfig())
251         require.NoError(t, err)
252         defer cl.Close()
253         for i := range iter.N(1000) {
254                 var spec TorrentSpec
255                 binary.PutVarint(spec.InfoHash[:], int64(i))
256                 tt, new, err := cl.AddTorrentSpec(&spec)
257                 assert.NoError(t, err)
258                 assert.True(t, new)
259                 defer tt.Drop()
260         }
261 }
262
263 type FileCacheClientStorageFactoryParams struct {
264         Capacity    int64
265         SetCapacity bool
266         Wrapper     func(*filecache.Cache) storage.ClientImpl
267 }
268
269 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
270         return func(dataDir string) storage.ClientImpl {
271                 fc, err := filecache.NewCache(dataDir)
272                 if err != nil {
273                         panic(err)
274                 }
275                 if ps.SetCapacity {
276                         fc.SetCapacity(ps.Capacity)
277                 }
278                 return ps.Wrapper(fc)
279         }
280 }
281
282 type storageFactory func(string) storage.ClientImpl
283
284 func TestClientTransferDefault(t *testing.T) {
285         testClientTransfer(t, testClientTransferParams{
286                 ExportClientStatus: true,
287                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
288                         Wrapper: fileCachePieceResourceStorage,
289                 }),
290         })
291 }
292
293 func TestClientTransferRateLimitedUpload(t *testing.T) {
294         started := time.Now()
295         testClientTransfer(t, testClientTransferParams{
296                 // We are uploading 13 bytes (the length of the greeting torrent). The
297                 // chunks are 2 bytes in length. Then the smallest burst we can run
298                 // with is 2. Time taken is (13-burst)/rate.
299                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
300         })
301         require.True(t, time.Since(started) > time.Second)
302 }
303
304 func TestClientTransferRateLimitedDownload(t *testing.T) {
305         testClientTransfer(t, testClientTransferParams{
306                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
307         })
308 }
309
310 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
311         return storage.NewResourcePieces(fc.AsResourceProvider())
312 }
313
314 func TestClientTransferSmallCache(t *testing.T) {
315         testClientTransfer(t, testClientTransferParams{
316                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
317                         SetCapacity: true,
318                         // Going below the piece length means it can't complete a piece so
319                         // that it can be hashed.
320                         Capacity: 5,
321                         Wrapper:  fileCachePieceResourceStorage,
322                 }),
323                 SetReadahead: true,
324                 // Can't readahead too far or the cache will thrash and drop data we
325                 // thought we had.
326                 Readahead:          0,
327                 ExportClientStatus: true,
328         })
329 }
330
331 func TestClientTransferVarious(t *testing.T) {
332         // Leecher storage
333         for _, ls := range []storageFactory{
334                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
335                         Wrapper: fileCachePieceResourceStorage,
336                 }),
337                 storage.NewBoltDB,
338         } {
339                 // Seeder storage
340                 for _, ss := range []func(string) storage.ClientImpl{
341                         storage.NewFile,
342                         storage.NewMMap,
343                 } {
344                         for _, responsive := range []bool{false, true} {
345                                 testClientTransfer(t, testClientTransferParams{
346                                         Responsive:     responsive,
347                                         SeederStorage:  ss,
348                                         LeecherStorage: ls,
349                                 })
350                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
351                                         testClientTransfer(t, testClientTransferParams{
352                                                 SeederStorage:  ss,
353                                                 Responsive:     responsive,
354                                                 SetReadahead:   true,
355                                                 Readahead:      readahead,
356                                                 LeecherStorage: ls,
357                                         })
358                                 }
359                         }
360                 }
361         }
362 }
363
364 type testClientTransferParams struct {
365         Responsive                 bool
366         Readahead                  int64
367         SetReadahead               bool
368         ExportClientStatus         bool
369         LeecherStorage             func(string) storage.ClientImpl
370         SeederStorage              func(string) storage.ClientImpl
371         SeederUploadRateLimiter    *rate.Limiter
372         LeecherDownloadRateLimiter *rate.Limiter
373 }
374
375 // Creates a seeder and a leecher, and ensures the data transfers when a read
376 // is attempted on the leecher.
377 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
378         greetingTempDir, mi := testutil.GreetingTestTorrent()
379         defer os.RemoveAll(greetingTempDir)
380         // Create seeder and a Torrent.
381         cfg := TestingConfig()
382         cfg.Seed = true
383         cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
384         // cfg.ListenAddr = "localhost:4000"
385         if ps.SeederStorage != nil {
386                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
387                 defer cfg.DefaultStorage.Close()
388         } else {
389                 cfg.DataDir = greetingTempDir
390         }
391         seeder, err := NewClient(cfg)
392         require.NoError(t, err)
393         defer seeder.Close()
394         if ps.ExportClientStatus {
395                 testutil.ExportStatusWriter(seeder, "s")
396         }
397         // seederTorrent, new, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
398         _, new, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
399         require.NoError(t, err)
400         assert.True(t, new)
401         // Create leecher and a Torrent.
402         leecherDataDir, err := ioutil.TempDir("", "")
403         require.NoError(t, err)
404         defer os.RemoveAll(leecherDataDir)
405         if ps.LeecherStorage == nil {
406                 cfg.DataDir = leecherDataDir
407         } else {
408                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
409         }
410         cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
411         // cfg.ListenAddr = "localhost:4001"
412         leecher, err := NewClient(cfg)
413         require.NoError(t, err)
414         defer leecher.Close()
415         if ps.ExportClientStatus {
416                 testutil.ExportStatusWriter(leecher, "l")
417         }
418         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
419                 ret = TorrentSpecFromMetaInfo(mi)
420                 ret.ChunkSize = 2
421                 return
422         }())
423         require.NoError(t, err)
424         assert.True(t, new)
425         // Now do some things with leecher and seeder.
426         addClientPeer(leecherGreeting, seeder)
427         r := leecherGreeting.NewReader()
428         defer r.Close()
429         if ps.Responsive {
430                 r.SetResponsive()
431         }
432         if ps.SetReadahead {
433                 r.SetReadahead(ps.Readahead)
434         }
435         assertReadAllGreeting(t, r)
436         // After one read through, we can assume certain torrent statistics.
437         // These are not a strict requirement. It is however interesting to
438         // follow.
439         // t.Logf("%#v", seederTorrent.Stats())
440         // assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
441         // assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
442         // assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
443         // assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
444         // Read through again for the cases where the torrent data size exceeds
445         // the size of the cache.
446         assertReadAllGreeting(t, r)
447 }
448
449 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
450         pos, err := r.Seek(0, os.SEEK_SET)
451         assert.NoError(t, err)
452         assert.EqualValues(t, 0, pos)
453         _greeting, err := ioutil.ReadAll(r)
454         assert.NoError(t, err)
455         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
456 }
457
458 // Check that after completing leeching, a leecher transitions to a seeding
459 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
460 func TestSeedAfterDownloading(t *testing.T) {
461         greetingTempDir, mi := testutil.GreetingTestTorrent()
462         defer os.RemoveAll(greetingTempDir)
463         cfg := TestingConfig()
464         cfg.Seed = true
465         cfg.DataDir = greetingTempDir
466         seeder, err := NewClient(cfg)
467         require.NoError(t, err)
468         defer seeder.Close()
469         testutil.ExportStatusWriter(seeder, "s")
470         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
471         cfg.DataDir, err = ioutil.TempDir("", "")
472         require.NoError(t, err)
473         defer os.RemoveAll(cfg.DataDir)
474         leecher, err := NewClient(cfg)
475         require.NoError(t, err)
476         defer leecher.Close()
477         testutil.ExportStatusWriter(leecher, "l")
478         cfg.Seed = false
479         // cfg.TorrentDataOpener = nil
480         cfg.DataDir, err = ioutil.TempDir("", "")
481         require.NoError(t, err)
482         defer os.RemoveAll(cfg.DataDir)
483         leecherLeecher, _ := NewClient(cfg)
484         defer leecherLeecher.Close()
485         testutil.ExportStatusWriter(leecherLeecher, "ll")
486         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
487                 ret = TorrentSpecFromMetaInfo(mi)
488                 ret.ChunkSize = 2
489                 return
490         }())
491         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
492                 ret = TorrentSpecFromMetaInfo(mi)
493                 ret.ChunkSize = 3
494                 return
495         }())
496         // Simultaneously DownloadAll in Leecher, and read the contents
497         // consecutively in LeecherLeecher. This non-deterministically triggered a
498         // case where the leecher wouldn't unchoke the LeecherLeecher.
499         var wg sync.WaitGroup
500         wg.Add(1)
501         go func() {
502                 defer wg.Done()
503                 r := llg.NewReader()
504                 defer r.Close()
505                 b, err := ioutil.ReadAll(r)
506                 require.NoError(t, err)
507                 assert.EqualValues(t, testutil.GreetingFileContents, b)
508         }()
509         addClientPeer(leecherGreeting, seeder)
510         addClientPeer(leecherGreeting, leecherLeecher)
511         wg.Add(1)
512         go func() {
513                 defer wg.Done()
514                 leecherGreeting.DownloadAll()
515                 leecher.WaitAll()
516         }()
517         wg.Wait()
518 }
519
520 func TestMergingTrackersByAddingSpecs(t *testing.T) {
521         cl, err := NewClient(TestingConfig())
522         require.NoError(t, err)
523         defer cl.Close()
524         spec := TorrentSpec{}
525         T, new, _ := cl.AddTorrentSpec(&spec)
526         if !new {
527                 t.FailNow()
528         }
529         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
530         _, new, _ = cl.AddTorrentSpec(&spec)
531         assert.False(t, new)
532         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
533         // Because trackers are disabled in TestingConfig.
534         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
535 }
536
537 type badStorage struct{}
538
539 func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
540         return bs, nil
541 }
542
543 func (bs badStorage) Close() error {
544         return nil
545 }
546
547 func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {
548         return badStoragePiece{p}
549 }
550
551 type badStoragePiece struct {
552         p metainfo.Piece
553 }
554
555 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
556         return 0, nil
557 }
558
559 func (p badStoragePiece) GetIsComplete() bool {
560         return true
561 }
562
563 func (p badStoragePiece) MarkComplete() error {
564         return errors.New("psyyyyyyyche")
565 }
566
567 func (p badStoragePiece) MarkNotComplete() error {
568         return errors.New("psyyyyyyyche")
569 }
570
571 func (p badStoragePiece) randomlyTruncatedDataString() string {
572         return "hello, world\n"[:rand.Intn(14)]
573 }
574
575 func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
576         r := strings.NewReader(p.randomlyTruncatedDataString())
577         return r.ReadAt(b, off+p.p.Offset())
578 }
579
580 // We read from a piece which is marked completed, but is missing data.
581 func TestCompletedPieceWrongSize(t *testing.T) {
582         cfg := TestingConfig()
583         cfg.DefaultStorage = badStorage{}
584         cl, err := NewClient(cfg)
585         require.NoError(t, err)
586         defer cl.Close()
587         info := metainfo.Info{
588                 PieceLength: 15,
589                 Pieces:      make([]byte, 20),
590                 Files: []metainfo.FileInfo{
591                         {Path: []string{"greeting"}, Length: 13},
592                 },
593         }
594         b, err := bencode.Marshal(info)
595         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
596                 InfoBytes: b,
597                 InfoHash:  metainfo.HashBytes(b),
598         })
599         require.NoError(t, err)
600         defer tt.Drop()
601         assert.True(t, new)
602         r := tt.NewReader()
603         defer r.Close()
604         b, err = ioutil.ReadAll(r)
605         assert.Len(t, b, 13)
606         assert.NoError(t, err)
607 }
608
609 func BenchmarkAddLargeTorrent(b *testing.B) {
610         cfg := TestingConfig()
611         cfg.DisableTCP = true
612         cfg.DisableUTP = true
613         cfg.ListenAddr = "redonk"
614         cl, err := NewClient(cfg)
615         require.NoError(b, err)
616         defer cl.Close()
617         for range iter.N(b.N) {
618                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
619                 if err != nil {
620                         b.Fatal(err)
621                 }
622                 t.Drop()
623         }
624 }
625
626 func TestResponsive(t *testing.T) {
627         seederDataDir, mi := testutil.GreetingTestTorrent()
628         defer os.RemoveAll(seederDataDir)
629         cfg := TestingConfig()
630         cfg.Seed = true
631         cfg.DataDir = seederDataDir
632         seeder, err := NewClient(cfg)
633         require.Nil(t, err)
634         defer seeder.Close()
635         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
636         leecherDataDir, err := ioutil.TempDir("", "")
637         require.Nil(t, err)
638         defer os.RemoveAll(leecherDataDir)
639         cfg = TestingConfig()
640         cfg.DataDir = leecherDataDir
641         leecher, err := NewClient(cfg)
642         require.Nil(t, err)
643         defer leecher.Close()
644         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
645                 ret = TorrentSpecFromMetaInfo(mi)
646                 ret.ChunkSize = 2
647                 return
648         }())
649         addClientPeer(leecherTorrent, seeder)
650         reader := leecherTorrent.NewReader()
651         defer reader.Close()
652         reader.SetReadahead(0)
653         reader.SetResponsive()
654         b := make([]byte, 2)
655         _, err = reader.Seek(3, os.SEEK_SET)
656         require.NoError(t, err)
657         _, err = io.ReadFull(reader, b)
658         assert.Nil(t, err)
659         assert.EqualValues(t, "lo", string(b))
660         _, err = reader.Seek(11, os.SEEK_SET)
661         require.NoError(t, err)
662         n, err := io.ReadFull(reader, b)
663         assert.Nil(t, err)
664         assert.EqualValues(t, 2, n)
665         assert.EqualValues(t, "d\n", string(b))
666 }
667
668 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
669         seederDataDir, mi := testutil.GreetingTestTorrent()
670         defer os.RemoveAll(seederDataDir)
671         cfg := TestingConfig()
672         cfg.Seed = true
673         cfg.DataDir = seederDataDir
674         seeder, err := NewClient(cfg)
675         require.Nil(t, err)
676         defer seeder.Close()
677         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
678         leecherDataDir, err := ioutil.TempDir("", "")
679         require.Nil(t, err)
680         defer os.RemoveAll(leecherDataDir)
681         cfg = TestingConfig()
682         cfg.DataDir = leecherDataDir
683         leecher, err := NewClient(cfg)
684         require.Nil(t, err)
685         defer leecher.Close()
686         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
687                 ret = TorrentSpecFromMetaInfo(mi)
688                 ret.ChunkSize = 2
689                 return
690         }())
691         addClientPeer(leecherTorrent, seeder)
692         reader := leecherTorrent.NewReader()
693         defer reader.Close()
694         reader.SetReadahead(0)
695         reader.SetResponsive()
696         b := make([]byte, 2)
697         _, err = reader.Seek(3, os.SEEK_SET)
698         require.NoError(t, err)
699         _, err = io.ReadFull(reader, b)
700         assert.Nil(t, err)
701         assert.EqualValues(t, "lo", string(b))
702         go leecherTorrent.Drop()
703         _, err = reader.Seek(11, os.SEEK_SET)
704         require.NoError(t, err)
705         n, err := reader.Read(b)
706         assert.EqualError(t, err, "torrent closed")
707         assert.EqualValues(t, 0, n)
708 }
709
710 func TestDHTInheritBlocklist(t *testing.T) {
711         ipl := iplist.New(nil)
712         require.NotNil(t, ipl)
713         cfg := TestingConfig()
714         cfg.IPBlocklist = ipl
715         cfg.NoDHT = false
716         cl, err := NewClient(cfg)
717         require.NoError(t, err)
718         defer cl.Close()
719         require.Equal(t, ipl, cl.DHT().IPBlocklist())
720 }
721
722 // Check that stuff is merged in subsequent AddTorrentSpec for the same
723 // infohash.
724 func TestAddTorrentSpecMerging(t *testing.T) {
725         cl, err := NewClient(TestingConfig())
726         require.NoError(t, err)
727         defer cl.Close()
728         dir, mi := testutil.GreetingTestTorrent()
729         defer os.RemoveAll(dir)
730         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
731                 InfoHash: mi.HashInfoBytes(),
732         })
733         require.NoError(t, err)
734         require.True(t, new)
735         require.Nil(t, tt.Info())
736         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
737         require.NoError(t, err)
738         require.False(t, new)
739         require.NotNil(t, tt.Info())
740 }
741
742 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
743         dir, mi := testutil.GreetingTestTorrent()
744         os.RemoveAll(dir)
745         cl, _ := NewClient(TestingConfig())
746         defer cl.Close()
747         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
748                 InfoHash: mi.HashInfoBytes(),
749         })
750         tt.Drop()
751         assert.EqualValues(t, 0, len(cl.Torrents()))
752         select {
753         case <-tt.GotInfo():
754                 t.FailNow()
755         default:
756         }
757 }
758
759 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
760         for i := range iter.N(info.NumPieces()) {
761                 p := info.Piece(i)
762                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
763         }
764 }
765
766 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
767         fileCacheDir, err := ioutil.TempDir("", "")
768         require.NoError(t, err)
769         defer os.RemoveAll(fileCacheDir)
770         fileCache, err := filecache.NewCache(fileCacheDir)
771         require.NoError(t, err)
772         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
773         defer os.RemoveAll(greetingDataTempDir)
774         filePieceStore := csf(fileCache)
775         info, err := greetingMetainfo.UnmarshalInfo()
776         require.NoError(t, err)
777         ih := greetingMetainfo.HashInfoBytes()
778         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
779         require.NoError(t, err)
780         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
781         // require.Equal(t, len(testutil.GreetingFileContents), written)
782         // require.NoError(t, err)
783         for i := 0; i < info.NumPieces(); i++ {
784                 p := info.Piece(i)
785                 if alreadyCompleted {
786                         err := greetingData.Piece(p).MarkComplete()
787                         assert.NoError(t, err)
788                 }
789         }
790         cfg := TestingConfig()
791         // TODO: Disable network option?
792         cfg.DisableTCP = true
793         cfg.DisableUTP = true
794         cfg.DefaultStorage = filePieceStore
795         cl, err := NewClient(cfg)
796         require.NoError(t, err)
797         defer cl.Close()
798         tt, err := cl.AddTorrent(greetingMetainfo)
799         require.NoError(t, err)
800         psrs := tt.PieceStateRuns()
801         assert.Len(t, psrs, 1)
802         assert.EqualValues(t, 3, psrs[0].Length)
803         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
804         if alreadyCompleted {
805                 r := tt.NewReader()
806                 b, err := ioutil.ReadAll(r)
807                 assert.NoError(t, err)
808                 assert.EqualValues(t, testutil.GreetingFileContents, b)
809         }
810 }
811
812 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
813         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
814 }
815
816 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
817         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
818 }
819
820 func TestAddMetainfoWithNodes(t *testing.T) {
821         cfg := TestingConfig()
822         cfg.NoDHT = false
823         // For now, we want to just jam the nodes into the table, without
824         // verifying them first. Also the DHT code doesn't support mixing secure
825         // and insecure nodes if security is enabled (yet).
826         cfg.DHTConfig.NoSecurity = true
827         cl, err := NewClient(cfg)
828         require.NoError(t, err)
829         defer cl.Close()
830         assert.EqualValues(t, 0, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
831         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
832         require.NoError(t, err)
833         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
834         // check if the announce-list is here instead. TODO: Add nodes.
835         assert.Len(t, tt.metainfo.AnnounceList, 5)
836         // There are 6 nodes in the torrent file.
837         assert.EqualValues(t, 6, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
838 }
839
840 type testDownloadCancelParams struct {
841         ExportClientStatus        bool
842         SetLeecherStorageCapacity bool
843         LeecherStorageCapacity    int64
844         Cancel                    bool
845 }
846
847 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
848         greetingTempDir, mi := testutil.GreetingTestTorrent()
849         defer os.RemoveAll(greetingTempDir)
850         cfg := TestingConfig()
851         cfg.Seed = true
852         cfg.DataDir = greetingTempDir
853         seeder, err := NewClient(cfg)
854         require.NoError(t, err)
855         defer seeder.Close()
856         if ps.ExportClientStatus {
857                 testutil.ExportStatusWriter(seeder, "s")
858         }
859         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
860         leecherDataDir, err := ioutil.TempDir("", "")
861         require.NoError(t, err)
862         defer os.RemoveAll(leecherDataDir)
863         fc, err := filecache.NewCache(leecherDataDir)
864         require.NoError(t, err)
865         if ps.SetLeecherStorageCapacity {
866                 fc.SetCapacity(ps.LeecherStorageCapacity)
867         }
868         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
869         cfg.DataDir = leecherDataDir
870         leecher, _ := NewClient(cfg)
871         defer leecher.Close()
872         if ps.ExportClientStatus {
873                 testutil.ExportStatusWriter(leecher, "l")
874         }
875         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
876                 ret = TorrentSpecFromMetaInfo(mi)
877                 ret.ChunkSize = 2
878                 return
879         }())
880         require.NoError(t, err)
881         assert.True(t, new)
882         psc := leecherGreeting.SubscribePieceStateChanges()
883         defer psc.Close()
884         leecherGreeting.DownloadAll()
885         if ps.Cancel {
886                 leecherGreeting.CancelPieces(0, leecherGreeting.NumPieces())
887         }
888         addClientPeer(leecherGreeting, seeder)
889         completes := make(map[int]bool, 3)
890 values:
891         for {
892                 // started := time.Now()
893                 select {
894                 case _v := <-psc.Values:
895                         // log.Print(time.Since(started))
896                         v := _v.(PieceStateChange)
897                         completes[v.Index] = v.Complete
898                 case <-time.After(100 * time.Millisecond):
899                         break values
900                 }
901         }
902         if ps.Cancel {
903                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
904         } else {
905                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
906         }
907
908 }
909
910 func TestTorrentDownloadAll(t *testing.T) {
911         testDownloadCancel(t, testDownloadCancelParams{})
912 }
913
914 func TestTorrentDownloadAllThenCancel(t *testing.T) {
915         testDownloadCancel(t, testDownloadCancelParams{
916                 Cancel: true,
917         })
918 }
919
920 // Ensure that it's an error for a peer to send an invalid have message.
921 func TestPeerInvalidHave(t *testing.T) {
922         cl, err := NewClient(TestingConfig())
923         require.NoError(t, err)
924         defer cl.Close()
925         info := metainfo.Info{
926                 PieceLength: 1,
927                 Pieces:      make([]byte, 20),
928                 Files:       []metainfo.FileInfo{{Length: 1}},
929         }
930         infoBytes, err := bencode.Marshal(info)
931         require.NoError(t, err)
932         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
933                 InfoBytes: infoBytes,
934                 InfoHash:  metainfo.HashBytes(infoBytes),
935         })
936         require.NoError(t, err)
937         assert.True(t, _new)
938         defer tt.Drop()
939         cn := &connection{
940                 t: tt,
941         }
942         assert.NoError(t, cn.peerSentHave(0))
943         assert.Error(t, cn.peerSentHave(1))
944 }
945
946 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
947         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
948         defer os.RemoveAll(greetingTempDir)
949         cfg := TestingConfig()
950         cfg.DataDir = greetingTempDir
951         seeder, err := NewClient(TestingConfig())
952         require.NoError(t, err)
953         seeder.AddTorrentSpec(&TorrentSpec{
954                 InfoBytes: greetingMetainfo.InfoBytes,
955         })
956 }
957
958 func TestPrepareTrackerAnnounce(t *testing.T) {
959         cl := &Client{}
960         blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
961         require.NoError(t, err)
962         assert.False(t, blocked)
963         assert.EqualValues(t, "localhost:1234", host)
964         assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
965 }
966
967 // Check that when the listen port is 0, all the protocols listened on have
968 // the same port, and it isn't zero.
969 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
970         cl, err := NewClient(TestingConfig())
971         require.NoError(t, err)
972         defer cl.Close()
973         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
974         assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
975 }
976
977 func TestClientDynamicListenTCPOnly(t *testing.T) {
978         cfg := TestingConfig()
979         cfg.DisableUTP = true
980         cl, err := NewClient(cfg)
981         require.NoError(t, err)
982         defer cl.Close()
983         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
984         assert.Nil(t, cl.utpSock)
985 }
986
987 func TestClientDynamicListenUTPOnly(t *testing.T) {
988         cfg := TestingConfig()
989         cfg.DisableTCP = true
990         cl, err := NewClient(cfg)
991         require.NoError(t, err)
992         defer cl.Close()
993         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
994         assert.Nil(t, cl.tcpListener)
995 }
996
997 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
998         cfg := TestingConfig()
999         cfg.DisableTCP = true
1000         cfg.DisableUTP = true
1001         cl, err := NewClient(cfg)
1002         require.NoError(t, err)
1003         defer cl.Close()
1004         assert.Nil(t, cl.ListenAddr())
1005 }
1006
1007 func addClientPeer(t *Torrent, cl *Client) {
1008         t.AddPeers([]Peer{
1009                 {
1010                         IP:   missinggo.AddrIP(cl.ListenAddr()),
1011                         Port: missinggo.AddrPort(cl.ListenAddr()),
1012                 },
1013         })
1014 }
1015
1016 func printConnPeerCounts(t *Torrent) {
1017         t.cl.mu.Lock()
1018         log.Println(len(t.conns), len(t.peers))
1019         t.cl.mu.Unlock()
1020 }
1021
1022 func totalConns(tts []*Torrent) (ret int) {
1023         for _, tt := range tts {
1024                 tt.cl.mu.Lock()
1025                 ret += len(tt.conns)
1026                 tt.cl.mu.Unlock()
1027         }
1028         return
1029 }
1030
1031 func TestSetMaxEstablishedConn(t *testing.T) {
1032         var tts []*Torrent
1033         ih := testutil.GreetingMetaInfo().HashInfoBytes()
1034         for i := range iter.N(3) {
1035                 cl, err := NewClient(TestingConfig())
1036                 require.NoError(t, err)
1037                 defer cl.Close()
1038                 tt, _ := cl.AddTorrentInfoHash(ih)
1039                 tt.SetMaxEstablishedConns(2)
1040                 testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
1041                 tts = append(tts, tt)
1042         }
1043         addPeers := func() {
1044                 for i, tt := range tts {
1045                         for _, _tt := range tts[:i] {
1046                                 addClientPeer(tt, _tt.cl)
1047                         }
1048                 }
1049         }
1050         waitTotalConns := func(num int) {
1051                 for totalConns(tts) != num {
1052                         time.Sleep(time.Millisecond)
1053                 }
1054         }
1055         addPeers()
1056         waitTotalConns(6)
1057         tts[0].SetMaxEstablishedConns(1)
1058         waitTotalConns(4)
1059         tts[0].SetMaxEstablishedConns(0)
1060         waitTotalConns(2)
1061         tts[0].SetMaxEstablishedConns(1)
1062         addPeers()
1063         waitTotalConns(4)
1064         tts[0].SetMaxEstablishedConns(2)
1065         addPeers()
1066         waitTotalConns(6)
1067 }
1068
1069 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1070         os.MkdirAll(dir, 0770)
1071         file, err := os.Create(filepath.Join(dir, name))
1072         require.NoError(t, err)
1073         file.Write([]byte(name))
1074         file.Close()
1075         mi := metainfo.MetaInfo{}
1076         mi.SetDefaults()
1077         info := metainfo.Info{PieceLength: 256 * 1024}
1078         err = info.BuildFromFilePath(filepath.Join(dir, name))
1079         require.NoError(t, err)
1080         mi.InfoBytes, err = bencode.Marshal(info)
1081         require.NoError(t, err)
1082         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1083         tr, err := cl.AddTorrent(&mi)
1084         require.NoError(t, err)
1085         assert.True(t, tr.Seeding())
1086         return magnet
1087 }
1088
1089 // https://github.com/anacrolix/torrent/issues/114
1090 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1091         cfg := TestingConfig()
1092         cfg.DisableUTP = true
1093         cfg.Seed = true
1094         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1095         cfg.Debug = true
1096         cfg.ForceEncryption = true
1097         os.Mkdir(cfg.DataDir, 0755)
1098         server, err := NewClient(cfg)
1099         require.NoError(t, err)
1100         defer server.Close()
1101         testutil.ExportStatusWriter(server, "s")
1102         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1103         makeMagnet(t, server, cfg.DataDir, "test2")
1104         cfg = TestingConfig()
1105         cfg.DisableUTP = true
1106         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1107         cfg.Debug = true
1108         cfg.ForceEncryption = true
1109         client, err := NewClient(cfg)
1110         require.NoError(t, err)
1111         defer client.Close()
1112         testutil.ExportStatusWriter(client, "c")
1113         tr, err := client.AddMagnet(magnet1)
1114         require.NoError(t, err)
1115         tr.AddPeers([]Peer{{
1116                 IP:   missinggo.AddrIP(server.ListenAddr()),
1117                 Port: missinggo.AddrPort(server.ListenAddr()),
1118         }})
1119         <-tr.GotInfo()
1120         tr.DownloadAll()
1121         client.WaitAll()
1122 }