Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Add more thorough tests for Torrent.SetMaxEstablishedConns
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "log"
10         "math/rand"
11         "net"
12         "os"
13         "strings"
14         "sync"
15         "testing"
16         "time"
17
18         _ "github.com/anacrolix/envpprof"
19         "github.com/anacrolix/missinggo"
20         "github.com/anacrolix/missinggo/filecache"
21         "github.com/anacrolix/missinggo/pubsub"
22         "github.com/anacrolix/utp"
23         "github.com/bradfitz/iter"
24         "github.com/stretchr/testify/assert"
25         "github.com/stretchr/testify/require"
26
27         "github.com/anacrolix/torrent/bencode"
28         "github.com/anacrolix/torrent/dht"
29         "github.com/anacrolix/torrent/internal/testutil"
30         "github.com/anacrolix/torrent/iplist"
31         "github.com/anacrolix/torrent/metainfo"
32         "github.com/anacrolix/torrent/storage"
33 )
34
// init configures the standard logger used while the tests run: each message
// gets the standard date/time prefix plus the full file path and line number
// of the call site.
func init() {
	const flags = log.Llongfile | log.LstdFlags
	log.SetFlags(flags)
}
38
39 var TestingConfig = Config{
40         ListenAddr:      "localhost:0",
41         NoDHT:           true,
42         DisableTrackers: true,
43         DataDir:         "/dev/null",
44         DHTConfig: dht.ServerConfig{
45                 NoDefaultBootstrap: true,
46         },
47 }
48
49 func TestClientDefault(t *testing.T) {
50         cl, err := NewClient(&TestingConfig)
51         require.NoError(t, err)
52         cl.Close()
53 }
54
55 func TestAddDropTorrent(t *testing.T) {
56         cl, err := NewClient(&TestingConfig)
57         require.NoError(t, err)
58         defer cl.Close()
59         dir, mi := testutil.GreetingTestTorrent()
60         defer os.RemoveAll(dir)
61         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
62         require.NoError(t, err)
63         assert.True(t, new)
64         tt.SetMaxEstablishedConns(0)
65         tt.SetMaxEstablishedConns(1)
66         tt.Drop()
67 }
68
// TestAddTorrentNoSupportedTrackerSchemes is a placeholder with no test logic;
// it is skipped unconditionally. The skip carries a reason so that verbose
// test output explains why nothing ran.
func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
	t.Skip("not implemented")
}
72
// TestAddTorrentNoUsableURLs is a placeholder with no test logic; it is
// skipped unconditionally. The skip carries a reason so that verbose test
// output explains why nothing ran.
func TestAddTorrentNoUsableURLs(t *testing.T) {
	t.Skip("not implemented")
}
76
// TestAddPeersToUnknownTorrent is a placeholder with no test logic; it is
// skipped unconditionally. The skip carries a reason so that verbose test
// output explains why nothing ran.
func TestAddPeersToUnknownTorrent(t *testing.T) {
	t.Skip("not implemented")
}
80
81 func TestPieceHashSize(t *testing.T) {
82         if pieceHash.Size() != 20 {
83                 t.FailNow()
84         }
85 }
86
87 func TestTorrentInitialState(t *testing.T) {
88         dir, mi := testutil.GreetingTestTorrent()
89         defer os.RemoveAll(dir)
90         tor := &Torrent{
91                 infoHash:          mi.Info.Hash(),
92                 pieceStateChanges: pubsub.NewPubSub(),
93         }
94         tor.chunkSize = 2
95         tor.storageOpener = storage.NewFile("/dev/null")
96         // Needed to lock for asynchronous piece verification.
97         tor.cl = new(Client)
98         err := tor.setInfoBytes(mi.Info.Bytes)
99         require.NoError(t, err)
100         require.Len(t, tor.pieces, 3)
101         tor.pendAllChunkSpecs(0)
102         tor.cl.mu.Lock()
103         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
104         tor.cl.mu.Unlock()
105         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
106 }
107
108 func TestUnmarshalPEXMsg(t *testing.T) {
109         var m peerExchangeMessage
110         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
111                 t.Fatal(err)
112         }
113         if len(m.Added) != 2 {
114                 t.FailNow()
115         }
116         if m.Added[0].Port != 0x506 {
117                 t.FailNow()
118         }
119 }
120
121 func TestReducedDialTimeout(t *testing.T) {
122         for _, _case := range []struct {
123                 Max             time.Duration
124                 HalfOpenLimit   int
125                 PendingPeers    int
126                 ExpectedReduced time.Duration
127         }{
128                 {nominalDialTimeout, 40, 0, nominalDialTimeout},
129                 {nominalDialTimeout, 40, 1, nominalDialTimeout},
130                 {nominalDialTimeout, 40, 39, nominalDialTimeout},
131                 {nominalDialTimeout, 40, 40, nominalDialTimeout / 2},
132                 {nominalDialTimeout, 40, 80, nominalDialTimeout / 3},
133                 {nominalDialTimeout, 40, 4000, nominalDialTimeout / 101},
134         } {
135                 reduced := reducedDialTimeout(_case.Max, _case.HalfOpenLimit, _case.PendingPeers)
136                 expected := _case.ExpectedReduced
137                 if expected < minDialTimeout {
138                         expected = minDialTimeout
139                 }
140                 if reduced != expected {
141                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
142                 }
143         }
144 }
145
146 func TestUTPRawConn(t *testing.T) {
147         l, err := utp.NewSocket("udp", "")
148         if err != nil {
149                 t.Fatal(err)
150         }
151         defer l.Close()
152         go func() {
153                 for {
154                         _, err := l.Accept()
155                         if err != nil {
156                                 break
157                         }
158                 }
159         }()
160         // Connect a UTP peer to see if the RawConn will still work.
161         s, _ := utp.NewSocket("udp", "")
162         defer s.Close()
163         utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
164         if err != nil {
165                 t.Fatalf("error dialing utp listener: %s", err)
166         }
167         defer utpPeer.Close()
168         peer, err := net.ListenPacket("udp", ":0")
169         if err != nil {
170                 t.Fatal(err)
171         }
172         defer peer.Close()
173
174         msgsReceived := 0
175         // How many messages to send. I've set this to double the channel buffer
176         // size in the raw packetConn.
177         const N = 200
178         readerStopped := make(chan struct{})
179         // The reader goroutine.
180         go func() {
181                 defer close(readerStopped)
182                 b := make([]byte, 500)
183                 for i := 0; i < N; i++ {
184                         n, _, err := l.ReadFrom(b)
185                         if err != nil {
186                                 t.Fatalf("error reading from raw conn: %s", err)
187                         }
188                         msgsReceived++
189                         var d int
190                         fmt.Sscan(string(b[:n]), &d)
191                         if d != i {
192                                 log.Printf("got wrong number: expected %d, got %d", i, d)
193                         }
194                 }
195         }()
196         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
197         if err != nil {
198                 t.Fatal(err)
199         }
200         for i := 0; i < N; i++ {
201                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
202                 if err != nil {
203                         t.Fatal(err)
204                 }
205                 time.Sleep(time.Microsecond)
206         }
207         select {
208         case <-readerStopped:
209         case <-time.After(time.Second):
210                 t.Fatal("reader timed out")
211         }
212         if msgsReceived != N {
213                 t.Fatalf("messages received: %d", msgsReceived)
214         }
215 }
216
217 func TestTwoClientsArbitraryPorts(t *testing.T) {
218         for i := 0; i < 2; i++ {
219                 cl, err := NewClient(&TestingConfig)
220                 if err != nil {
221                         t.Fatal(err)
222                 }
223                 defer cl.Close()
224         }
225 }
226
227 func TestAddDropManyTorrents(t *testing.T) {
228         cl, err := NewClient(&TestingConfig)
229         require.NoError(t, err)
230         defer cl.Close()
231         for i := range iter.N(1000) {
232                 var spec TorrentSpec
233                 binary.PutVarint(spec.InfoHash[:], int64(i))
234                 tt, new, err := cl.AddTorrentSpec(&spec)
235                 assert.NoError(t, err)
236                 assert.True(t, new)
237                 defer tt.Drop()
238         }
239 }
240
241 func TestClientTransferDefault(t *testing.T) {
242         testClientTransfer(t, testClientTransferParams{
243                 ExportClientStatus:                  true,
244                 LeecherFileCachePieceStorageFactory: fileCachePieceResourceStorage,
245         })
246 }
247
248 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.Client {
249         return storage.NewResourcePieces(fc.AsResourceProvider())
250 }
251
252 func fileCachePieceFileStorage(fc *filecache.Cache) storage.Client {
253         return storage.NewFileStorePieces(fc.AsFileStore())
254 }
255
256 func TestClientTransferSmallCache(t *testing.T) {
257         testClientTransfer(t, testClientTransferParams{
258                 SetLeecherStorageCapacity: true,
259                 // Going below the piece length means it can't complete a piece so
260                 // that it can be hashed.
261                 LeecherStorageCapacity: 5,
262                 SetReadahead:           true,
263                 // Can't readahead too far or the cache will thrash and drop data we
264                 // thought we had.
265                 Readahead:                           0,
266                 ExportClientStatus:                  true,
267                 LeecherFileCachePieceStorageFactory: fileCachePieceResourceStorage,
268         })
269 }
270
271 func TestClientTransferVarious(t *testing.T) {
272         for _, lsf := range []func(*filecache.Cache) storage.Client{
273                 fileCachePieceFileStorage,
274                 fileCachePieceResourceStorage,
275         } {
276                 for _, ss := range []func(string) storage.Client{
277                         storage.NewFile,
278                         storage.NewMMap,
279                 } {
280                         for _, responsive := range []bool{false, true} {
281                                 testClientTransfer(t, testClientTransferParams{
282                                         Responsive:                          responsive,
283                                         SeederStorage:                       ss,
284                                         LeecherFileCachePieceStorageFactory: lsf,
285                                 })
286                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
287                                         testClientTransfer(t, testClientTransferParams{
288                                                 SeederStorage:                       ss,
289                                                 Responsive:                          responsive,
290                                                 SetReadahead:                        true,
291                                                 Readahead:                           readahead,
292                                                 LeecherFileCachePieceStorageFactory: lsf,
293                                         })
294                                 }
295                         }
296                 }
297         }
298 }
299
// testClientTransferParams selects the variations exercised by
// testClientTransfer.
type testClientTransferParams struct {
	// Responsive makes the test call SetResponsive on the leecher's reader.
	Responsive                          bool
	// Readahead is passed to the reader's SetReadahead; it is only applied
	// when SetReadahead is true.
	Readahead                           int64
	// SetReadahead controls whether Readahead is applied at all.
	SetReadahead                        bool
	// ExportClientStatus registers status writers for the seeder ("s") and
	// the leecher ("l") via testutil.ExportStatusWriter.
	ExportClientStatus                  bool
	// SetLeecherStorageCapacity controls whether LeecherStorageCapacity is
	// applied to the leecher's file cache.
	SetLeecherStorageCapacity           bool
	// LeecherStorageCapacity is passed to the file cache's SetCapacity.
	LeecherStorageCapacity              int64
	// LeecherFileCachePieceStorageFactory builds the leecher client's default
	// storage from its file cache. It is called unconditionally, so it must
	// not be nil.
	LeecherFileCachePieceStorageFactory func(*filecache.Cache) storage.Client
	// SeederStorage, when non-nil, builds the seeder's default storage from
	// the greeting data directory; when nil that directory is used as the
	// seeder's DataDir instead.
	SeederStorage                       func(string) storage.Client
}
310
311 // Creates a seeder and a leecher, and ensures the data transfers when a read
312 // is attempted on the leecher.
313 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
314         greetingTempDir, mi := testutil.GreetingTestTorrent()
315         defer os.RemoveAll(greetingTempDir)
316         cfg := TestingConfig
317         cfg.Seed = true
318         if ps.SeederStorage != nil {
319                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
320         } else {
321                 cfg.DataDir = greetingTempDir
322         }
323         seeder, err := NewClient(&cfg)
324         require.NoError(t, err)
325         defer seeder.Close()
326         if ps.ExportClientStatus {
327                 testutil.ExportStatusWriter(seeder, "s")
328         }
329         seederTorrent, new, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
330         require.NoError(t, err)
331         assert.True(t, new)
332         leecherDataDir, err := ioutil.TempDir("", "")
333         require.NoError(t, err)
334         defer os.RemoveAll(leecherDataDir)
335         fc, err := filecache.NewCache(leecherDataDir)
336         require.NoError(t, err)
337         if ps.SetLeecherStorageCapacity {
338                 fc.SetCapacity(ps.LeecherStorageCapacity)
339         }
340         cfg.DefaultStorage = ps.LeecherFileCachePieceStorageFactory(fc)
341         leecher, err := NewClient(&cfg)
342         require.NoError(t, err)
343         defer leecher.Close()
344         if ps.ExportClientStatus {
345                 testutil.ExportStatusWriter(leecher, "l")
346         }
347         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
348                 ret = TorrentSpecFromMetaInfo(mi)
349                 ret.ChunkSize = 2
350                 ret.Storage = storage.NewFile(leecherDataDir)
351                 return
352         }())
353         require.NoError(t, err)
354         assert.True(t, new)
355         leecherGreeting.AddPeers([]Peer{
356                 Peer{
357                         IP:   missinggo.AddrIP(seeder.ListenAddr()),
358                         Port: missinggo.AddrPort(seeder.ListenAddr()),
359                 },
360         })
361         r := leecherGreeting.NewReader()
362         defer r.Close()
363         if ps.Responsive {
364                 r.SetResponsive()
365         }
366         if ps.SetReadahead {
367                 r.SetReadahead(ps.Readahead)
368         }
369         assertReadAllGreeting(t, r)
370         // After one read through, we can assume certain torrent statistics.
371         assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesSent)
372         assert.EqualValues(t, 8, seederTorrent.Stats().ChunksSent)
373         // This is not a strict requirement. It is however interesting to follow.
374         assert.EqualValues(t, 261, seederTorrent.Stats().BytesSent)
375         // Read through again for the cases where the torrent data size exceed the
376         // size of the cache.
377         assertReadAllGreeting(t, r)
378 }
379
380 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
381         pos, err := r.Seek(0, os.SEEK_SET)
382         assert.NoError(t, err)
383         assert.EqualValues(t, 0, pos)
384         _greeting, err := ioutil.ReadAll(r)
385         assert.NoError(t, err)
386         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
387 }
388
389 // Check that after completing leeching, a leecher transitions to a seeding
390 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
391 func TestSeedAfterDownloading(t *testing.T) {
392         greetingTempDir, mi := testutil.GreetingTestTorrent()
393         defer os.RemoveAll(greetingTempDir)
394         cfg := TestingConfig
395         cfg.Seed = true
396         cfg.DataDir = greetingTempDir
397         seeder, err := NewClient(&cfg)
398         require.NoError(t, err)
399         defer seeder.Close()
400         testutil.ExportStatusWriter(seeder, "s")
401         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
402         cfg.DataDir, err = ioutil.TempDir("", "")
403         require.NoError(t, err)
404         defer os.RemoveAll(cfg.DataDir)
405         leecher, err := NewClient(&cfg)
406         require.NoError(t, err)
407         defer leecher.Close()
408         testutil.ExportStatusWriter(leecher, "l")
409         cfg.Seed = false
410         // cfg.TorrentDataOpener = nil
411         cfg.DataDir, err = ioutil.TempDir("", "")
412         require.NoError(t, err)
413         defer os.RemoveAll(cfg.DataDir)
414         leecherLeecher, _ := NewClient(&cfg)
415         defer leecherLeecher.Close()
416         testutil.ExportStatusWriter(leecherLeecher, "ll")
417         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
418                 ret = TorrentSpecFromMetaInfo(mi)
419                 ret.ChunkSize = 2
420                 return
421         }())
422         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
423                 ret = TorrentSpecFromMetaInfo(mi)
424                 ret.ChunkSize = 3
425                 return
426         }())
427         // Simultaneously DownloadAll in Leecher, and read the contents
428         // consecutively in LeecherLeecher. This non-deterministically triggered a
429         // case where the leecher wouldn't unchoke the LeecherLeecher.
430         var wg sync.WaitGroup
431         wg.Add(1)
432         go func() {
433                 defer wg.Done()
434                 r := llg.NewReader()
435                 defer r.Close()
436                 b, err := ioutil.ReadAll(r)
437                 require.NoError(t, err)
438                 assert.EqualValues(t, testutil.GreetingFileContents, b)
439         }()
440         leecherGreeting.AddPeers([]Peer{
441                 Peer{
442                         IP:   missinggo.AddrIP(seeder.ListenAddr()),
443                         Port: missinggo.AddrPort(seeder.ListenAddr()),
444                 },
445                 Peer{
446                         IP:   missinggo.AddrIP(leecherLeecher.ListenAddr()),
447                         Port: missinggo.AddrPort(leecherLeecher.ListenAddr()),
448                 },
449         })
450         wg.Add(1)
451         go func() {
452                 defer wg.Done()
453                 leecherGreeting.DownloadAll()
454                 leecher.WaitAll()
455         }()
456         wg.Wait()
457 }
458
459 func TestMergingTrackersByAddingSpecs(t *testing.T) {
460         cl, err := NewClient(&TestingConfig)
461         require.NoError(t, err)
462         defer cl.Close()
463         spec := TorrentSpec{}
464         T, new, _ := cl.AddTorrentSpec(&spec)
465         if !new {
466                 t.FailNow()
467         }
468         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
469         _, new, _ = cl.AddTorrentSpec(&spec)
470         assert.False(t, new)
471         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
472         // Because trackers are disabled in TestingConfig.
473         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
474 }
475
476 type badStorage struct{}
477
478 func (bs badStorage) OpenTorrent(*metainfo.InfoEx) (storage.Torrent, error) {
479         return bs, nil
480 }
481
482 func (bs badStorage) Close() error {
483         return nil
484 }
485
486 func (bs badStorage) Piece(p metainfo.Piece) storage.Piece {
487         return badStoragePiece{p}
488 }
489
490 type badStoragePiece struct {
491         p metainfo.Piece
492 }
493
494 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
495         return 0, nil
496 }
497
498 func (p badStoragePiece) GetIsComplete() bool {
499         return true
500 }
501
502 func (p badStoragePiece) MarkComplete() error {
503         return errors.New("psyyyyyyyche")
504 }
505
506 func (p badStoragePiece) randomlyTruncatedDataString() string {
507         return "hello, world\n"[:rand.Intn(14)]
508 }
509
510 func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
511         r := strings.NewReader(p.randomlyTruncatedDataString())
512         return r.ReadAt(b, off+p.p.Offset())
513 }
514
515 // We read from a piece which is marked completed, but is missing data.
516 func TestCompletedPieceWrongSize(t *testing.T) {
517         cfg := TestingConfig
518         cfg.DefaultStorage = badStorage{}
519         cl, err := NewClient(&cfg)
520         require.NoError(t, err)
521         defer cl.Close()
522         ie := metainfo.InfoEx{
523                 Info: metainfo.Info{
524                         PieceLength: 15,
525                         Pieces:      make([]byte, 20),
526                         Files: []metainfo.FileInfo{
527                                 metainfo.FileInfo{Path: []string{"greeting"}, Length: 13},
528                         },
529                 },
530         }
531         ie.UpdateBytes()
532         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
533                 Info:     &ie,
534                 InfoHash: ie.Hash(),
535         })
536         require.NoError(t, err)
537         defer tt.Drop()
538         assert.True(t, new)
539         r := tt.NewReader()
540         defer r.Close()
541         b, err := ioutil.ReadAll(r)
542         assert.Len(t, b, 13)
543         assert.NoError(t, err)
544 }
545
546 func BenchmarkAddLargeTorrent(b *testing.B) {
547         cfg := TestingConfig
548         cfg.DisableTCP = true
549         cfg.DisableUTP = true
550         cfg.ListenAddr = "redonk"
551         cl, _ := NewClient(&cfg)
552         defer cl.Close()
553         for range iter.N(b.N) {
554                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
555                 if err != nil {
556                         b.Fatal(err)
557                 }
558                 t.Drop()
559         }
560 }
561
562 func TestResponsive(t *testing.T) {
563         seederDataDir, mi := testutil.GreetingTestTorrent()
564         defer os.RemoveAll(seederDataDir)
565         cfg := TestingConfig
566         cfg.Seed = true
567         cfg.DataDir = seederDataDir
568         seeder, err := NewClient(&cfg)
569         require.Nil(t, err)
570         defer seeder.Close()
571         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
572         leecherDataDir, err := ioutil.TempDir("", "")
573         require.Nil(t, err)
574         defer os.RemoveAll(leecherDataDir)
575         cfg = TestingConfig
576         cfg.DataDir = leecherDataDir
577         leecher, err := NewClient(&cfg)
578         require.Nil(t, err)
579         defer leecher.Close()
580         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
581                 ret = TorrentSpecFromMetaInfo(mi)
582                 ret.ChunkSize = 2
583                 return
584         }())
585         leecherTorrent.AddPeers([]Peer{
586                 Peer{
587                         IP:   missinggo.AddrIP(seeder.ListenAddr()),
588                         Port: missinggo.AddrPort(seeder.ListenAddr()),
589                 },
590         })
591         reader := leecherTorrent.NewReader()
592         defer reader.Close()
593         reader.SetReadahead(0)
594         reader.SetResponsive()
595         b := make([]byte, 2)
596         _, err = reader.Seek(3, os.SEEK_SET)
597         require.NoError(t, err)
598         _, err = io.ReadFull(reader, b)
599         assert.Nil(t, err)
600         assert.EqualValues(t, "lo", string(b))
601         _, err = reader.Seek(11, os.SEEK_SET)
602         require.NoError(t, err)
603         n, err := io.ReadFull(reader, b)
604         assert.Nil(t, err)
605         assert.EqualValues(t, 2, n)
606         assert.EqualValues(t, "d\n", string(b))
607 }
608
609 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
610         seederDataDir, mi := testutil.GreetingTestTorrent()
611         defer os.RemoveAll(seederDataDir)
612         cfg := TestingConfig
613         cfg.Seed = true
614         cfg.DataDir = seederDataDir
615         seeder, err := NewClient(&cfg)
616         require.Nil(t, err)
617         defer seeder.Close()
618         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
619         leecherDataDir, err := ioutil.TempDir("", "")
620         require.Nil(t, err)
621         defer os.RemoveAll(leecherDataDir)
622         cfg = TestingConfig
623         cfg.DataDir = leecherDataDir
624         leecher, err := NewClient(&cfg)
625         require.Nil(t, err)
626         defer leecher.Close()
627         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
628                 ret = TorrentSpecFromMetaInfo(mi)
629                 ret.ChunkSize = 2
630                 return
631         }())
632         leecherTorrent.AddPeers([]Peer{
633                 Peer{
634                         IP:   missinggo.AddrIP(seeder.ListenAddr()),
635                         Port: missinggo.AddrPort(seeder.ListenAddr()),
636                 },
637         })
638         reader := leecherTorrent.NewReader()
639         defer reader.Close()
640         reader.SetReadahead(0)
641         reader.SetResponsive()
642         b := make([]byte, 2)
643         _, err = reader.Seek(3, os.SEEK_SET)
644         require.NoError(t, err)
645         _, err = io.ReadFull(reader, b)
646         assert.Nil(t, err)
647         assert.EqualValues(t, "lo", string(b))
648         go leecherTorrent.Drop()
649         _, err = reader.Seek(11, os.SEEK_SET)
650         require.NoError(t, err)
651         n, err := reader.Read(b)
652         assert.EqualError(t, err, "torrent closed")
653         assert.EqualValues(t, 0, n)
654 }
655
656 func TestDHTInheritBlocklist(t *testing.T) {
657         ipl := iplist.New(nil)
658         require.NotNil(t, ipl)
659         cfg := TestingConfig
660         cfg.IPBlocklist = ipl
661         cfg.NoDHT = false
662         cl, err := NewClient(&cfg)
663         require.NoError(t, err)
664         defer cl.Close()
665         require.Equal(t, ipl, cl.DHT().IPBlocklist())
666 }
667
668 // Check that stuff is merged in subsequent AddTorrentSpec for the same
669 // infohash.
670 func TestAddTorrentSpecMerging(t *testing.T) {
671         cl, err := NewClient(&TestingConfig)
672         require.NoError(t, err)
673         defer cl.Close()
674         dir, mi := testutil.GreetingTestTorrent()
675         defer os.RemoveAll(dir)
676         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
677                 InfoHash: mi.Info.Hash(),
678         })
679         require.NoError(t, err)
680         require.True(t, new)
681         require.Nil(t, tt.Info())
682         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
683         require.NoError(t, err)
684         require.False(t, new)
685         require.NotNil(t, tt.Info())
686 }
687
688 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
689         dir, mi := testutil.GreetingTestTorrent()
690         os.RemoveAll(dir)
691         cl, _ := NewClient(&TestingConfig)
692         defer cl.Close()
693         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
694                 InfoHash: mi.Info.Hash(),
695         })
696         tt.Drop()
697         assert.EqualValues(t, 0, len(cl.Torrents()))
698         select {
699         case <-tt.GotInfo():
700                 t.FailNow()
701         default:
702         }
703 }
704
705 func writeTorrentData(ts storage.Torrent, info *metainfo.InfoEx, b []byte) {
706         for i := range iter.N(info.NumPieces()) {
707                 n, _ := ts.Piece(info.Piece(i)).WriteAt(b, 0)
708                 b = b[n:]
709         }
710 }
711
712 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.Client) {
713         fileCacheDir, err := ioutil.TempDir("", "")
714         require.NoError(t, err)
715         defer os.RemoveAll(fileCacheDir)
716         fileCache, err := filecache.NewCache(fileCacheDir)
717         require.NoError(t, err)
718         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
719         defer os.RemoveAll(greetingDataTempDir)
720         filePieceStore := csf(fileCache)
721         greetingData, err := filePieceStore.OpenTorrent(&greetingMetainfo.Info)
722         require.NoError(t, err)
723         writeTorrentData(greetingData, &greetingMetainfo.Info, []byte(testutil.GreetingFileContents))
724         // require.Equal(t, len(testutil.GreetingFileContents), written)
725         // require.NoError(t, err)
726         for i := 0; i < greetingMetainfo.Info.NumPieces(); i++ {
727                 p := greetingMetainfo.Info.Piece(i)
728                 if alreadyCompleted {
729                         err := greetingData.Piece(p).MarkComplete()
730                         assert.NoError(t, err)
731                 }
732         }
733         cfg := TestingConfig
734         // TODO: Disable network option?
735         cfg.DisableTCP = true
736         cfg.DisableUTP = true
737         cfg.DefaultStorage = filePieceStore
738         cl, err := NewClient(&cfg)
739         require.NoError(t, err)
740         defer cl.Close()
741         tt, err := cl.AddTorrent(greetingMetainfo)
742         require.NoError(t, err)
743         psrs := tt.PieceStateRuns()
744         assert.Len(t, psrs, 1)
745         assert.EqualValues(t, 3, psrs[0].Length)
746         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
747         if alreadyCompleted {
748                 r := tt.NewReader()
749                 b, err := ioutil.ReadAll(r)
750                 assert.NoError(t, err)
751                 assert.EqualValues(t, testutil.GreetingFileContents, b)
752         }
753 }
754
755 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
756         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceFileStorage)
757         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
758 }
759
760 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
761         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceFileStorage)
762         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
763 }
764
765 func TestAddMetainfoWithNodes(t *testing.T) {
766         cfg := TestingConfig
767         cfg.NoDHT = false
768         // For now, we want to just jam the nodes into the table, without
769         // verifying them first. Also the DHT code doesn't support mixing secure
770         // and insecure nodes if security is enabled (yet).
771         cfg.DHTConfig.NoSecurity = true
772         cl, err := NewClient(&cfg)
773         require.NoError(t, err)
774         defer cl.Close()
775         assert.EqualValues(t, cl.DHT().NumNodes(), 0)
776         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
777         require.NoError(t, err)
778         assert.Len(t, tt.metainfo.AnnounceList, 5)
779         assert.EqualValues(t, 6, cl.DHT().NumNodes())
780 }
781
// testDownloadCancelParams selects the variations exercised by
// testDownloadCancel.
type testDownloadCancelParams struct {
	// ExportClientStatus registers status writers for the seeder ("s") and
	// the leecher ("l") via testutil.ExportStatusWriter.
	ExportClientStatus        bool
	// SetLeecherStorageCapacity controls whether LeecherStorageCapacity is
	// applied to the leecher's file cache.
	SetLeecherStorageCapacity bool
	// LeecherStorageCapacity is passed to the file cache's SetCapacity.
	LeecherStorageCapacity    int64
	// Cancel makes the test cancel all pieces right after calling
	// DownloadAll on the leecher's torrent.
	Cancel                    bool
}
788
789 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
790         greetingTempDir, mi := testutil.GreetingTestTorrent()
791         defer os.RemoveAll(greetingTempDir)
792         cfg := TestingConfig
793         cfg.Seed = true
794         cfg.DataDir = greetingTempDir
795         seeder, err := NewClient(&cfg)
796         require.NoError(t, err)
797         defer seeder.Close()
798         if ps.ExportClientStatus {
799                 testutil.ExportStatusWriter(seeder, "s")
800         }
801         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
802         leecherDataDir, err := ioutil.TempDir("", "")
803         require.NoError(t, err)
804         defer os.RemoveAll(leecherDataDir)
805         fc, err := filecache.NewCache(leecherDataDir)
806         require.NoError(t, err)
807         if ps.SetLeecherStorageCapacity {
808                 fc.SetCapacity(ps.LeecherStorageCapacity)
809         }
810         cfg.DefaultStorage = storage.NewFileStorePieces(fc.AsFileStore())
811         cfg.DataDir = leecherDataDir
812         leecher, _ := NewClient(&cfg)
813         defer leecher.Close()
814         if ps.ExportClientStatus {
815                 testutil.ExportStatusWriter(leecher, "l")
816         }
817         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
818                 ret = TorrentSpecFromMetaInfo(mi)
819                 ret.ChunkSize = 2
820                 return
821         }())
822         require.NoError(t, err)
823         assert.True(t, new)
824         psc := leecherGreeting.SubscribePieceStateChanges()
825         defer psc.Close()
826         leecherGreeting.DownloadAll()
827         if ps.Cancel {
828                 leecherGreeting.CancelPieces(0, leecherGreeting.NumPieces())
829         }
830         leecherGreeting.AddPeers([]Peer{
831                 Peer{
832                         IP:   missinggo.AddrIP(seeder.ListenAddr()),
833                         Port: missinggo.AddrPort(seeder.ListenAddr()),
834                 },
835         })
836         completes := make(map[int]bool, 3)
837 values:
838         for {
839                 // started := time.Now()
840                 select {
841                 case _v := <-psc.Values:
842                         // log.Print(time.Since(started))
843                         v := _v.(PieceStateChange)
844                         completes[v.Index] = v.Complete
845                 case <-time.After(100 * time.Millisecond):
846                         break values
847                 }
848         }
849         if ps.Cancel {
850                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
851         } else {
852                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
853         }
854
855 }
856
857 func TestTorrentDownloadAll(t *testing.T) {
858         testDownloadCancel(t, testDownloadCancelParams{})
859 }
860
861 func TestTorrentDownloadAllThenCancel(t *testing.T) {
862         testDownloadCancel(t, testDownloadCancelParams{
863                 Cancel: true,
864         })
865 }
866
867 // Ensure that it's an error for a peer to send an invalid have message.
868 func TestPeerInvalidHave(t *testing.T) {
869         cl, err := NewClient(&TestingConfig)
870         require.NoError(t, err)
871         defer cl.Close()
872         ie := metainfo.InfoEx{
873                 Info: metainfo.Info{
874                         PieceLength: 1,
875                         Pieces:      make([]byte, 20),
876                         Files:       []metainfo.FileInfo{{Length: 1}},
877                 },
878         }
879         ie.UpdateBytes()
880         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
881                 Info:     &ie,
882                 InfoHash: ie.Hash(),
883         })
884         require.NoError(t, err)
885         assert.True(t, _new)
886         defer tt.Drop()
887         cn := &connection{
888                 t: tt,
889         }
890         assert.NoError(t, cn.peerSentHave(0))
891         assert.Error(t, cn.peerSentHave(1))
892 }
893
894 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
895         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
896         defer os.RemoveAll(greetingTempDir)
897         cfg := TestingConfig
898         cfg.DataDir = greetingTempDir
899         seeder, err := NewClient(&TestingConfig)
900         require.NoError(t, err)
901         seeder.AddTorrentSpec(&TorrentSpec{
902                 Info: &greetingMetainfo.Info,
903         })
904 }
905
906 func TestPrepareTrackerAnnounce(t *testing.T) {
907         cl := &Client{}
908         blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
909         require.NoError(t, err)
910         assert.False(t, blocked)
911         assert.EqualValues(t, "localhost:1234", host)
912         assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
913 }
914
915 // Check that when the listen port is 0, all the protocols listened on have
916 // the same port, and it isn't zero.
917 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
918         cl, err := NewClient(&TestingConfig)
919         require.NoError(t, err)
920         defer cl.Close()
921         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
922         assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
923 }
924
925 func TestClientDynamicListenTCPOnly(t *testing.T) {
926         cfg := TestingConfig
927         cfg.DisableUTP = true
928         cl, err := NewClient(&cfg)
929         require.NoError(t, err)
930         defer cl.Close()
931         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
932         assert.Nil(t, cl.utpSock)
933 }
934
935 func TestClientDynamicListenUTPOnly(t *testing.T) {
936         cfg := TestingConfig
937         cfg.DisableTCP = true
938         cl, err := NewClient(&cfg)
939         require.NoError(t, err)
940         defer cl.Close()
941         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
942         assert.Nil(t, cl.tcpListener)
943 }
944
945 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
946         cfg := TestingConfig
947         cfg.DisableTCP = true
948         cfg.DisableUTP = true
949         cl, err := NewClient(&cfg)
950         require.NoError(t, err)
951         defer cl.Close()
952         t.Log(cl.listenAddr)
953         assert.Nil(t, cl.ListenAddr())
954 }
955
956 func addClientPeer(t *Torrent, cl *Client) {
957         t.AddPeers([]Peer{
958                 Peer{
959                         IP:   missinggo.AddrIP(cl.ListenAddr()),
960                         Port: missinggo.AddrPort(cl.ListenAddr()),
961                 },
962         })
963 }
964
965 func printConnPeerCounts(t *Torrent) {
966         t.cl.mu.Lock()
967         log.Println(len(t.conns), len(t.peers))
968         t.cl.mu.Unlock()
969 }
970
971 func totalConns(tts []*Torrent) (ret int) {
972         for _, tt := range tts {
973                 tt.cl.mu.Lock()
974                 ret += len(tt.conns)
975                 tt.cl.mu.Unlock()
976         }
977         return
978 }
979
980 func TestSetMaxEstablishedConn(t *testing.T) {
981         var tts []*Torrent
982         ih := testutil.GreetingMetaInfo().Info.Hash()
983         cfg := TestingConfig
984         cfg.DisableUTP = true
985         for i := range iter.N(3) {
986                 cl, err := NewClient(&cfg)
987                 require.NoError(t, err)
988                 defer cl.Close()
989                 tt, _ := cl.AddTorrentInfoHash(ih)
990                 tt.SetMaxEstablishedConns(2)
991                 testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
992                 tts = append(tts, tt)
993         }
994         addPeers := func() {
995                 for i, tt := range tts {
996                         for _, _tt := range tts[:i] {
997                                 addClientPeer(tt, _tt.cl)
998                         }
999                 }
1000         }
1001         waitTotalConns := func(num int) {
1002                 for totalConns(tts) != num {
1003                         time.Sleep(time.Millisecond)
1004                 }
1005         }
1006         addPeers()
1007         waitTotalConns(6)
1008         tts[0].SetMaxEstablishedConns(1)
1009         waitTotalConns(4)
1010         tts[0].SetMaxEstablishedConns(0)
1011         waitTotalConns(2)
1012         tts[0].SetMaxEstablishedConns(1)
1013         addPeers()
1014         waitTotalConns(4)
1015         tts[0].SetMaxEstablishedConns(2)
1016         addPeers()
1017         waitTotalConns(6)
1018 }