]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Fixes for changes to metainfo.MetaInfo.UnmarshalInfo
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "log"
10         "math/rand"
11         "net"
12         "os"
13         "strings"
14         "sync"
15         "testing"
16         "time"
17
18         _ "github.com/anacrolix/envpprof"
19         "github.com/anacrolix/missinggo"
20         "github.com/anacrolix/missinggo/filecache"
21         "github.com/anacrolix/missinggo/pubsub"
22         "github.com/anacrolix/utp"
23         "github.com/bradfitz/iter"
24         "github.com/stretchr/testify/assert"
25         "github.com/stretchr/testify/require"
26
27         "github.com/anacrolix/torrent/bencode"
28         "github.com/anacrolix/torrent/dht"
29         "github.com/anacrolix/torrent/internal/testutil"
30         "github.com/anacrolix/torrent/iplist"
31         "github.com/anacrolix/torrent/metainfo"
32         "github.com/anacrolix/torrent/storage"
33 )
34
35 func init() {
36         log.SetFlags(log.LstdFlags | log.Llongfile)
37 }
38
39 var TestingConfig = Config{
40         ListenAddr:      "localhost:0",
41         NoDHT:           true,
42         DisableTrackers: true,
43         DataDir:         "/tmp/anacrolix",
44         DHTConfig: dht.ServerConfig{
45                 NoDefaultBootstrap: true,
46         },
47 }
48
49 func TestClientDefault(t *testing.T) {
50         cl, err := NewClient(&TestingConfig)
51         require.NoError(t, err)
52         cl.Close()
53 }
54
55 func TestAddDropTorrent(t *testing.T) {
56         cl, err := NewClient(&TestingConfig)
57         require.NoError(t, err)
58         defer cl.Close()
59         dir, mi := testutil.GreetingTestTorrent()
60         defer os.RemoveAll(dir)
61         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
62         require.NoError(t, err)
63         assert.True(t, new)
64         tt.SetMaxEstablishedConns(0)
65         tt.SetMaxEstablishedConns(1)
66         tt.Drop()
67 }
68
69 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
70         t.SkipNow()
71 }
72
73 func TestAddTorrentNoUsableURLs(t *testing.T) {
74         t.SkipNow()
75 }
76
77 func TestAddPeersToUnknownTorrent(t *testing.T) {
78         t.SkipNow()
79 }
80
81 func TestPieceHashSize(t *testing.T) {
82         if pieceHash.Size() != 20 {
83                 t.FailNow()
84         }
85 }
86
87 func TestTorrentInitialState(t *testing.T) {
88         dir, mi := testutil.GreetingTestTorrent()
89         defer os.RemoveAll(dir)
90         tor := &Torrent{
91                 infoHash:          mi.HashInfoBytes(),
92                 pieceStateChanges: pubsub.NewPubSub(),
93         }
94         tor.chunkSize = 2
95         tor.storageOpener = storage.NewClient(storage.NewFile("/dev/null"))
96         // Needed to lock for asynchronous piece verification.
97         tor.cl = new(Client)
98         err := tor.setInfoBytes(mi.InfoBytes)
99         require.NoError(t, err)
100         require.Len(t, tor.pieces, 3)
101         tor.pendAllChunkSpecs(0)
102         tor.cl.mu.Lock()
103         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
104         tor.cl.mu.Unlock()
105         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
106 }
107
108 func TestUnmarshalPEXMsg(t *testing.T) {
109         var m peerExchangeMessage
110         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
111                 t.Fatal(err)
112         }
113         if len(m.Added) != 2 {
114                 t.FailNow()
115         }
116         if m.Added[0].Port != 0x506 {
117                 t.FailNow()
118         }
119 }
120
121 func TestReducedDialTimeout(t *testing.T) {
122         for _, _case := range []struct {
123                 Max             time.Duration
124                 HalfOpenLimit   int
125                 PendingPeers    int
126                 ExpectedReduced time.Duration
127         }{
128                 {nominalDialTimeout, 40, 0, nominalDialTimeout},
129                 {nominalDialTimeout, 40, 1, nominalDialTimeout},
130                 {nominalDialTimeout, 40, 39, nominalDialTimeout},
131                 {nominalDialTimeout, 40, 40, nominalDialTimeout / 2},
132                 {nominalDialTimeout, 40, 80, nominalDialTimeout / 3},
133                 {nominalDialTimeout, 40, 4000, nominalDialTimeout / 101},
134         } {
135                 reduced := reducedDialTimeout(_case.Max, _case.HalfOpenLimit, _case.PendingPeers)
136                 expected := _case.ExpectedReduced
137                 if expected < minDialTimeout {
138                         expected = minDialTimeout
139                 }
140                 if reduced != expected {
141                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
142                 }
143         }
144 }
145
146 func TestUTPRawConn(t *testing.T) {
147         l, err := utp.NewSocket("udp", "")
148         if err != nil {
149                 t.Fatal(err)
150         }
151         defer l.Close()
152         go func() {
153                 for {
154                         _, err := l.Accept()
155                         if err != nil {
156                                 break
157                         }
158                 }
159         }()
160         // Connect a UTP peer to see if the RawConn will still work.
161         s, _ := utp.NewSocket("udp", "")
162         defer s.Close()
163         utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
164         if err != nil {
165                 t.Fatalf("error dialing utp listener: %s", err)
166         }
167         defer utpPeer.Close()
168         peer, err := net.ListenPacket("udp", ":0")
169         if err != nil {
170                 t.Fatal(err)
171         }
172         defer peer.Close()
173
174         msgsReceived := 0
175         // How many messages to send. I've set this to double the channel buffer
176         // size in the raw packetConn.
177         const N = 200
178         readerStopped := make(chan struct{})
179         // The reader goroutine.
180         go func() {
181                 defer close(readerStopped)
182                 b := make([]byte, 500)
183                 for i := 0; i < N; i++ {
184                         n, _, err := l.ReadFrom(b)
185                         if err != nil {
186                                 t.Fatalf("error reading from raw conn: %s", err)
187                         }
188                         msgsReceived++
189                         var d int
190                         fmt.Sscan(string(b[:n]), &d)
191                         if d != i {
192                                 log.Printf("got wrong number: expected %d, got %d", i, d)
193                         }
194                 }
195         }()
196         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
197         if err != nil {
198                 t.Fatal(err)
199         }
200         for i := 0; i < N; i++ {
201                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
202                 if err != nil {
203                         t.Fatal(err)
204                 }
205                 time.Sleep(time.Microsecond)
206         }
207         select {
208         case <-readerStopped:
209         case <-time.After(time.Second):
210                 t.Fatal("reader timed out")
211         }
212         if msgsReceived != N {
213                 t.Fatalf("messages received: %d", msgsReceived)
214         }
215 }
216
217 func TestTwoClientsArbitraryPorts(t *testing.T) {
218         for i := 0; i < 2; i++ {
219                 cl, err := NewClient(&TestingConfig)
220                 if err != nil {
221                         t.Fatal(err)
222                 }
223                 defer cl.Close()
224         }
225 }
226
227 func TestAddDropManyTorrents(t *testing.T) {
228         cl, err := NewClient(&TestingConfig)
229         require.NoError(t, err)
230         defer cl.Close()
231         for i := range iter.N(1000) {
232                 var spec TorrentSpec
233                 binary.PutVarint(spec.InfoHash[:], int64(i))
234                 tt, new, err := cl.AddTorrentSpec(&spec)
235                 assert.NoError(t, err)
236                 assert.True(t, new)
237                 defer tt.Drop()
238         }
239 }
240
241 type FileCacheClientStorageFactoryParams struct {
242         Capacity    int64
243         SetCapacity bool
244         Wrapper     func(*filecache.Cache) storage.ClientImpl
245 }
246
247 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
248         return func(dataDir string) storage.ClientImpl {
249                 fc, err := filecache.NewCache(dataDir)
250                 if err != nil {
251                         panic(err)
252                 }
253                 if ps.SetCapacity {
254                         fc.SetCapacity(ps.Capacity)
255                 }
256                 return ps.Wrapper(fc)
257         }
258 }
259
260 type storageFactory func(string) storage.ClientImpl
261
262 func TestClientTransferDefault(t *testing.T) {
263         testClientTransfer(t, testClientTransferParams{
264                 ExportClientStatus: true,
265                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
266                         Wrapper: fileCachePieceResourceStorage,
267                 }),
268         })
269 }
270
271 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
272         return storage.NewResourcePieces(fc.AsResourceProvider())
273 }
274
275 func fileCachePieceFileStorage(fc *filecache.Cache) storage.ClientImpl {
276         return storage.NewFileStorePieces(fc.AsFileStore())
277 }
278
279 func TestClientTransferSmallCache(t *testing.T) {
280         testClientTransfer(t, testClientTransferParams{
281                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
282                         SetCapacity: true,
283                         // Going below the piece length means it can't complete a piece so
284                         // that it can be hashed.
285                         Capacity: 5,
286                         Wrapper:  fileCachePieceResourceStorage,
287                 }),
288                 SetReadahead: true,
289                 // Can't readahead too far or the cache will thrash and drop data we
290                 // thought we had.
291                 Readahead:          0,
292                 ExportClientStatus: true,
293         })
294 }
295
296 func TestClientTransferVarious(t *testing.T) {
297         for _, ls := range []storageFactory{
298                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
299                         Wrapper: fileCachePieceFileStorage,
300                 }),
301                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
302                         Wrapper: fileCachePieceResourceStorage,
303                 }),
304                 storage.NewBoltDB,
305         } {
306                 for _, ss := range []func(string) storage.ClientImpl{
307                         storage.NewFile,
308                         storage.NewMMap,
309                 } {
310                         for _, responsive := range []bool{false, true} {
311                                 testClientTransfer(t, testClientTransferParams{
312                                         Responsive:     responsive,
313                                         SeederStorage:  ss,
314                                         LeecherStorage: ls,
315                                 })
316                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
317                                         testClientTransfer(t, testClientTransferParams{
318                                                 SeederStorage:  ss,
319                                                 Responsive:     responsive,
320                                                 SetReadahead:   true,
321                                                 Readahead:      readahead,
322                                                 LeecherStorage: ls,
323                                         })
324                                 }
325                         }
326                 }
327         }
328 }
329
330 type testClientTransferParams struct {
331         Responsive         bool
332         Readahead          int64
333         SetReadahead       bool
334         ExportClientStatus bool
335         LeecherStorage     func(string) storage.ClientImpl
336         SeederStorage      func(string) storage.ClientImpl
337 }
338
339 // Creates a seeder and a leecher, and ensures the data transfers when a read
340 // is attempted on the leecher.
341 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
342         greetingTempDir, mi := testutil.GreetingTestTorrent()
343         defer os.RemoveAll(greetingTempDir)
344         // Create seeder and a Torrent.
345         cfg := TestingConfig
346         cfg.Seed = true
347         // cfg.ListenAddr = "localhost:4000"
348         if ps.SeederStorage != nil {
349                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
350         } else {
351                 cfg.DataDir = greetingTempDir
352         }
353         seeder, err := NewClient(&cfg)
354         require.NoError(t, err)
355         defer seeder.Close()
356         if ps.ExportClientStatus {
357                 testutil.ExportStatusWriter(seeder, "s")
358         }
359         // seederTorrent, new, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
360         _, new, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
361         require.NoError(t, err)
362         assert.True(t, new)
363         // Create leecher and a Torrent.
364         leecherDataDir, err := ioutil.TempDir("", "")
365         require.NoError(t, err)
366         defer os.RemoveAll(leecherDataDir)
367         cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
368         // cfg.ListenAddr = "localhost:4001"
369         leecher, err := NewClient(&cfg)
370         require.NoError(t, err)
371         defer leecher.Close()
372         if ps.ExportClientStatus {
373                 testutil.ExportStatusWriter(leecher, "l")
374         }
375         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
376                 ret = TorrentSpecFromMetaInfo(mi)
377                 ret.ChunkSize = 2
378                 ret.Storage = storage.NewFile(leecherDataDir)
379                 return
380         }())
381         require.NoError(t, err)
382         assert.True(t, new)
383         // Now do some things with leecher and seeder.
384         addClientPeer(leecherGreeting, seeder)
385         r := leecherGreeting.NewReader()
386         defer r.Close()
387         if ps.Responsive {
388                 r.SetResponsive()
389         }
390         if ps.SetReadahead {
391                 r.SetReadahead(ps.Readahead)
392         }
393         assertReadAllGreeting(t, r)
394         // After one read through, we can assume certain torrent statistics.
395         // These are not a strict requirement. It is however interesting to
396         // follow.
397         // t.Logf("%#v", seederTorrent.Stats())
398         // assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
399         // assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
400         // assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
401         // assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
402         // Read through again for the cases where the torrent data size exceeds
403         // the size of the cache.
404         assertReadAllGreeting(t, r)
405 }
406
407 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
408         pos, err := r.Seek(0, os.SEEK_SET)
409         assert.NoError(t, err)
410         assert.EqualValues(t, 0, pos)
411         _greeting, err := ioutil.ReadAll(r)
412         assert.NoError(t, err)
413         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
414 }
415
416 // Check that after completing leeching, a leecher transitions to a seeding
417 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
418 func TestSeedAfterDownloading(t *testing.T) {
419         greetingTempDir, mi := testutil.GreetingTestTorrent()
420         defer os.RemoveAll(greetingTempDir)
421         cfg := TestingConfig
422         cfg.Seed = true
423         cfg.DataDir = greetingTempDir
424         seeder, err := NewClient(&cfg)
425         require.NoError(t, err)
426         defer seeder.Close()
427         testutil.ExportStatusWriter(seeder, "s")
428         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
429         cfg.DataDir, err = ioutil.TempDir("", "")
430         require.NoError(t, err)
431         defer os.RemoveAll(cfg.DataDir)
432         leecher, err := NewClient(&cfg)
433         require.NoError(t, err)
434         defer leecher.Close()
435         testutil.ExportStatusWriter(leecher, "l")
436         cfg.Seed = false
437         // cfg.TorrentDataOpener = nil
438         cfg.DataDir, err = ioutil.TempDir("", "")
439         require.NoError(t, err)
440         defer os.RemoveAll(cfg.DataDir)
441         leecherLeecher, _ := NewClient(&cfg)
442         defer leecherLeecher.Close()
443         testutil.ExportStatusWriter(leecherLeecher, "ll")
444         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
445                 ret = TorrentSpecFromMetaInfo(mi)
446                 ret.ChunkSize = 2
447                 return
448         }())
449         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
450                 ret = TorrentSpecFromMetaInfo(mi)
451                 ret.ChunkSize = 3
452                 return
453         }())
454         // Simultaneously DownloadAll in Leecher, and read the contents
455         // consecutively in LeecherLeecher. This non-deterministically triggered a
456         // case where the leecher wouldn't unchoke the LeecherLeecher.
457         var wg sync.WaitGroup
458         wg.Add(1)
459         go func() {
460                 defer wg.Done()
461                 r := llg.NewReader()
462                 defer r.Close()
463                 b, err := ioutil.ReadAll(r)
464                 require.NoError(t, err)
465                 assert.EqualValues(t, testutil.GreetingFileContents, b)
466         }()
467         addClientPeer(leecherGreeting, seeder)
468         addClientPeer(leecherGreeting, leecherLeecher)
469         wg.Add(1)
470         go func() {
471                 defer wg.Done()
472                 leecherGreeting.DownloadAll()
473                 leecher.WaitAll()
474         }()
475         wg.Wait()
476 }
477
478 func TestMergingTrackersByAddingSpecs(t *testing.T) {
479         cl, err := NewClient(&TestingConfig)
480         require.NoError(t, err)
481         defer cl.Close()
482         spec := TorrentSpec{}
483         T, new, _ := cl.AddTorrentSpec(&spec)
484         if !new {
485                 t.FailNow()
486         }
487         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
488         _, new, _ = cl.AddTorrentSpec(&spec)
489         assert.False(t, new)
490         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
491         // Because trackers are disabled in TestingConfig.
492         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
493 }
494
495 type badStorage struct{}
496
497 func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
498         return bs, nil
499 }
500
501 func (bs badStorage) Close() error {
502         return nil
503 }
504
505 func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {
506         return badStoragePiece{p}
507 }
508
509 type badStoragePiece struct {
510         p metainfo.Piece
511 }
512
513 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
514         return 0, nil
515 }
516
517 func (p badStoragePiece) GetIsComplete() bool {
518         return true
519 }
520
521 func (p badStoragePiece) MarkComplete() error {
522         return errors.New("psyyyyyyyche")
523 }
524
525 func (p badStoragePiece) MarkNotComplete() error {
526         return errors.New("psyyyyyyyche")
527 }
528
529 func (p badStoragePiece) randomlyTruncatedDataString() string {
530         return "hello, world\n"[:rand.Intn(14)]
531 }
532
533 func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
534         r := strings.NewReader(p.randomlyTruncatedDataString())
535         return r.ReadAt(b, off+p.p.Offset())
536 }
537
538 // We read from a piece which is marked completed, but is missing data.
539 func TestCompletedPieceWrongSize(t *testing.T) {
540         cfg := TestingConfig
541         cfg.DefaultStorage = badStorage{}
542         cl, err := NewClient(&cfg)
543         require.NoError(t, err)
544         defer cl.Close()
545         info := metainfo.Info{
546                 PieceLength: 15,
547                 Pieces:      make([]byte, 20),
548                 Files: []metainfo.FileInfo{
549                         metainfo.FileInfo{Path: []string{"greeting"}, Length: 13},
550                 },
551         }
552         b, err := bencode.Marshal(info)
553         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
554                 InfoBytes: b,
555                 InfoHash:  metainfo.HashBytes(b),
556         })
557         require.NoError(t, err)
558         defer tt.Drop()
559         assert.True(t, new)
560         r := tt.NewReader()
561         defer r.Close()
562         b, err = ioutil.ReadAll(r)
563         assert.Len(t, b, 13)
564         assert.NoError(t, err)
565 }
566
567 func BenchmarkAddLargeTorrent(b *testing.B) {
568         cfg := TestingConfig
569         cfg.DisableTCP = true
570         cfg.DisableUTP = true
571         cfg.ListenAddr = "redonk"
572         cl, err := NewClient(&cfg)
573         require.NoError(b, err)
574         defer cl.Close()
575         for range iter.N(b.N) {
576                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
577                 if err != nil {
578                         b.Fatal(err)
579                 }
580                 t.Drop()
581         }
582 }
583
584 func TestResponsive(t *testing.T) {
585         seederDataDir, mi := testutil.GreetingTestTorrent()
586         defer os.RemoveAll(seederDataDir)
587         cfg := TestingConfig
588         cfg.Seed = true
589         cfg.DataDir = seederDataDir
590         seeder, err := NewClient(&cfg)
591         require.Nil(t, err)
592         defer seeder.Close()
593         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
594         leecherDataDir, err := ioutil.TempDir("", "")
595         require.Nil(t, err)
596         defer os.RemoveAll(leecherDataDir)
597         cfg = TestingConfig
598         cfg.DataDir = leecherDataDir
599         leecher, err := NewClient(&cfg)
600         require.Nil(t, err)
601         defer leecher.Close()
602         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
603                 ret = TorrentSpecFromMetaInfo(mi)
604                 ret.ChunkSize = 2
605                 return
606         }())
607         addClientPeer(leecherTorrent, seeder)
608         reader := leecherTorrent.NewReader()
609         defer reader.Close()
610         reader.SetReadahead(0)
611         reader.SetResponsive()
612         b := make([]byte, 2)
613         _, err = reader.Seek(3, os.SEEK_SET)
614         require.NoError(t, err)
615         _, err = io.ReadFull(reader, b)
616         assert.Nil(t, err)
617         assert.EqualValues(t, "lo", string(b))
618         _, err = reader.Seek(11, os.SEEK_SET)
619         require.NoError(t, err)
620         n, err := io.ReadFull(reader, b)
621         assert.Nil(t, err)
622         assert.EqualValues(t, 2, n)
623         assert.EqualValues(t, "d\n", string(b))
624 }
625
626 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
627         seederDataDir, mi := testutil.GreetingTestTorrent()
628         defer os.RemoveAll(seederDataDir)
629         cfg := TestingConfig
630         cfg.Seed = true
631         cfg.DataDir = seederDataDir
632         seeder, err := NewClient(&cfg)
633         require.Nil(t, err)
634         defer seeder.Close()
635         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
636         leecherDataDir, err := ioutil.TempDir("", "")
637         require.Nil(t, err)
638         defer os.RemoveAll(leecherDataDir)
639         cfg = TestingConfig
640         cfg.DataDir = leecherDataDir
641         leecher, err := NewClient(&cfg)
642         require.Nil(t, err)
643         defer leecher.Close()
644         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
645                 ret = TorrentSpecFromMetaInfo(mi)
646                 ret.ChunkSize = 2
647                 return
648         }())
649         addClientPeer(leecherTorrent, seeder)
650         reader := leecherTorrent.NewReader()
651         defer reader.Close()
652         reader.SetReadahead(0)
653         reader.SetResponsive()
654         b := make([]byte, 2)
655         _, err = reader.Seek(3, os.SEEK_SET)
656         require.NoError(t, err)
657         _, err = io.ReadFull(reader, b)
658         assert.Nil(t, err)
659         assert.EqualValues(t, "lo", string(b))
660         go leecherTorrent.Drop()
661         _, err = reader.Seek(11, os.SEEK_SET)
662         require.NoError(t, err)
663         n, err := reader.Read(b)
664         assert.EqualError(t, err, "torrent closed")
665         assert.EqualValues(t, 0, n)
666 }
667
668 func TestDHTInheritBlocklist(t *testing.T) {
669         ipl := iplist.New(nil)
670         require.NotNil(t, ipl)
671         cfg := TestingConfig
672         cfg.IPBlocklist = ipl
673         cfg.NoDHT = false
674         cl, err := NewClient(&cfg)
675         require.NoError(t, err)
676         defer cl.Close()
677         require.Equal(t, ipl, cl.DHT().IPBlocklist())
678 }
679
680 // Check that stuff is merged in subsequent AddTorrentSpec for the same
681 // infohash.
682 func TestAddTorrentSpecMerging(t *testing.T) {
683         cl, err := NewClient(&TestingConfig)
684         require.NoError(t, err)
685         defer cl.Close()
686         dir, mi := testutil.GreetingTestTorrent()
687         defer os.RemoveAll(dir)
688         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
689                 InfoHash: mi.HashInfoBytes(),
690         })
691         require.NoError(t, err)
692         require.True(t, new)
693         require.Nil(t, tt.Info())
694         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
695         require.NoError(t, err)
696         require.False(t, new)
697         require.NotNil(t, tt.Info())
698 }
699
700 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
701         dir, mi := testutil.GreetingTestTorrent()
702         os.RemoveAll(dir)
703         cl, _ := NewClient(&TestingConfig)
704         defer cl.Close()
705         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
706                 InfoHash: mi.HashInfoBytes(),
707         })
708         tt.Drop()
709         assert.EqualValues(t, 0, len(cl.Torrents()))
710         select {
711         case <-tt.GotInfo():
712                 t.FailNow()
713         default:
714         }
715 }
716
717 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
718         for i := range iter.N(info.NumPieces()) {
719                 p := info.Piece(i)
720                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
721         }
722 }
723
724 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
725         fileCacheDir, err := ioutil.TempDir("", "")
726         require.NoError(t, err)
727         defer os.RemoveAll(fileCacheDir)
728         fileCache, err := filecache.NewCache(fileCacheDir)
729         require.NoError(t, err)
730         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
731         defer os.RemoveAll(greetingDataTempDir)
732         filePieceStore := csf(fileCache)
733         info, err := greetingMetainfo.UnmarshalInfo()
734         require.NoError(t, err)
735         ih := greetingMetainfo.HashInfoBytes()
736         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
737         require.NoError(t, err)
738         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
739         // require.Equal(t, len(testutil.GreetingFileContents), written)
740         // require.NoError(t, err)
741         for i := 0; i < info.NumPieces(); i++ {
742                 p := info.Piece(i)
743                 if alreadyCompleted {
744                         err := greetingData.Piece(p).MarkComplete()
745                         assert.NoError(t, err)
746                 }
747         }
748         cfg := TestingConfig
749         // TODO: Disable network option?
750         cfg.DisableTCP = true
751         cfg.DisableUTP = true
752         cfg.DefaultStorage = filePieceStore
753         cl, err := NewClient(&cfg)
754         require.NoError(t, err)
755         defer cl.Close()
756         tt, err := cl.AddTorrent(greetingMetainfo)
757         require.NoError(t, err)
758         psrs := tt.PieceStateRuns()
759         assert.Len(t, psrs, 1)
760         assert.EqualValues(t, 3, psrs[0].Length)
761         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
762         if alreadyCompleted {
763                 r := tt.NewReader()
764                 b, err := ioutil.ReadAll(r)
765                 assert.NoError(t, err)
766                 assert.EqualValues(t, testutil.GreetingFileContents, b)
767         }
768 }
769
770 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
771         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceFileStorage)
772         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
773 }
774
775 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
776         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceFileStorage)
777         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
778 }
779
780 func TestAddMetainfoWithNodes(t *testing.T) {
781         cfg := TestingConfig
782         cfg.NoDHT = false
783         // For now, we want to just jam the nodes into the table, without
784         // verifying them first. Also the DHT code doesn't support mixing secure
785         // and insecure nodes if security is enabled (yet).
786         cfg.DHTConfig.NoSecurity = true
787         cl, err := NewClient(&cfg)
788         require.NoError(t, err)
789         defer cl.Close()
790         assert.EqualValues(t, cl.DHT().NumNodes(), 0)
791         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
792         require.NoError(t, err)
793         assert.Len(t, tt.metainfo.AnnounceList, 5)
794         assert.EqualValues(t, 6, cl.DHT().NumNodes())
795 }
796
797 type testDownloadCancelParams struct {
798         ExportClientStatus        bool
799         SetLeecherStorageCapacity bool
800         LeecherStorageCapacity    int64
801         Cancel                    bool
802 }
803
804 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
805         greetingTempDir, mi := testutil.GreetingTestTorrent()
806         defer os.RemoveAll(greetingTempDir)
807         cfg := TestingConfig
808         cfg.Seed = true
809         cfg.DataDir = greetingTempDir
810         seeder, err := NewClient(&cfg)
811         require.NoError(t, err)
812         defer seeder.Close()
813         if ps.ExportClientStatus {
814                 testutil.ExportStatusWriter(seeder, "s")
815         }
816         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
817         leecherDataDir, err := ioutil.TempDir("", "")
818         require.NoError(t, err)
819         defer os.RemoveAll(leecherDataDir)
820         fc, err := filecache.NewCache(leecherDataDir)
821         require.NoError(t, err)
822         if ps.SetLeecherStorageCapacity {
823                 fc.SetCapacity(ps.LeecherStorageCapacity)
824         }
825         cfg.DefaultStorage = storage.NewFileStorePieces(fc.AsFileStore())
826         cfg.DataDir = leecherDataDir
827         leecher, _ := NewClient(&cfg)
828         defer leecher.Close()
829         if ps.ExportClientStatus {
830                 testutil.ExportStatusWriter(leecher, "l")
831         }
832         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
833                 ret = TorrentSpecFromMetaInfo(mi)
834                 ret.ChunkSize = 2
835                 return
836         }())
837         require.NoError(t, err)
838         assert.True(t, new)
839         psc := leecherGreeting.SubscribePieceStateChanges()
840         defer psc.Close()
841         leecherGreeting.DownloadAll()
842         if ps.Cancel {
843                 leecherGreeting.CancelPieces(0, leecherGreeting.NumPieces())
844         }
845         addClientPeer(leecherGreeting, seeder)
846         completes := make(map[int]bool, 3)
847 values:
848         for {
849                 // started := time.Now()
850                 select {
851                 case _v := <-psc.Values:
852                         // log.Print(time.Since(started))
853                         v := _v.(PieceStateChange)
854                         completes[v.Index] = v.Complete
855                 case <-time.After(100 * time.Millisecond):
856                         break values
857                 }
858         }
859         if ps.Cancel {
860                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
861         } else {
862                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
863         }
864
865 }
866
867 func TestTorrentDownloadAll(t *testing.T) {
868         testDownloadCancel(t, testDownloadCancelParams{})
869 }
870
871 func TestTorrentDownloadAllThenCancel(t *testing.T) {
872         testDownloadCancel(t, testDownloadCancelParams{
873                 Cancel: true,
874         })
875 }
876
877 // Ensure that it's an error for a peer to send an invalid have message.
878 func TestPeerInvalidHave(t *testing.T) {
879         cl, err := NewClient(&TestingConfig)
880         require.NoError(t, err)
881         defer cl.Close()
882         info := metainfo.Info{
883                 PieceLength: 1,
884                 Pieces:      make([]byte, 20),
885                 Files:       []metainfo.FileInfo{{Length: 1}},
886         }
887         infoBytes, err := bencode.Marshal(info)
888         require.NoError(t, err)
889         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
890                 InfoBytes: infoBytes,
891                 InfoHash:  metainfo.HashBytes(infoBytes),
892         })
893         require.NoError(t, err)
894         assert.True(t, _new)
895         defer tt.Drop()
896         cn := &connection{
897                 t: tt,
898         }
899         assert.NoError(t, cn.peerSentHave(0))
900         assert.Error(t, cn.peerSentHave(1))
901 }
902
903 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
904         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
905         defer os.RemoveAll(greetingTempDir)
906         cfg := TestingConfig
907         cfg.DataDir = greetingTempDir
908         seeder, err := NewClient(&TestingConfig)
909         require.NoError(t, err)
910         seeder.AddTorrentSpec(&TorrentSpec{
911                 InfoBytes: greetingMetainfo.InfoBytes,
912         })
913 }
914
915 func TestPrepareTrackerAnnounce(t *testing.T) {
916         cl := &Client{}
917         blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
918         require.NoError(t, err)
919         assert.False(t, blocked)
920         assert.EqualValues(t, "localhost:1234", host)
921         assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
922 }
923
924 // Check that when the listen port is 0, all the protocols listened on have
925 // the same port, and it isn't zero.
926 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
927         cl, err := NewClient(&TestingConfig)
928         require.NoError(t, err)
929         defer cl.Close()
930         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
931         assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
932 }
933
934 func TestClientDynamicListenTCPOnly(t *testing.T) {
935         cfg := TestingConfig
936         cfg.DisableUTP = true
937         cl, err := NewClient(&cfg)
938         require.NoError(t, err)
939         defer cl.Close()
940         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
941         assert.Nil(t, cl.utpSock)
942 }
943
944 func TestClientDynamicListenUTPOnly(t *testing.T) {
945         cfg := TestingConfig
946         cfg.DisableTCP = true
947         cl, err := NewClient(&cfg)
948         require.NoError(t, err)
949         defer cl.Close()
950         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
951         assert.Nil(t, cl.tcpListener)
952 }
953
954 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
955         cfg := TestingConfig
956         cfg.DisableTCP = true
957         cfg.DisableUTP = true
958         cl, err := NewClient(&cfg)
959         require.NoError(t, err)
960         defer cl.Close()
961         assert.Nil(t, cl.ListenAddr())
962 }
963
964 func addClientPeer(t *Torrent, cl *Client) {
965         t.AddPeers([]Peer{
966                 Peer{
967                         IP:   missinggo.AddrIP(cl.ListenAddr()),
968                         Port: missinggo.AddrPort(cl.ListenAddr()),
969                 },
970         })
971 }
972
973 func printConnPeerCounts(t *Torrent) {
974         t.cl.mu.Lock()
975         log.Println(len(t.conns), len(t.peers))
976         t.cl.mu.Unlock()
977 }
978
979 func totalConns(tts []*Torrent) (ret int) {
980         for _, tt := range tts {
981                 tt.cl.mu.Lock()
982                 ret += len(tt.conns)
983                 tt.cl.mu.Unlock()
984         }
985         return
986 }
987
988 func TestSetMaxEstablishedConn(t *testing.T) {
989         var tts []*Torrent
990         ih := testutil.GreetingMetaInfo().HashInfoBytes()
991         cfg := TestingConfig
992         for i := range iter.N(3) {
993                 cl, err := NewClient(&cfg)
994                 require.NoError(t, err)
995                 defer cl.Close()
996                 tt, _ := cl.AddTorrentInfoHash(ih)
997                 tt.SetMaxEstablishedConns(2)
998                 testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
999                 tts = append(tts, tt)
1000         }
1001         addPeers := func() {
1002                 for i, tt := range tts {
1003                         for _, _tt := range tts[:i] {
1004                                 addClientPeer(tt, _tt.cl)
1005                         }
1006                 }
1007         }
1008         waitTotalConns := func(num int) {
1009                 for totalConns(tts) != num {
1010                         time.Sleep(time.Millisecond)
1011                 }
1012         }
1013         addPeers()
1014         waitTotalConns(6)
1015         tts[0].SetMaxEstablishedConns(1)
1016         waitTotalConns(4)
1017         tts[0].SetMaxEstablishedConns(0)
1018         waitTotalConns(2)
1019         tts[0].SetMaxEstablishedConns(1)
1020         addPeers()
1021         waitTotalConns(4)
1022         tts[0].SetMaxEstablishedConns(2)
1023         addPeers()
1024         waitTotalConns(6)
1025 }
1026
1027 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1028         var err error
1029         file, err := os.Create(dir + "/" + name)
1030         require.NoError(t, err)
1031         file.Write([]byte(name))
1032         file.Close()    
1033         mi := metainfo.MetaInfo{}
1034         mi.SetDefaults()
1035         info := metainfo.Info{ PieceLength: 256 * 1024 }
1036         err = info.BuildFromFilePath(dir + "/" + name)
1037         require.NoError(t, err)
1038         mi.InfoBytes, err = bencode.Marshal(info)
1039         require.NoError(t, err)
1040         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1041         tr, err := cl.AddTorrent(&mi)
1042         require.NoError(t, err)
1043         assert.True(t, tr.Seeding())
1044         return magnet
1045 }
1046
1047 // https://github.com/anacrolix/torrent/issues/114
1048 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1049         cfg := TestingConfig
1050         cfg.DisableUTP = true
1051         cfg.Seed = true
1052         cfg.DataDir = cfg.DataDir + "/server"
1053         cfg.Debug = true
1054         cfg.ForceEncryption = true
1055         os.Mkdir(cfg.DataDir, 0755)
1056         server, err := NewClient(&cfg)
1057         defer server.Close()
1058         require.NoError(t, err)
1059         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1060         makeMagnet(t, server, cfg.DataDir, "test2")
1061         cfg = TestingConfig
1062         cfg.DisableUTP = true
1063         cfg.DataDir = cfg.DataDir + "/client"
1064         cfg.Debug = true
1065         cfg.ForceEncryption = true
1066         client, err := NewClient(&cfg)
1067         require.NoError(t, err)
1068         defer client.Close()
1069         tr, err := client.AddMagnet(magnet1)
1070         require.NoError(t, err)
1071         tr.AddPeers([]Peer{Peer{
1072                 IP: missinggo.AddrIP(server.ListenAddr()),
1073                 Port: missinggo.AddrPort(server.ListenAddr()),
1074                 }})
1075         <- tr.GotInfo()
1076         tr.DownloadAll()
1077         client.WaitAll()
1078 }