]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Add download rate limiting
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "log"
10         "math/rand"
11         "net"
12         "os"
13         "path/filepath"
14         "strings"
15         "sync"
16         "testing"
17         "time"
18
19         _ "github.com/anacrolix/envpprof"
20         "github.com/anacrolix/missinggo"
21         "github.com/anacrolix/missinggo/filecache"
22         "github.com/anacrolix/missinggo/pubsub"
23         "github.com/anacrolix/utp"
24         "github.com/bradfitz/iter"
25         "github.com/stretchr/testify/assert"
26         "github.com/stretchr/testify/require"
27         "golang.org/x/time/rate"
28
29         "github.com/anacrolix/torrent/bencode"
30         "github.com/anacrolix/torrent/dht"
31         "github.com/anacrolix/torrent/internal/testutil"
32         "github.com/anacrolix/torrent/iplist"
33         "github.com/anacrolix/torrent/metainfo"
34         "github.com/anacrolix/torrent/storage"
35 )
36
37 func init() {
38         log.SetFlags(log.LstdFlags | log.Llongfile)
39 }
40
41 var TestingConfig = Config{
42         ListenAddr:      "localhost:0",
43         NoDHT:           true,
44         DisableTrackers: true,
45         DataDir:         "/tmp/anacrolix",
46         DHTConfig: dht.ServerConfig{
47                 NoDefaultBootstrap: true,
48         },
49         Debug: true,
50 }
51
52 func TestClientDefault(t *testing.T) {
53         cl, err := NewClient(&TestingConfig)
54         require.NoError(t, err)
55         cl.Close()
56 }
57
58 func TestAddDropTorrent(t *testing.T) {
59         cl, err := NewClient(&TestingConfig)
60         require.NoError(t, err)
61         defer cl.Close()
62         dir, mi := testutil.GreetingTestTorrent()
63         defer os.RemoveAll(dir)
64         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
65         require.NoError(t, err)
66         assert.True(t, new)
67         tt.SetMaxEstablishedConns(0)
68         tt.SetMaxEstablishedConns(1)
69         tt.Drop()
70 }
71
72 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
73         t.SkipNow()
74 }
75
76 func TestAddTorrentNoUsableURLs(t *testing.T) {
77         t.SkipNow()
78 }
79
80 func TestAddPeersToUnknownTorrent(t *testing.T) {
81         t.SkipNow()
82 }
83
84 func TestPieceHashSize(t *testing.T) {
85         if pieceHash.Size() != 20 {
86                 t.FailNow()
87         }
88 }
89
90 func TestTorrentInitialState(t *testing.T) {
91         dir, mi := testutil.GreetingTestTorrent()
92         defer os.RemoveAll(dir)
93         tor := &Torrent{
94                 infoHash:          mi.HashInfoBytes(),
95                 pieceStateChanges: pubsub.NewPubSub(),
96         }
97         tor.chunkSize = 2
98         tor.storageOpener = storage.NewClient(storage.NewFile("/dev/null"))
99         // Needed to lock for asynchronous piece verification.
100         tor.cl = new(Client)
101         err := tor.setInfoBytes(mi.InfoBytes)
102         require.NoError(t, err)
103         require.Len(t, tor.pieces, 3)
104         tor.pendAllChunkSpecs(0)
105         tor.cl.mu.Lock()
106         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
107         tor.cl.mu.Unlock()
108         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
109 }
110
111 func TestUnmarshalPEXMsg(t *testing.T) {
112         var m peerExchangeMessage
113         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
114                 t.Fatal(err)
115         }
116         if len(m.Added) != 2 {
117                 t.FailNow()
118         }
119         if m.Added[0].Port != 0x506 {
120                 t.FailNow()
121         }
122 }
123
124 func TestReducedDialTimeout(t *testing.T) {
125         for _, _case := range []struct {
126                 Max             time.Duration
127                 HalfOpenLimit   int
128                 PendingPeers    int
129                 ExpectedReduced time.Duration
130         }{
131                 {nominalDialTimeout, 40, 0, nominalDialTimeout},
132                 {nominalDialTimeout, 40, 1, nominalDialTimeout},
133                 {nominalDialTimeout, 40, 39, nominalDialTimeout},
134                 {nominalDialTimeout, 40, 40, nominalDialTimeout / 2},
135                 {nominalDialTimeout, 40, 80, nominalDialTimeout / 3},
136                 {nominalDialTimeout, 40, 4000, nominalDialTimeout / 101},
137         } {
138                 reduced := reducedDialTimeout(_case.Max, _case.HalfOpenLimit, _case.PendingPeers)
139                 expected := _case.ExpectedReduced
140                 if expected < minDialTimeout {
141                         expected = minDialTimeout
142                 }
143                 if reduced != expected {
144                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
145                 }
146         }
147 }
148
149 func TestUTPRawConn(t *testing.T) {
150         l, err := utp.NewSocket("udp", "")
151         if err != nil {
152                 t.Fatal(err)
153         }
154         defer l.Close()
155         go func() {
156                 for {
157                         _, err := l.Accept()
158                         if err != nil {
159                                 break
160                         }
161                 }
162         }()
163         // Connect a UTP peer to see if the RawConn will still work.
164         s, _ := utp.NewSocket("udp", "")
165         defer s.Close()
166         utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
167         if err != nil {
168                 t.Fatalf("error dialing utp listener: %s", err)
169         }
170         defer utpPeer.Close()
171         peer, err := net.ListenPacket("udp", ":0")
172         if err != nil {
173                 t.Fatal(err)
174         }
175         defer peer.Close()
176
177         msgsReceived := 0
178         // How many messages to send. I've set this to double the channel buffer
179         // size in the raw packetConn.
180         const N = 200
181         readerStopped := make(chan struct{})
182         // The reader goroutine.
183         go func() {
184                 defer close(readerStopped)
185                 b := make([]byte, 500)
186                 for i := 0; i < N; i++ {
187                         n, _, err := l.ReadFrom(b)
188                         if err != nil {
189                                 t.Fatalf("error reading from raw conn: %s", err)
190                         }
191                         msgsReceived++
192                         var d int
193                         fmt.Sscan(string(b[:n]), &d)
194                         if d != i {
195                                 log.Printf("got wrong number: expected %d, got %d", i, d)
196                         }
197                 }
198         }()
199         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
200         if err != nil {
201                 t.Fatal(err)
202         }
203         for i := 0; i < N; i++ {
204                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
205                 if err != nil {
206                         t.Fatal(err)
207                 }
208                 time.Sleep(time.Microsecond)
209         }
210         select {
211         case <-readerStopped:
212         case <-time.After(time.Second):
213                 t.Fatal("reader timed out")
214         }
215         if msgsReceived != N {
216                 t.Fatalf("messages received: %d", msgsReceived)
217         }
218 }
219
220 func TestTwoClientsArbitraryPorts(t *testing.T) {
221         for i := 0; i < 2; i++ {
222                 cl, err := NewClient(&TestingConfig)
223                 if err != nil {
224                         t.Fatal(err)
225                 }
226                 defer cl.Close()
227         }
228 }
229
230 func TestAddDropManyTorrents(t *testing.T) {
231         cl, err := NewClient(&TestingConfig)
232         require.NoError(t, err)
233         defer cl.Close()
234         for i := range iter.N(1000) {
235                 var spec TorrentSpec
236                 binary.PutVarint(spec.InfoHash[:], int64(i))
237                 tt, new, err := cl.AddTorrentSpec(&spec)
238                 assert.NoError(t, err)
239                 assert.True(t, new)
240                 defer tt.Drop()
241         }
242 }
243
244 type FileCacheClientStorageFactoryParams struct {
245         Capacity    int64
246         SetCapacity bool
247         Wrapper     func(*filecache.Cache) storage.ClientImpl
248 }
249
250 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
251         return func(dataDir string) storage.ClientImpl {
252                 fc, err := filecache.NewCache(dataDir)
253                 if err != nil {
254                         panic(err)
255                 }
256                 if ps.SetCapacity {
257                         fc.SetCapacity(ps.Capacity)
258                 }
259                 return ps.Wrapper(fc)
260         }
261 }
262
263 type storageFactory func(string) storage.ClientImpl
264
265 func TestClientTransferDefault(t *testing.T) {
266         testClientTransfer(t, testClientTransferParams{
267                 ExportClientStatus: true,
268                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
269                         Wrapper: fileCachePieceResourceStorage,
270                 }),
271         })
272 }
273
274 func TestClientTransferRateLimitedUpload(t *testing.T) {
275         started := time.Now()
276         testClientTransfer(t, testClientTransferParams{
277                 // We are uploading 13 bytes (the length of the greeting torrent). The
278                 // chunks are 2 bytes in length. Then the smallest burst we can run
279                 // with is 2. Time taken is (13-burst)/rate.
280                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
281         })
282         require.True(t, time.Since(started) > time.Second)
283 }
284
285 func TestClientTransferRateLimitedDownload(t *testing.T) {
286         testClientTransfer(t, testClientTransferParams{
287                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
288         })
289 }
290
291 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
292         return storage.NewResourcePieces(fc.AsResourceProvider())
293 }
294
295 func fileCachePieceFileStorage(fc *filecache.Cache) storage.ClientImpl {
296         return storage.NewFileStorePieces(fc.AsFileStore())
297 }
298
299 func TestClientTransferSmallCache(t *testing.T) {
300         testClientTransfer(t, testClientTransferParams{
301                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
302                         SetCapacity: true,
303                         // Going below the piece length means it can't complete a piece so
304                         // that it can be hashed.
305                         Capacity: 5,
306                         Wrapper:  fileCachePieceResourceStorage,
307                 }),
308                 SetReadahead: true,
309                 // Can't readahead too far or the cache will thrash and drop data we
310                 // thought we had.
311                 Readahead:          0,
312                 ExportClientStatus: true,
313         })
314 }
315
316 func TestClientTransferVarious(t *testing.T) {
317         // Leecher storage
318         for _, ls := range []storageFactory{
319                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
320                         Wrapper: fileCachePieceFileStorage,
321                 }),
322                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
323                         Wrapper: fileCachePieceResourceStorage,
324                 }),
325                 storage.NewBoltDB,
326         } {
327                 // Seeder storage
328                 for _, ss := range []func(string) storage.ClientImpl{
329                         storage.NewFile,
330                         storage.NewMMap,
331                 } {
332                         for _, responsive := range []bool{false, true} {
333                                 testClientTransfer(t, testClientTransferParams{
334                                         Responsive:     responsive,
335                                         SeederStorage:  ss,
336                                         LeecherStorage: ls,
337                                 })
338                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
339                                         testClientTransfer(t, testClientTransferParams{
340                                                 SeederStorage:  ss,
341                                                 Responsive:     responsive,
342                                                 SetReadahead:   true,
343                                                 Readahead:      readahead,
344                                                 LeecherStorage: ls,
345                                         })
346                                 }
347                         }
348                 }
349         }
350 }
351
352 type testClientTransferParams struct {
353         Responsive                 bool
354         Readahead                  int64
355         SetReadahead               bool
356         ExportClientStatus         bool
357         LeecherStorage             func(string) storage.ClientImpl
358         SeederStorage              func(string) storage.ClientImpl
359         SeederUploadRateLimiter    *rate.Limiter
360         LeecherDownloadRateLimiter *rate.Limiter
361 }
362
363 // Creates a seeder and a leecher, and ensures the data transfers when a read
364 // is attempted on the leecher.
365 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
366         greetingTempDir, mi := testutil.GreetingTestTorrent()
367         defer os.RemoveAll(greetingTempDir)
368         // Create seeder and a Torrent.
369         cfg := TestingConfig
370         cfg.Seed = true
371         cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
372         // cfg.ListenAddr = "localhost:4000"
373         if ps.SeederStorage != nil {
374                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
375         } else {
376                 cfg.DataDir = greetingTempDir
377         }
378         seeder, err := NewClient(&cfg)
379         require.NoError(t, err)
380         defer seeder.Close()
381         if ps.ExportClientStatus {
382                 testutil.ExportStatusWriter(seeder, "s")
383         }
384         // seederTorrent, new, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
385         _, new, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
386         require.NoError(t, err)
387         assert.True(t, new)
388         // Create leecher and a Torrent.
389         leecherDataDir, err := ioutil.TempDir("", "")
390         require.NoError(t, err)
391         defer os.RemoveAll(leecherDataDir)
392         if ps.LeecherStorage == nil {
393                 cfg.DataDir = leecherDataDir
394         } else {
395                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
396         }
397         cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
398         // cfg.ListenAddr = "localhost:4001"
399         leecher, err := NewClient(&cfg)
400         require.NoError(t, err)
401         defer leecher.Close()
402         if ps.ExportClientStatus {
403                 testutil.ExportStatusWriter(leecher, "l")
404         }
405         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
406                 ret = TorrentSpecFromMetaInfo(mi)
407                 ret.ChunkSize = 2
408                 ret.Storage = storage.NewFile(leecherDataDir)
409                 return
410         }())
411         require.NoError(t, err)
412         assert.True(t, new)
413         // Now do some things with leecher and seeder.
414         addClientPeer(leecherGreeting, seeder)
415         r := leecherGreeting.NewReader()
416         defer r.Close()
417         if ps.Responsive {
418                 r.SetResponsive()
419         }
420         if ps.SetReadahead {
421                 r.SetReadahead(ps.Readahead)
422         }
423         assertReadAllGreeting(t, r)
424         // After one read through, we can assume certain torrent statistics.
425         // These are not a strict requirement. It is however interesting to
426         // follow.
427         // t.Logf("%#v", seederTorrent.Stats())
428         // assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
429         // assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
430         // assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
431         // assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
432         // Read through again for the cases where the torrent data size exceeds
433         // the size of the cache.
434         assertReadAllGreeting(t, r)
435 }
436
437 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
438         pos, err := r.Seek(0, os.SEEK_SET)
439         assert.NoError(t, err)
440         assert.EqualValues(t, 0, pos)
441         _greeting, err := ioutil.ReadAll(r)
442         assert.NoError(t, err)
443         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
444 }
445
446 // Check that after completing leeching, a leecher transitions to a seeding
447 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
448 func TestSeedAfterDownloading(t *testing.T) {
449         greetingTempDir, mi := testutil.GreetingTestTorrent()
450         defer os.RemoveAll(greetingTempDir)
451         cfg := TestingConfig
452         cfg.Seed = true
453         cfg.DataDir = greetingTempDir
454         seeder, err := NewClient(&cfg)
455         require.NoError(t, err)
456         defer seeder.Close()
457         testutil.ExportStatusWriter(seeder, "s")
458         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
459         cfg.DataDir, err = ioutil.TempDir("", "")
460         require.NoError(t, err)
461         defer os.RemoveAll(cfg.DataDir)
462         leecher, err := NewClient(&cfg)
463         require.NoError(t, err)
464         defer leecher.Close()
465         testutil.ExportStatusWriter(leecher, "l")
466         cfg.Seed = false
467         // cfg.TorrentDataOpener = nil
468         cfg.DataDir, err = ioutil.TempDir("", "")
469         require.NoError(t, err)
470         defer os.RemoveAll(cfg.DataDir)
471         leecherLeecher, _ := NewClient(&cfg)
472         defer leecherLeecher.Close()
473         testutil.ExportStatusWriter(leecherLeecher, "ll")
474         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
475                 ret = TorrentSpecFromMetaInfo(mi)
476                 ret.ChunkSize = 2
477                 return
478         }())
479         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
480                 ret = TorrentSpecFromMetaInfo(mi)
481                 ret.ChunkSize = 3
482                 return
483         }())
484         // Simultaneously DownloadAll in Leecher, and read the contents
485         // consecutively in LeecherLeecher. This non-deterministically triggered a
486         // case where the leecher wouldn't unchoke the LeecherLeecher.
487         var wg sync.WaitGroup
488         wg.Add(1)
489         go func() {
490                 defer wg.Done()
491                 r := llg.NewReader()
492                 defer r.Close()
493                 b, err := ioutil.ReadAll(r)
494                 require.NoError(t, err)
495                 assert.EqualValues(t, testutil.GreetingFileContents, b)
496         }()
497         addClientPeer(leecherGreeting, seeder)
498         addClientPeer(leecherGreeting, leecherLeecher)
499         wg.Add(1)
500         go func() {
501                 defer wg.Done()
502                 leecherGreeting.DownloadAll()
503                 leecher.WaitAll()
504         }()
505         wg.Wait()
506 }
507
508 func TestMergingTrackersByAddingSpecs(t *testing.T) {
509         cl, err := NewClient(&TestingConfig)
510         require.NoError(t, err)
511         defer cl.Close()
512         spec := TorrentSpec{}
513         T, new, _ := cl.AddTorrentSpec(&spec)
514         if !new {
515                 t.FailNow()
516         }
517         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
518         _, new, _ = cl.AddTorrentSpec(&spec)
519         assert.False(t, new)
520         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
521         // Because trackers are disabled in TestingConfig.
522         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
523 }
524
525 type badStorage struct{}
526
527 func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
528         return bs, nil
529 }
530
531 func (bs badStorage) Close() error {
532         return nil
533 }
534
535 func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {
536         return badStoragePiece{p}
537 }
538
539 type badStoragePiece struct {
540         p metainfo.Piece
541 }
542
543 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
544         return 0, nil
545 }
546
547 func (p badStoragePiece) GetIsComplete() bool {
548         return true
549 }
550
551 func (p badStoragePiece) MarkComplete() error {
552         return errors.New("psyyyyyyyche")
553 }
554
555 func (p badStoragePiece) MarkNotComplete() error {
556         return errors.New("psyyyyyyyche")
557 }
558
559 func (p badStoragePiece) randomlyTruncatedDataString() string {
560         return "hello, world\n"[:rand.Intn(14)]
561 }
562
563 func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
564         r := strings.NewReader(p.randomlyTruncatedDataString())
565         return r.ReadAt(b, off+p.p.Offset())
566 }
567
568 // We read from a piece which is marked completed, but is missing data.
569 func TestCompletedPieceWrongSize(t *testing.T) {
570         cfg := TestingConfig
571         cfg.DefaultStorage = badStorage{}
572         cl, err := NewClient(&cfg)
573         require.NoError(t, err)
574         defer cl.Close()
575         info := metainfo.Info{
576                 PieceLength: 15,
577                 Pieces:      make([]byte, 20),
578                 Files: []metainfo.FileInfo{
579                         metainfo.FileInfo{Path: []string{"greeting"}, Length: 13},
580                 },
581         }
582         b, err := bencode.Marshal(info)
583         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
584                 InfoBytes: b,
585                 InfoHash:  metainfo.HashBytes(b),
586         })
587         require.NoError(t, err)
588         defer tt.Drop()
589         assert.True(t, new)
590         r := tt.NewReader()
591         defer r.Close()
592         b, err = ioutil.ReadAll(r)
593         assert.Len(t, b, 13)
594         assert.NoError(t, err)
595 }
596
597 func BenchmarkAddLargeTorrent(b *testing.B) {
598         cfg := TestingConfig
599         cfg.DisableTCP = true
600         cfg.DisableUTP = true
601         cfg.ListenAddr = "redonk"
602         cl, err := NewClient(&cfg)
603         require.NoError(b, err)
604         defer cl.Close()
605         for range iter.N(b.N) {
606                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
607                 if err != nil {
608                         b.Fatal(err)
609                 }
610                 t.Drop()
611         }
612 }
613
614 func TestResponsive(t *testing.T) {
615         seederDataDir, mi := testutil.GreetingTestTorrent()
616         defer os.RemoveAll(seederDataDir)
617         cfg := TestingConfig
618         cfg.Seed = true
619         cfg.DataDir = seederDataDir
620         seeder, err := NewClient(&cfg)
621         require.Nil(t, err)
622         defer seeder.Close()
623         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
624         leecherDataDir, err := ioutil.TempDir("", "")
625         require.Nil(t, err)
626         defer os.RemoveAll(leecherDataDir)
627         cfg = TestingConfig
628         cfg.DataDir = leecherDataDir
629         leecher, err := NewClient(&cfg)
630         require.Nil(t, err)
631         defer leecher.Close()
632         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
633                 ret = TorrentSpecFromMetaInfo(mi)
634                 ret.ChunkSize = 2
635                 return
636         }())
637         addClientPeer(leecherTorrent, seeder)
638         reader := leecherTorrent.NewReader()
639         defer reader.Close()
640         reader.SetReadahead(0)
641         reader.SetResponsive()
642         b := make([]byte, 2)
643         _, err = reader.Seek(3, os.SEEK_SET)
644         require.NoError(t, err)
645         _, err = io.ReadFull(reader, b)
646         assert.Nil(t, err)
647         assert.EqualValues(t, "lo", string(b))
648         _, err = reader.Seek(11, os.SEEK_SET)
649         require.NoError(t, err)
650         n, err := io.ReadFull(reader, b)
651         assert.Nil(t, err)
652         assert.EqualValues(t, 2, n)
653         assert.EqualValues(t, "d\n", string(b))
654 }
655
656 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
657         seederDataDir, mi := testutil.GreetingTestTorrent()
658         defer os.RemoveAll(seederDataDir)
659         cfg := TestingConfig
660         cfg.Seed = true
661         cfg.DataDir = seederDataDir
662         seeder, err := NewClient(&cfg)
663         require.Nil(t, err)
664         defer seeder.Close()
665         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
666         leecherDataDir, err := ioutil.TempDir("", "")
667         require.Nil(t, err)
668         defer os.RemoveAll(leecherDataDir)
669         cfg = TestingConfig
670         cfg.DataDir = leecherDataDir
671         leecher, err := NewClient(&cfg)
672         require.Nil(t, err)
673         defer leecher.Close()
674         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
675                 ret = TorrentSpecFromMetaInfo(mi)
676                 ret.ChunkSize = 2
677                 return
678         }())
679         addClientPeer(leecherTorrent, seeder)
680         reader := leecherTorrent.NewReader()
681         defer reader.Close()
682         reader.SetReadahead(0)
683         reader.SetResponsive()
684         b := make([]byte, 2)
685         _, err = reader.Seek(3, os.SEEK_SET)
686         require.NoError(t, err)
687         _, err = io.ReadFull(reader, b)
688         assert.Nil(t, err)
689         assert.EqualValues(t, "lo", string(b))
690         go leecherTorrent.Drop()
691         _, err = reader.Seek(11, os.SEEK_SET)
692         require.NoError(t, err)
693         n, err := reader.Read(b)
694         assert.EqualError(t, err, "torrent closed")
695         assert.EqualValues(t, 0, n)
696 }
697
698 func TestDHTInheritBlocklist(t *testing.T) {
699         ipl := iplist.New(nil)
700         require.NotNil(t, ipl)
701         cfg := TestingConfig
702         cfg.IPBlocklist = ipl
703         cfg.NoDHT = false
704         cl, err := NewClient(&cfg)
705         require.NoError(t, err)
706         defer cl.Close()
707         require.Equal(t, ipl, cl.DHT().IPBlocklist())
708 }
709
710 // Check that stuff is merged in subsequent AddTorrentSpec for the same
711 // infohash.
712 func TestAddTorrentSpecMerging(t *testing.T) {
713         cl, err := NewClient(&TestingConfig)
714         require.NoError(t, err)
715         defer cl.Close()
716         dir, mi := testutil.GreetingTestTorrent()
717         defer os.RemoveAll(dir)
718         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
719                 InfoHash: mi.HashInfoBytes(),
720         })
721         require.NoError(t, err)
722         require.True(t, new)
723         require.Nil(t, tt.Info())
724         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
725         require.NoError(t, err)
726         require.False(t, new)
727         require.NotNil(t, tt.Info())
728 }
729
730 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
731         dir, mi := testutil.GreetingTestTorrent()
732         os.RemoveAll(dir)
733         cl, _ := NewClient(&TestingConfig)
734         defer cl.Close()
735         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
736                 InfoHash: mi.HashInfoBytes(),
737         })
738         tt.Drop()
739         assert.EqualValues(t, 0, len(cl.Torrents()))
740         select {
741         case <-tt.GotInfo():
742                 t.FailNow()
743         default:
744         }
745 }
746
747 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
748         for i := range iter.N(info.NumPieces()) {
749                 p := info.Piece(i)
750                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
751         }
752 }
753
754 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
755         fileCacheDir, err := ioutil.TempDir("", "")
756         require.NoError(t, err)
757         defer os.RemoveAll(fileCacheDir)
758         fileCache, err := filecache.NewCache(fileCacheDir)
759         require.NoError(t, err)
760         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
761         defer os.RemoveAll(greetingDataTempDir)
762         filePieceStore := csf(fileCache)
763         info, err := greetingMetainfo.UnmarshalInfo()
764         require.NoError(t, err)
765         ih := greetingMetainfo.HashInfoBytes()
766         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
767         require.NoError(t, err)
768         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
769         // require.Equal(t, len(testutil.GreetingFileContents), written)
770         // require.NoError(t, err)
771         for i := 0; i < info.NumPieces(); i++ {
772                 p := info.Piece(i)
773                 if alreadyCompleted {
774                         err := greetingData.Piece(p).MarkComplete()
775                         assert.NoError(t, err)
776                 }
777         }
778         cfg := TestingConfig
779         // TODO: Disable network option?
780         cfg.DisableTCP = true
781         cfg.DisableUTP = true
782         cfg.DefaultStorage = filePieceStore
783         cl, err := NewClient(&cfg)
784         require.NoError(t, err)
785         defer cl.Close()
786         tt, err := cl.AddTorrent(greetingMetainfo)
787         require.NoError(t, err)
788         psrs := tt.PieceStateRuns()
789         assert.Len(t, psrs, 1)
790         assert.EqualValues(t, 3, psrs[0].Length)
791         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
792         if alreadyCompleted {
793                 r := tt.NewReader()
794                 b, err := ioutil.ReadAll(r)
795                 assert.NoError(t, err)
796                 assert.EqualValues(t, testutil.GreetingFileContents, b)
797         }
798 }
799
800 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
801         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceFileStorage)
802         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
803 }
804
805 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
806         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceFileStorage)
807         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
808 }
809
810 func TestAddMetainfoWithNodes(t *testing.T) {
811         cfg := TestingConfig
812         cfg.NoDHT = false
813         // For now, we want to just jam the nodes into the table, without
814         // verifying them first. Also the DHT code doesn't support mixing secure
815         // and insecure nodes if security is enabled (yet).
816         cfg.DHTConfig.NoSecurity = true
817         cl, err := NewClient(&cfg)
818         require.NoError(t, err)
819         defer cl.Close()
820         assert.EqualValues(t, cl.DHT().NumNodes(), 0)
821         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
822         require.NoError(t, err)
823         assert.Len(t, tt.metainfo.AnnounceList, 5)
824         assert.EqualValues(t, 6, cl.DHT().NumNodes())
825 }
826
827 type testDownloadCancelParams struct {
828         ExportClientStatus        bool
829         SetLeecherStorageCapacity bool
830         LeecherStorageCapacity    int64
831         Cancel                    bool
832 }
833
834 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
835         greetingTempDir, mi := testutil.GreetingTestTorrent()
836         defer os.RemoveAll(greetingTempDir)
837         cfg := TestingConfig
838         cfg.Seed = true
839         cfg.DataDir = greetingTempDir
840         seeder, err := NewClient(&cfg)
841         require.NoError(t, err)
842         defer seeder.Close()
843         if ps.ExportClientStatus {
844                 testutil.ExportStatusWriter(seeder, "s")
845         }
846         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
847         leecherDataDir, err := ioutil.TempDir("", "")
848         require.NoError(t, err)
849         defer os.RemoveAll(leecherDataDir)
850         fc, err := filecache.NewCache(leecherDataDir)
851         require.NoError(t, err)
852         if ps.SetLeecherStorageCapacity {
853                 fc.SetCapacity(ps.LeecherStorageCapacity)
854         }
855         cfg.DefaultStorage = storage.NewFileStorePieces(fc.AsFileStore())
856         cfg.DataDir = leecherDataDir
857         leecher, _ := NewClient(&cfg)
858         defer leecher.Close()
859         if ps.ExportClientStatus {
860                 testutil.ExportStatusWriter(leecher, "l")
861         }
862         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
863                 ret = TorrentSpecFromMetaInfo(mi)
864                 ret.ChunkSize = 2
865                 return
866         }())
867         require.NoError(t, err)
868         assert.True(t, new)
869         psc := leecherGreeting.SubscribePieceStateChanges()
870         defer psc.Close()
871         leecherGreeting.DownloadAll()
872         if ps.Cancel {
873                 leecherGreeting.CancelPieces(0, leecherGreeting.NumPieces())
874         }
875         addClientPeer(leecherGreeting, seeder)
876         completes := make(map[int]bool, 3)
877 values:
878         for {
879                 // started := time.Now()
880                 select {
881                 case _v := <-psc.Values:
882                         // log.Print(time.Since(started))
883                         v := _v.(PieceStateChange)
884                         completes[v.Index] = v.Complete
885                 case <-time.After(100 * time.Millisecond):
886                         break values
887                 }
888         }
889         if ps.Cancel {
890                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
891         } else {
892                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
893         }
894
895 }
896
897 func TestTorrentDownloadAll(t *testing.T) {
898         testDownloadCancel(t, testDownloadCancelParams{})
899 }
900
901 func TestTorrentDownloadAllThenCancel(t *testing.T) {
902         testDownloadCancel(t, testDownloadCancelParams{
903                 Cancel: true,
904         })
905 }
906
907 // Ensure that it's an error for a peer to send an invalid have message.
908 func TestPeerInvalidHave(t *testing.T) {
909         cl, err := NewClient(&TestingConfig)
910         require.NoError(t, err)
911         defer cl.Close()
912         info := metainfo.Info{
913                 PieceLength: 1,
914                 Pieces:      make([]byte, 20),
915                 Files:       []metainfo.FileInfo{{Length: 1}},
916         }
917         infoBytes, err := bencode.Marshal(info)
918         require.NoError(t, err)
919         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
920                 InfoBytes: infoBytes,
921                 InfoHash:  metainfo.HashBytes(infoBytes),
922         })
923         require.NoError(t, err)
924         assert.True(t, _new)
925         defer tt.Drop()
926         cn := &connection{
927                 t: tt,
928         }
929         assert.NoError(t, cn.peerSentHave(0))
930         assert.Error(t, cn.peerSentHave(1))
931 }
932
933 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
934         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
935         defer os.RemoveAll(greetingTempDir)
936         cfg := TestingConfig
937         cfg.DataDir = greetingTempDir
938         seeder, err := NewClient(&TestingConfig)
939         require.NoError(t, err)
940         seeder.AddTorrentSpec(&TorrentSpec{
941                 InfoBytes: greetingMetainfo.InfoBytes,
942         })
943 }
944
945 func TestPrepareTrackerAnnounce(t *testing.T) {
946         cl := &Client{}
947         blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
948         require.NoError(t, err)
949         assert.False(t, blocked)
950         assert.EqualValues(t, "localhost:1234", host)
951         assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
952 }
953
954 // Check that when the listen port is 0, all the protocols listened on have
955 // the same port, and it isn't zero.
956 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
957         cl, err := NewClient(&TestingConfig)
958         require.NoError(t, err)
959         defer cl.Close()
960         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
961         assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
962 }
963
964 func TestClientDynamicListenTCPOnly(t *testing.T) {
965         cfg := TestingConfig
966         cfg.DisableUTP = true
967         cl, err := NewClient(&cfg)
968         require.NoError(t, err)
969         defer cl.Close()
970         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
971         assert.Nil(t, cl.utpSock)
972 }
973
974 func TestClientDynamicListenUTPOnly(t *testing.T) {
975         cfg := TestingConfig
976         cfg.DisableTCP = true
977         cl, err := NewClient(&cfg)
978         require.NoError(t, err)
979         defer cl.Close()
980         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
981         assert.Nil(t, cl.tcpListener)
982 }
983
984 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
985         cfg := TestingConfig
986         cfg.DisableTCP = true
987         cfg.DisableUTP = true
988         cl, err := NewClient(&cfg)
989         require.NoError(t, err)
990         defer cl.Close()
991         assert.Nil(t, cl.ListenAddr())
992 }
993
994 func addClientPeer(t *Torrent, cl *Client) {
995         t.AddPeers([]Peer{
996                 Peer{
997                         IP:   missinggo.AddrIP(cl.ListenAddr()),
998                         Port: missinggo.AddrPort(cl.ListenAddr()),
999                 },
1000         })
1001 }
1002
1003 func printConnPeerCounts(t *Torrent) {
1004         t.cl.mu.Lock()
1005         log.Println(len(t.conns), len(t.peers))
1006         t.cl.mu.Unlock()
1007 }
1008
1009 func totalConns(tts []*Torrent) (ret int) {
1010         for _, tt := range tts {
1011                 tt.cl.mu.Lock()
1012                 ret += len(tt.conns)
1013                 tt.cl.mu.Unlock()
1014         }
1015         return
1016 }
1017
1018 func TestSetMaxEstablishedConn(t *testing.T) {
1019         var tts []*Torrent
1020         ih := testutil.GreetingMetaInfo().HashInfoBytes()
1021         cfg := TestingConfig
1022         for i := range iter.N(3) {
1023                 cl, err := NewClient(&cfg)
1024                 require.NoError(t, err)
1025                 defer cl.Close()
1026                 tt, _ := cl.AddTorrentInfoHash(ih)
1027                 tt.SetMaxEstablishedConns(2)
1028                 testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
1029                 tts = append(tts, tt)
1030         }
1031         addPeers := func() {
1032                 for i, tt := range tts {
1033                         for _, _tt := range tts[:i] {
1034                                 addClientPeer(tt, _tt.cl)
1035                         }
1036                 }
1037         }
1038         waitTotalConns := func(num int) {
1039                 for totalConns(tts) != num {
1040                         time.Sleep(time.Millisecond)
1041                 }
1042         }
1043         addPeers()
1044         waitTotalConns(6)
1045         tts[0].SetMaxEstablishedConns(1)
1046         waitTotalConns(4)
1047         tts[0].SetMaxEstablishedConns(0)
1048         waitTotalConns(2)
1049         tts[0].SetMaxEstablishedConns(1)
1050         addPeers()
1051         waitTotalConns(4)
1052         tts[0].SetMaxEstablishedConns(2)
1053         addPeers()
1054         waitTotalConns(6)
1055 }
1056
1057 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1058         os.MkdirAll(dir, 0770)
1059         file, err := os.Create(filepath.Join(dir, name))
1060         require.NoError(t, err)
1061         file.Write([]byte(name))
1062         file.Close()
1063         mi := metainfo.MetaInfo{}
1064         mi.SetDefaults()
1065         info := metainfo.Info{PieceLength: 256 * 1024}
1066         err = info.BuildFromFilePath(filepath.Join(dir, name))
1067         require.NoError(t, err)
1068         mi.InfoBytes, err = bencode.Marshal(info)
1069         require.NoError(t, err)
1070         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1071         tr, err := cl.AddTorrent(&mi)
1072         require.NoError(t, err)
1073         assert.True(t, tr.Seeding())
1074         return magnet
1075 }
1076
1077 // https://github.com/anacrolix/torrent/issues/114
1078 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1079         cfg := TestingConfig
1080         cfg.DisableUTP = true
1081         cfg.Seed = true
1082         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1083         cfg.Debug = true
1084         cfg.ForceEncryption = true
1085         os.Mkdir(cfg.DataDir, 0755)
1086         server, err := NewClient(&cfg)
1087         require.NoError(t, err)
1088         defer server.Close()
1089         testutil.ExportStatusWriter(server, "s")
1090         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1091         makeMagnet(t, server, cfg.DataDir, "test2")
1092         cfg = TestingConfig
1093         cfg.DisableUTP = true
1094         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1095         cfg.Debug = true
1096         cfg.ForceEncryption = true
1097         client, err := NewClient(&cfg)
1098         require.NoError(t, err)
1099         defer client.Close()
1100         testutil.ExportStatusWriter(client, "c")
1101         tr, err := client.AddMagnet(magnet1)
1102         require.NoError(t, err)
1103         tr.AddPeers([]Peer{Peer{
1104                 IP:   missinggo.AddrIP(server.ListenAddr()),
1105                 Port: missinggo.AddrPort(server.ListenAddr()),
1106         }})
1107         <-tr.GotInfo()
1108         tr.DownloadAll()
1109         client.WaitAll()
1110 }