]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Tidy up TestMultipleTorrentsWithEncryption test
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "log"
10         "math/rand"
11         "net"
12         "os"
13         "path/filepath"
14         "strings"
15         "sync"
16         "testing"
17         "time"
18
19         _ "github.com/anacrolix/envpprof"
20         "github.com/anacrolix/missinggo"
21         "github.com/anacrolix/missinggo/filecache"
22         "github.com/anacrolix/missinggo/pubsub"
23         "github.com/anacrolix/utp"
24         "github.com/bradfitz/iter"
25         "github.com/stretchr/testify/assert"
26         "github.com/stretchr/testify/require"
27
28         "github.com/anacrolix/torrent/bencode"
29         "github.com/anacrolix/torrent/dht"
30         "github.com/anacrolix/torrent/internal/testutil"
31         "github.com/anacrolix/torrent/iplist"
32         "github.com/anacrolix/torrent/metainfo"
33         "github.com/anacrolix/torrent/storage"
34 )
35
36 func init() {
37         log.SetFlags(log.LstdFlags | log.Llongfile)
38 }
39
40 var TestingConfig = Config{
41         ListenAddr:      "localhost:0",
42         NoDHT:           true,
43         DisableTrackers: true,
44         DataDir:         "/tmp/anacrolix",
45         DHTConfig: dht.ServerConfig{
46                 NoDefaultBootstrap: true,
47         },
48 }
49
50 func TestClientDefault(t *testing.T) {
51         cl, err := NewClient(&TestingConfig)
52         require.NoError(t, err)
53         cl.Close()
54 }
55
56 func TestAddDropTorrent(t *testing.T) {
57         cl, err := NewClient(&TestingConfig)
58         require.NoError(t, err)
59         defer cl.Close()
60         dir, mi := testutil.GreetingTestTorrent()
61         defer os.RemoveAll(dir)
62         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
63         require.NoError(t, err)
64         assert.True(t, new)
65         tt.SetMaxEstablishedConns(0)
66         tt.SetMaxEstablishedConns(1)
67         tt.Drop()
68 }
69
70 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
71         t.SkipNow()
72 }
73
74 func TestAddTorrentNoUsableURLs(t *testing.T) {
75         t.SkipNow()
76 }
77
78 func TestAddPeersToUnknownTorrent(t *testing.T) {
79         t.SkipNow()
80 }
81
82 func TestPieceHashSize(t *testing.T) {
83         if pieceHash.Size() != 20 {
84                 t.FailNow()
85         }
86 }
87
88 func TestTorrentInitialState(t *testing.T) {
89         dir, mi := testutil.GreetingTestTorrent()
90         defer os.RemoveAll(dir)
91         tor := &Torrent{
92                 infoHash:          mi.HashInfoBytes(),
93                 pieceStateChanges: pubsub.NewPubSub(),
94         }
95         tor.chunkSize = 2
96         tor.storageOpener = storage.NewClient(storage.NewFile("/dev/null"))
97         // Needed to lock for asynchronous piece verification.
98         tor.cl = new(Client)
99         err := tor.setInfoBytes(mi.InfoBytes)
100         require.NoError(t, err)
101         require.Len(t, tor.pieces, 3)
102         tor.pendAllChunkSpecs(0)
103         tor.cl.mu.Lock()
104         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
105         tor.cl.mu.Unlock()
106         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
107 }
108
109 func TestUnmarshalPEXMsg(t *testing.T) {
110         var m peerExchangeMessage
111         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
112                 t.Fatal(err)
113         }
114         if len(m.Added) != 2 {
115                 t.FailNow()
116         }
117         if m.Added[0].Port != 0x506 {
118                 t.FailNow()
119         }
120 }
121
122 func TestReducedDialTimeout(t *testing.T) {
123         for _, _case := range []struct {
124                 Max             time.Duration
125                 HalfOpenLimit   int
126                 PendingPeers    int
127                 ExpectedReduced time.Duration
128         }{
129                 {nominalDialTimeout, 40, 0, nominalDialTimeout},
130                 {nominalDialTimeout, 40, 1, nominalDialTimeout},
131                 {nominalDialTimeout, 40, 39, nominalDialTimeout},
132                 {nominalDialTimeout, 40, 40, nominalDialTimeout / 2},
133                 {nominalDialTimeout, 40, 80, nominalDialTimeout / 3},
134                 {nominalDialTimeout, 40, 4000, nominalDialTimeout / 101},
135         } {
136                 reduced := reducedDialTimeout(_case.Max, _case.HalfOpenLimit, _case.PendingPeers)
137                 expected := _case.ExpectedReduced
138                 if expected < minDialTimeout {
139                         expected = minDialTimeout
140                 }
141                 if reduced != expected {
142                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
143                 }
144         }
145 }
146
147 func TestUTPRawConn(t *testing.T) {
148         l, err := utp.NewSocket("udp", "")
149         if err != nil {
150                 t.Fatal(err)
151         }
152         defer l.Close()
153         go func() {
154                 for {
155                         _, err := l.Accept()
156                         if err != nil {
157                                 break
158                         }
159                 }
160         }()
161         // Connect a UTP peer to see if the RawConn will still work.
162         s, _ := utp.NewSocket("udp", "")
163         defer s.Close()
164         utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
165         if err != nil {
166                 t.Fatalf("error dialing utp listener: %s", err)
167         }
168         defer utpPeer.Close()
169         peer, err := net.ListenPacket("udp", ":0")
170         if err != nil {
171                 t.Fatal(err)
172         }
173         defer peer.Close()
174
175         msgsReceived := 0
176         // How many messages to send. I've set this to double the channel buffer
177         // size in the raw packetConn.
178         const N = 200
179         readerStopped := make(chan struct{})
180         // The reader goroutine.
181         go func() {
182                 defer close(readerStopped)
183                 b := make([]byte, 500)
184                 for i := 0; i < N; i++ {
185                         n, _, err := l.ReadFrom(b)
186                         if err != nil {
187                                 t.Fatalf("error reading from raw conn: %s", err)
188                         }
189                         msgsReceived++
190                         var d int
191                         fmt.Sscan(string(b[:n]), &d)
192                         if d != i {
193                                 log.Printf("got wrong number: expected %d, got %d", i, d)
194                         }
195                 }
196         }()
197         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
198         if err != nil {
199                 t.Fatal(err)
200         }
201         for i := 0; i < N; i++ {
202                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
203                 if err != nil {
204                         t.Fatal(err)
205                 }
206                 time.Sleep(time.Microsecond)
207         }
208         select {
209         case <-readerStopped:
210         case <-time.After(time.Second):
211                 t.Fatal("reader timed out")
212         }
213         if msgsReceived != N {
214                 t.Fatalf("messages received: %d", msgsReceived)
215         }
216 }
217
218 func TestTwoClientsArbitraryPorts(t *testing.T) {
219         for i := 0; i < 2; i++ {
220                 cl, err := NewClient(&TestingConfig)
221                 if err != nil {
222                         t.Fatal(err)
223                 }
224                 defer cl.Close()
225         }
226 }
227
228 func TestAddDropManyTorrents(t *testing.T) {
229         cl, err := NewClient(&TestingConfig)
230         require.NoError(t, err)
231         defer cl.Close()
232         for i := range iter.N(1000) {
233                 var spec TorrentSpec
234                 binary.PutVarint(spec.InfoHash[:], int64(i))
235                 tt, new, err := cl.AddTorrentSpec(&spec)
236                 assert.NoError(t, err)
237                 assert.True(t, new)
238                 defer tt.Drop()
239         }
240 }
241
242 type FileCacheClientStorageFactoryParams struct {
243         Capacity    int64
244         SetCapacity bool
245         Wrapper     func(*filecache.Cache) storage.ClientImpl
246 }
247
248 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
249         return func(dataDir string) storage.ClientImpl {
250                 fc, err := filecache.NewCache(dataDir)
251                 if err != nil {
252                         panic(err)
253                 }
254                 if ps.SetCapacity {
255                         fc.SetCapacity(ps.Capacity)
256                 }
257                 return ps.Wrapper(fc)
258         }
259 }
260
261 type storageFactory func(string) storage.ClientImpl
262
263 func TestClientTransferDefault(t *testing.T) {
264         testClientTransfer(t, testClientTransferParams{
265                 ExportClientStatus: true,
266                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
267                         Wrapper: fileCachePieceResourceStorage,
268                 }),
269         })
270 }
271
272 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
273         return storage.NewResourcePieces(fc.AsResourceProvider())
274 }
275
276 func fileCachePieceFileStorage(fc *filecache.Cache) storage.ClientImpl {
277         return storage.NewFileStorePieces(fc.AsFileStore())
278 }
279
280 func TestClientTransferSmallCache(t *testing.T) {
281         testClientTransfer(t, testClientTransferParams{
282                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
283                         SetCapacity: true,
284                         // Going below the piece length means it can't complete a piece so
285                         // that it can be hashed.
286                         Capacity: 5,
287                         Wrapper:  fileCachePieceResourceStorage,
288                 }),
289                 SetReadahead: true,
290                 // Can't readahead too far or the cache will thrash and drop data we
291                 // thought we had.
292                 Readahead:          0,
293                 ExportClientStatus: true,
294         })
295 }
296
297 func TestClientTransferVarious(t *testing.T) {
298         for _, ls := range []storageFactory{
299                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
300                         Wrapper: fileCachePieceFileStorage,
301                 }),
302                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
303                         Wrapper: fileCachePieceResourceStorage,
304                 }),
305                 storage.NewBoltDB,
306         } {
307                 for _, ss := range []func(string) storage.ClientImpl{
308                         storage.NewFile,
309                         storage.NewMMap,
310                 } {
311                         for _, responsive := range []bool{false, true} {
312                                 testClientTransfer(t, testClientTransferParams{
313                                         Responsive:     responsive,
314                                         SeederStorage:  ss,
315                                         LeecherStorage: ls,
316                                 })
317                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
318                                         testClientTransfer(t, testClientTransferParams{
319                                                 SeederStorage:  ss,
320                                                 Responsive:     responsive,
321                                                 SetReadahead:   true,
322                                                 Readahead:      readahead,
323                                                 LeecherStorage: ls,
324                                         })
325                                 }
326                         }
327                 }
328         }
329 }
330
331 type testClientTransferParams struct {
332         Responsive         bool
333         Readahead          int64
334         SetReadahead       bool
335         ExportClientStatus bool
336         LeecherStorage     func(string) storage.ClientImpl
337         SeederStorage      func(string) storage.ClientImpl
338 }
339
340 // Creates a seeder and a leecher, and ensures the data transfers when a read
341 // is attempted on the leecher.
342 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
343         greetingTempDir, mi := testutil.GreetingTestTorrent()
344         defer os.RemoveAll(greetingTempDir)
345         // Create seeder and a Torrent.
346         cfg := TestingConfig
347         cfg.Seed = true
348         // cfg.ListenAddr = "localhost:4000"
349         if ps.SeederStorage != nil {
350                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
351         } else {
352                 cfg.DataDir = greetingTempDir
353         }
354         seeder, err := NewClient(&cfg)
355         require.NoError(t, err)
356         defer seeder.Close()
357         if ps.ExportClientStatus {
358                 testutil.ExportStatusWriter(seeder, "s")
359         }
360         // seederTorrent, new, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
361         _, new, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
362         require.NoError(t, err)
363         assert.True(t, new)
364         // Create leecher and a Torrent.
365         leecherDataDir, err := ioutil.TempDir("", "")
366         require.NoError(t, err)
367         defer os.RemoveAll(leecherDataDir)
368         cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
369         // cfg.ListenAddr = "localhost:4001"
370         leecher, err := NewClient(&cfg)
371         require.NoError(t, err)
372         defer leecher.Close()
373         if ps.ExportClientStatus {
374                 testutil.ExportStatusWriter(leecher, "l")
375         }
376         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
377                 ret = TorrentSpecFromMetaInfo(mi)
378                 ret.ChunkSize = 2
379                 ret.Storage = storage.NewFile(leecherDataDir)
380                 return
381         }())
382         require.NoError(t, err)
383         assert.True(t, new)
384         // Now do some things with leecher and seeder.
385         addClientPeer(leecherGreeting, seeder)
386         r := leecherGreeting.NewReader()
387         defer r.Close()
388         if ps.Responsive {
389                 r.SetResponsive()
390         }
391         if ps.SetReadahead {
392                 r.SetReadahead(ps.Readahead)
393         }
394         assertReadAllGreeting(t, r)
395         // After one read through, we can assume certain torrent statistics.
396         // These are not a strict requirement. It is however interesting to
397         // follow.
398         // t.Logf("%#v", seederTorrent.Stats())
399         // assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
400         // assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
401         // assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
402         // assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
403         // Read through again for the cases where the torrent data size exceeds
404         // the size of the cache.
405         assertReadAllGreeting(t, r)
406 }
407
408 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
409         pos, err := r.Seek(0, os.SEEK_SET)
410         assert.NoError(t, err)
411         assert.EqualValues(t, 0, pos)
412         _greeting, err := ioutil.ReadAll(r)
413         assert.NoError(t, err)
414         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
415 }
416
417 // Check that after completing leeching, a leecher transitions to a seeding
418 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
419 func TestSeedAfterDownloading(t *testing.T) {
420         greetingTempDir, mi := testutil.GreetingTestTorrent()
421         defer os.RemoveAll(greetingTempDir)
422         cfg := TestingConfig
423         cfg.Seed = true
424         cfg.DataDir = greetingTempDir
425         seeder, err := NewClient(&cfg)
426         require.NoError(t, err)
427         defer seeder.Close()
428         testutil.ExportStatusWriter(seeder, "s")
429         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
430         cfg.DataDir, err = ioutil.TempDir("", "")
431         require.NoError(t, err)
432         defer os.RemoveAll(cfg.DataDir)
433         leecher, err := NewClient(&cfg)
434         require.NoError(t, err)
435         defer leecher.Close()
436         testutil.ExportStatusWriter(leecher, "l")
437         cfg.Seed = false
438         // cfg.TorrentDataOpener = nil
439         cfg.DataDir, err = ioutil.TempDir("", "")
440         require.NoError(t, err)
441         defer os.RemoveAll(cfg.DataDir)
442         leecherLeecher, _ := NewClient(&cfg)
443         defer leecherLeecher.Close()
444         testutil.ExportStatusWriter(leecherLeecher, "ll")
445         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
446                 ret = TorrentSpecFromMetaInfo(mi)
447                 ret.ChunkSize = 2
448                 return
449         }())
450         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
451                 ret = TorrentSpecFromMetaInfo(mi)
452                 ret.ChunkSize = 3
453                 return
454         }())
455         // Simultaneously DownloadAll in Leecher, and read the contents
456         // consecutively in LeecherLeecher. This non-deterministically triggered a
457         // case where the leecher wouldn't unchoke the LeecherLeecher.
458         var wg sync.WaitGroup
459         wg.Add(1)
460         go func() {
461                 defer wg.Done()
462                 r := llg.NewReader()
463                 defer r.Close()
464                 b, err := ioutil.ReadAll(r)
465                 require.NoError(t, err)
466                 assert.EqualValues(t, testutil.GreetingFileContents, b)
467         }()
468         addClientPeer(leecherGreeting, seeder)
469         addClientPeer(leecherGreeting, leecherLeecher)
470         wg.Add(1)
471         go func() {
472                 defer wg.Done()
473                 leecherGreeting.DownloadAll()
474                 leecher.WaitAll()
475         }()
476         wg.Wait()
477 }
478
479 func TestMergingTrackersByAddingSpecs(t *testing.T) {
480         cl, err := NewClient(&TestingConfig)
481         require.NoError(t, err)
482         defer cl.Close()
483         spec := TorrentSpec{}
484         T, new, _ := cl.AddTorrentSpec(&spec)
485         if !new {
486                 t.FailNow()
487         }
488         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
489         _, new, _ = cl.AddTorrentSpec(&spec)
490         assert.False(t, new)
491         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
492         // Because trackers are disabled in TestingConfig.
493         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
494 }
495
496 type badStorage struct{}
497
498 func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
499         return bs, nil
500 }
501
502 func (bs badStorage) Close() error {
503         return nil
504 }
505
506 func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {
507         return badStoragePiece{p}
508 }
509
510 type badStoragePiece struct {
511         p metainfo.Piece
512 }
513
514 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
515         return 0, nil
516 }
517
518 func (p badStoragePiece) GetIsComplete() bool {
519         return true
520 }
521
522 func (p badStoragePiece) MarkComplete() error {
523         return errors.New("psyyyyyyyche")
524 }
525
526 func (p badStoragePiece) MarkNotComplete() error {
527         return errors.New("psyyyyyyyche")
528 }
529
530 func (p badStoragePiece) randomlyTruncatedDataString() string {
531         return "hello, world\n"[:rand.Intn(14)]
532 }
533
534 func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
535         r := strings.NewReader(p.randomlyTruncatedDataString())
536         return r.ReadAt(b, off+p.p.Offset())
537 }
538
539 // We read from a piece which is marked completed, but is missing data.
540 func TestCompletedPieceWrongSize(t *testing.T) {
541         cfg := TestingConfig
542         cfg.DefaultStorage = badStorage{}
543         cl, err := NewClient(&cfg)
544         require.NoError(t, err)
545         defer cl.Close()
546         info := metainfo.Info{
547                 PieceLength: 15,
548                 Pieces:      make([]byte, 20),
549                 Files: []metainfo.FileInfo{
550                         metainfo.FileInfo{Path: []string{"greeting"}, Length: 13},
551                 },
552         }
553         b, err := bencode.Marshal(info)
554         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
555                 InfoBytes: b,
556                 InfoHash:  metainfo.HashBytes(b),
557         })
558         require.NoError(t, err)
559         defer tt.Drop()
560         assert.True(t, new)
561         r := tt.NewReader()
562         defer r.Close()
563         b, err = ioutil.ReadAll(r)
564         assert.Len(t, b, 13)
565         assert.NoError(t, err)
566 }
567
568 func BenchmarkAddLargeTorrent(b *testing.B) {
569         cfg := TestingConfig
570         cfg.DisableTCP = true
571         cfg.DisableUTP = true
572         cfg.ListenAddr = "redonk"
573         cl, err := NewClient(&cfg)
574         require.NoError(b, err)
575         defer cl.Close()
576         for range iter.N(b.N) {
577                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
578                 if err != nil {
579                         b.Fatal(err)
580                 }
581                 t.Drop()
582         }
583 }
584
585 func TestResponsive(t *testing.T) {
586         seederDataDir, mi := testutil.GreetingTestTorrent()
587         defer os.RemoveAll(seederDataDir)
588         cfg := TestingConfig
589         cfg.Seed = true
590         cfg.DataDir = seederDataDir
591         seeder, err := NewClient(&cfg)
592         require.Nil(t, err)
593         defer seeder.Close()
594         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
595         leecherDataDir, err := ioutil.TempDir("", "")
596         require.Nil(t, err)
597         defer os.RemoveAll(leecherDataDir)
598         cfg = TestingConfig
599         cfg.DataDir = leecherDataDir
600         leecher, err := NewClient(&cfg)
601         require.Nil(t, err)
602         defer leecher.Close()
603         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
604                 ret = TorrentSpecFromMetaInfo(mi)
605                 ret.ChunkSize = 2
606                 return
607         }())
608         addClientPeer(leecherTorrent, seeder)
609         reader := leecherTorrent.NewReader()
610         defer reader.Close()
611         reader.SetReadahead(0)
612         reader.SetResponsive()
613         b := make([]byte, 2)
614         _, err = reader.Seek(3, os.SEEK_SET)
615         require.NoError(t, err)
616         _, err = io.ReadFull(reader, b)
617         assert.Nil(t, err)
618         assert.EqualValues(t, "lo", string(b))
619         _, err = reader.Seek(11, os.SEEK_SET)
620         require.NoError(t, err)
621         n, err := io.ReadFull(reader, b)
622         assert.Nil(t, err)
623         assert.EqualValues(t, 2, n)
624         assert.EqualValues(t, "d\n", string(b))
625 }
626
627 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
628         seederDataDir, mi := testutil.GreetingTestTorrent()
629         defer os.RemoveAll(seederDataDir)
630         cfg := TestingConfig
631         cfg.Seed = true
632         cfg.DataDir = seederDataDir
633         seeder, err := NewClient(&cfg)
634         require.Nil(t, err)
635         defer seeder.Close()
636         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
637         leecherDataDir, err := ioutil.TempDir("", "")
638         require.Nil(t, err)
639         defer os.RemoveAll(leecherDataDir)
640         cfg = TestingConfig
641         cfg.DataDir = leecherDataDir
642         leecher, err := NewClient(&cfg)
643         require.Nil(t, err)
644         defer leecher.Close()
645         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
646                 ret = TorrentSpecFromMetaInfo(mi)
647                 ret.ChunkSize = 2
648                 return
649         }())
650         addClientPeer(leecherTorrent, seeder)
651         reader := leecherTorrent.NewReader()
652         defer reader.Close()
653         reader.SetReadahead(0)
654         reader.SetResponsive()
655         b := make([]byte, 2)
656         _, err = reader.Seek(3, os.SEEK_SET)
657         require.NoError(t, err)
658         _, err = io.ReadFull(reader, b)
659         assert.Nil(t, err)
660         assert.EqualValues(t, "lo", string(b))
661         go leecherTorrent.Drop()
662         _, err = reader.Seek(11, os.SEEK_SET)
663         require.NoError(t, err)
664         n, err := reader.Read(b)
665         assert.EqualError(t, err, "torrent closed")
666         assert.EqualValues(t, 0, n)
667 }
668
669 func TestDHTInheritBlocklist(t *testing.T) {
670         ipl := iplist.New(nil)
671         require.NotNil(t, ipl)
672         cfg := TestingConfig
673         cfg.IPBlocklist = ipl
674         cfg.NoDHT = false
675         cl, err := NewClient(&cfg)
676         require.NoError(t, err)
677         defer cl.Close()
678         require.Equal(t, ipl, cl.DHT().IPBlocklist())
679 }
680
681 // Check that stuff is merged in subsequent AddTorrentSpec for the same
682 // infohash.
683 func TestAddTorrentSpecMerging(t *testing.T) {
684         cl, err := NewClient(&TestingConfig)
685         require.NoError(t, err)
686         defer cl.Close()
687         dir, mi := testutil.GreetingTestTorrent()
688         defer os.RemoveAll(dir)
689         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
690                 InfoHash: mi.HashInfoBytes(),
691         })
692         require.NoError(t, err)
693         require.True(t, new)
694         require.Nil(t, tt.Info())
695         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
696         require.NoError(t, err)
697         require.False(t, new)
698         require.NotNil(t, tt.Info())
699 }
700
701 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
702         dir, mi := testutil.GreetingTestTorrent()
703         os.RemoveAll(dir)
704         cl, _ := NewClient(&TestingConfig)
705         defer cl.Close()
706         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
707                 InfoHash: mi.HashInfoBytes(),
708         })
709         tt.Drop()
710         assert.EqualValues(t, 0, len(cl.Torrents()))
711         select {
712         case <-tt.GotInfo():
713                 t.FailNow()
714         default:
715         }
716 }
717
718 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
719         for i := range iter.N(info.NumPieces()) {
720                 p := info.Piece(i)
721                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
722         }
723 }
724
725 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
726         fileCacheDir, err := ioutil.TempDir("", "")
727         require.NoError(t, err)
728         defer os.RemoveAll(fileCacheDir)
729         fileCache, err := filecache.NewCache(fileCacheDir)
730         require.NoError(t, err)
731         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
732         defer os.RemoveAll(greetingDataTempDir)
733         filePieceStore := csf(fileCache)
734         info, err := greetingMetainfo.UnmarshalInfo()
735         require.NoError(t, err)
736         ih := greetingMetainfo.HashInfoBytes()
737         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
738         require.NoError(t, err)
739         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
740         // require.Equal(t, len(testutil.GreetingFileContents), written)
741         // require.NoError(t, err)
742         for i := 0; i < info.NumPieces(); i++ {
743                 p := info.Piece(i)
744                 if alreadyCompleted {
745                         err := greetingData.Piece(p).MarkComplete()
746                         assert.NoError(t, err)
747                 }
748         }
749         cfg := TestingConfig
750         // TODO: Disable network option?
751         cfg.DisableTCP = true
752         cfg.DisableUTP = true
753         cfg.DefaultStorage = filePieceStore
754         cl, err := NewClient(&cfg)
755         require.NoError(t, err)
756         defer cl.Close()
757         tt, err := cl.AddTorrent(greetingMetainfo)
758         require.NoError(t, err)
759         psrs := tt.PieceStateRuns()
760         assert.Len(t, psrs, 1)
761         assert.EqualValues(t, 3, psrs[0].Length)
762         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
763         if alreadyCompleted {
764                 r := tt.NewReader()
765                 b, err := ioutil.ReadAll(r)
766                 assert.NoError(t, err)
767                 assert.EqualValues(t, testutil.GreetingFileContents, b)
768         }
769 }
770
771 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
772         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceFileStorage)
773         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
774 }
775
776 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
777         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceFileStorage)
778         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
779 }
780
781 func TestAddMetainfoWithNodes(t *testing.T) {
782         cfg := TestingConfig
783         cfg.NoDHT = false
784         // For now, we want to just jam the nodes into the table, without
785         // verifying them first. Also the DHT code doesn't support mixing secure
786         // and insecure nodes if security is enabled (yet).
787         cfg.DHTConfig.NoSecurity = true
788         cl, err := NewClient(&cfg)
789         require.NoError(t, err)
790         defer cl.Close()
791         assert.EqualValues(t, cl.DHT().NumNodes(), 0)
792         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
793         require.NoError(t, err)
794         assert.Len(t, tt.metainfo.AnnounceList, 5)
795         assert.EqualValues(t, 6, cl.DHT().NumNodes())
796 }
797
798 type testDownloadCancelParams struct {
799         ExportClientStatus        bool
800         SetLeecherStorageCapacity bool
801         LeecherStorageCapacity    int64
802         Cancel                    bool
803 }
804
805 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
806         greetingTempDir, mi := testutil.GreetingTestTorrent()
807         defer os.RemoveAll(greetingTempDir)
808         cfg := TestingConfig
809         cfg.Seed = true
810         cfg.DataDir = greetingTempDir
811         seeder, err := NewClient(&cfg)
812         require.NoError(t, err)
813         defer seeder.Close()
814         if ps.ExportClientStatus {
815                 testutil.ExportStatusWriter(seeder, "s")
816         }
817         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
818         leecherDataDir, err := ioutil.TempDir("", "")
819         require.NoError(t, err)
820         defer os.RemoveAll(leecherDataDir)
821         fc, err := filecache.NewCache(leecherDataDir)
822         require.NoError(t, err)
823         if ps.SetLeecherStorageCapacity {
824                 fc.SetCapacity(ps.LeecherStorageCapacity)
825         }
826         cfg.DefaultStorage = storage.NewFileStorePieces(fc.AsFileStore())
827         cfg.DataDir = leecherDataDir
828         leecher, _ := NewClient(&cfg)
829         defer leecher.Close()
830         if ps.ExportClientStatus {
831                 testutil.ExportStatusWriter(leecher, "l")
832         }
833         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
834                 ret = TorrentSpecFromMetaInfo(mi)
835                 ret.ChunkSize = 2
836                 return
837         }())
838         require.NoError(t, err)
839         assert.True(t, new)
840         psc := leecherGreeting.SubscribePieceStateChanges()
841         defer psc.Close()
842         leecherGreeting.DownloadAll()
843         if ps.Cancel {
844                 leecherGreeting.CancelPieces(0, leecherGreeting.NumPieces())
845         }
846         addClientPeer(leecherGreeting, seeder)
847         completes := make(map[int]bool, 3)
848 values:
849         for {
850                 // started := time.Now()
851                 select {
852                 case _v := <-psc.Values:
853                         // log.Print(time.Since(started))
854                         v := _v.(PieceStateChange)
855                         completes[v.Index] = v.Complete
856                 case <-time.After(100 * time.Millisecond):
857                         break values
858                 }
859         }
860         if ps.Cancel {
861                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
862         } else {
863                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
864         }
865
866 }
867
868 func TestTorrentDownloadAll(t *testing.T) {
869         testDownloadCancel(t, testDownloadCancelParams{})
870 }
871
872 func TestTorrentDownloadAllThenCancel(t *testing.T) {
873         testDownloadCancel(t, testDownloadCancelParams{
874                 Cancel: true,
875         })
876 }
877
878 // Ensure that it's an error for a peer to send an invalid have message.
879 func TestPeerInvalidHave(t *testing.T) {
880         cl, err := NewClient(&TestingConfig)
881         require.NoError(t, err)
882         defer cl.Close()
883         info := metainfo.Info{
884                 PieceLength: 1,
885                 Pieces:      make([]byte, 20),
886                 Files:       []metainfo.FileInfo{{Length: 1}},
887         }
888         infoBytes, err := bencode.Marshal(info)
889         require.NoError(t, err)
890         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
891                 InfoBytes: infoBytes,
892                 InfoHash:  metainfo.HashBytes(infoBytes),
893         })
894         require.NoError(t, err)
895         assert.True(t, _new)
896         defer tt.Drop()
897         cn := &connection{
898                 t: tt,
899         }
900         assert.NoError(t, cn.peerSentHave(0))
901         assert.Error(t, cn.peerSentHave(1))
902 }
903
904 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
905         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
906         defer os.RemoveAll(greetingTempDir)
907         cfg := TestingConfig
908         cfg.DataDir = greetingTempDir
909         seeder, err := NewClient(&TestingConfig)
910         require.NoError(t, err)
911         seeder.AddTorrentSpec(&TorrentSpec{
912                 InfoBytes: greetingMetainfo.InfoBytes,
913         })
914 }
915
916 func TestPrepareTrackerAnnounce(t *testing.T) {
917         cl := &Client{}
918         blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
919         require.NoError(t, err)
920         assert.False(t, blocked)
921         assert.EqualValues(t, "localhost:1234", host)
922         assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
923 }
924
925 // Check that when the listen port is 0, all the protocols listened on have
926 // the same port, and it isn't zero.
927 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
928         cl, err := NewClient(&TestingConfig)
929         require.NoError(t, err)
930         defer cl.Close()
931         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
932         assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
933 }
934
935 func TestClientDynamicListenTCPOnly(t *testing.T) {
936         cfg := TestingConfig
937         cfg.DisableUTP = true
938         cl, err := NewClient(&cfg)
939         require.NoError(t, err)
940         defer cl.Close()
941         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
942         assert.Nil(t, cl.utpSock)
943 }
944
945 func TestClientDynamicListenUTPOnly(t *testing.T) {
946         cfg := TestingConfig
947         cfg.DisableTCP = true
948         cl, err := NewClient(&cfg)
949         require.NoError(t, err)
950         defer cl.Close()
951         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
952         assert.Nil(t, cl.tcpListener)
953 }
954
955 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
956         cfg := TestingConfig
957         cfg.DisableTCP = true
958         cfg.DisableUTP = true
959         cl, err := NewClient(&cfg)
960         require.NoError(t, err)
961         defer cl.Close()
962         assert.Nil(t, cl.ListenAddr())
963 }
964
965 func addClientPeer(t *Torrent, cl *Client) {
966         t.AddPeers([]Peer{
967                 Peer{
968                         IP:   missinggo.AddrIP(cl.ListenAddr()),
969                         Port: missinggo.AddrPort(cl.ListenAddr()),
970                 },
971         })
972 }
973
974 func printConnPeerCounts(t *Torrent) {
975         t.cl.mu.Lock()
976         log.Println(len(t.conns), len(t.peers))
977         t.cl.mu.Unlock()
978 }
979
980 func totalConns(tts []*Torrent) (ret int) {
981         for _, tt := range tts {
982                 tt.cl.mu.Lock()
983                 ret += len(tt.conns)
984                 tt.cl.mu.Unlock()
985         }
986         return
987 }
988
989 func TestSetMaxEstablishedConn(t *testing.T) {
990         var tts []*Torrent
991         ih := testutil.GreetingMetaInfo().HashInfoBytes()
992         cfg := TestingConfig
993         for i := range iter.N(3) {
994                 cl, err := NewClient(&cfg)
995                 require.NoError(t, err)
996                 defer cl.Close()
997                 tt, _ := cl.AddTorrentInfoHash(ih)
998                 tt.SetMaxEstablishedConns(2)
999                 testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
1000                 tts = append(tts, tt)
1001         }
1002         addPeers := func() {
1003                 for i, tt := range tts {
1004                         for _, _tt := range tts[:i] {
1005                                 addClientPeer(tt, _tt.cl)
1006                         }
1007                 }
1008         }
1009         waitTotalConns := func(num int) {
1010                 for totalConns(tts) != num {
1011                         time.Sleep(time.Millisecond)
1012                 }
1013         }
1014         addPeers()
1015         waitTotalConns(6)
1016         tts[0].SetMaxEstablishedConns(1)
1017         waitTotalConns(4)
1018         tts[0].SetMaxEstablishedConns(0)
1019         waitTotalConns(2)
1020         tts[0].SetMaxEstablishedConns(1)
1021         addPeers()
1022         waitTotalConns(4)
1023         tts[0].SetMaxEstablishedConns(2)
1024         addPeers()
1025         waitTotalConns(6)
1026 }
1027
1028 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1029         file, err := os.Create(filepath.Join(dir, name))
1030         require.NoError(t, err)
1031         file.Write([]byte(name))
1032         file.Close()
1033         mi := metainfo.MetaInfo{}
1034         mi.SetDefaults()
1035         info := metainfo.Info{PieceLength: 256 * 1024}
1036         err = info.BuildFromFilePath(filepath.Join(dir, name))
1037         require.NoError(t, err)
1038         mi.InfoBytes, err = bencode.Marshal(info)
1039         require.NoError(t, err)
1040         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1041         tr, err := cl.AddTorrent(&mi)
1042         require.NoError(t, err)
1043         assert.True(t, tr.Seeding())
1044         return magnet
1045 }
1046
1047 // https://github.com/anacrolix/torrent/issues/114
1048 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1049         cfg := TestingConfig
1050         cfg.DisableUTP = true
1051         cfg.Seed = true
1052         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1053         cfg.Debug = true
1054         cfg.ForceEncryption = true
1055         os.Mkdir(cfg.DataDir, 0755)
1056         server, err := NewClient(&cfg)
1057         require.NoError(t, err)
1058         defer server.Close()
1059         testutil.ExportStatusWriter(server, "s")
1060         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1061         makeMagnet(t, server, cfg.DataDir, "test2")
1062         cfg = TestingConfig
1063         cfg.DisableUTP = true
1064         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1065         cfg.Debug = true
1066         cfg.ForceEncryption = true
1067         client, err := NewClient(&cfg)
1068         require.NoError(t, err)
1069         defer client.Close()
1070         testutil.ExportStatusWriter(client, "c")
1071         tr, err := client.AddMagnet(magnet1)
1072         require.NoError(t, err)
1073         tr.AddPeers([]Peer{Peer{
1074                 IP:   missinggo.AddrIP(server.ListenAddr()),
1075                 Port: missinggo.AddrPort(server.ListenAddr()),
1076         }})
1077         <-tr.GotInfo()
1078         tr.DownloadAll()
1079         client.WaitAll()
1080 }