]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Merge commit '6ab65a49a8a72dea1a28968b2ab42a85fd4566ec'
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "math/rand"
10         "net"
11         "os"
12         "path/filepath"
13         "strings"
14         "sync"
15         "testing"
16         "time"
17
18         _ "github.com/anacrolix/envpprof"
19         "github.com/anacrolix/missinggo"
20         "github.com/anacrolix/missinggo/filecache"
21         "github.com/anacrolix/missinggo/pubsub"
22         "github.com/bradfitz/iter"
23         "github.com/stretchr/testify/assert"
24         "github.com/stretchr/testify/require"
25         "golang.org/x/time/rate"
26
27         "github.com/anacrolix/torrent/bencode"
28         "github.com/anacrolix/torrent/internal/testutil"
29         "github.com/anacrolix/torrent/iplist"
30         "github.com/anacrolix/torrent/metainfo"
31         "github.com/anacrolix/torrent/storage"
32 )
33
34 func TestingConfig() *Config {
35         return &Config{
36                 ListenAddr:      "localhost:0",
37                 NoDHT:           true,
38                 DataDir:         tempDir(),
39                 DisableTrackers: true,
40                 // Debug:           true,
41         }
42 }
43
44 func TestClientDefault(t *testing.T) {
45         cl, err := NewClient(TestingConfig())
46         require.NoError(t, err)
47         cl.Close()
48 }
49
50 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
51         cfg := TestingConfig()
52         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
53         require.NoError(t, err)
54         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
55         defer ci.Close()
56         cfg.DefaultStorage = ci
57         cl, err := NewClient(cfg)
58         require.NoError(t, err)
59         cl.Close()
60         // And again, https://github.com/anacrolix/torrent/issues/158
61         cl, err = NewClient(cfg)
62         require.NoError(t, err)
63         cl.Close()
64 }
65
66 func TestAddDropTorrent(t *testing.T) {
67         cl, err := NewClient(TestingConfig())
68         require.NoError(t, err)
69         defer cl.Close()
70         dir, mi := testutil.GreetingTestTorrent()
71         defer os.RemoveAll(dir)
72         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
73         require.NoError(t, err)
74         assert.True(t, new)
75         tt.SetMaxEstablishedConns(0)
76         tt.SetMaxEstablishedConns(1)
77         tt.Drop()
78 }
79
80 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
81         t.SkipNow()
82 }
83
84 func TestAddTorrentNoUsableURLs(t *testing.T) {
85         t.SkipNow()
86 }
87
88 func TestAddPeersToUnknownTorrent(t *testing.T) {
89         t.SkipNow()
90 }
91
92 func TestPieceHashSize(t *testing.T) {
93         if pieceHash.Size() != 20 {
94                 t.FailNow()
95         }
96 }
97
98 func TestTorrentInitialState(t *testing.T) {
99         dir, mi := testutil.GreetingTestTorrent()
100         defer os.RemoveAll(dir)
101         tor := &Torrent{
102                 infoHash:          mi.HashInfoBytes(),
103                 pieceStateChanges: pubsub.NewPubSub(),
104         }
105         tor.chunkSize = 2
106         tor.storageOpener = storage.NewClient(storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()))
107         // Needed to lock for asynchronous piece verification.
108         tor.cl = new(Client)
109         tor.cl.mu.Lock()
110         err := tor.setInfoBytes(mi.InfoBytes)
111         tor.cl.mu.Unlock()
112         require.NoError(t, err)
113         require.Len(t, tor.pieces, 3)
114         tor.pendAllChunkSpecs(0)
115         tor.cl.mu.Lock()
116         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
117         tor.cl.mu.Unlock()
118         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
119 }
120
121 func TestUnmarshalPEXMsg(t *testing.T) {
122         var m peerExchangeMessage
123         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
124                 t.Fatal(err)
125         }
126         if len(m.Added) != 2 {
127                 t.FailNow()
128         }
129         if m.Added[0].Port != 0x506 {
130                 t.FailNow()
131         }
132 }
133
134 func TestReducedDialTimeout(t *testing.T) {
135         cfg := &Config{}
136         cfg.setDefaults()
137         for _, _case := range []struct {
138                 Max             time.Duration
139                 HalfOpenLimit   int
140                 PendingPeers    int
141                 ExpectedReduced time.Duration
142         }{
143                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
144                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
145                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
146                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
147                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
148                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
149         } {
150                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
151                 expected := _case.ExpectedReduced
152                 if expected < cfg.MinDialTimeout {
153                         expected = cfg.MinDialTimeout
154                 }
155                 if reduced != expected {
156                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
157                 }
158         }
159 }
160
161 func TestUTPRawConn(t *testing.T) {
162         l, err := NewUtpSocket("udp", "")
163         require.NoError(t, err)
164         defer l.Close()
165         go func() {
166                 for {
167                         _, err := l.Accept()
168                         if err != nil {
169                                 break
170                         }
171                 }
172         }()
173         // Connect a UTP peer to see if the RawConn will still work.
174         s, err := NewUtpSocket("udp", "")
175         require.NoError(t, err)
176         defer s.Close()
177         utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
178         require.NoError(t, err)
179         defer utpPeer.Close()
180         peer, err := net.ListenPacket("udp", ":0")
181         require.NoError(t, err)
182         defer peer.Close()
183
184         msgsReceived := 0
185         // How many messages to send. I've set this to double the channel buffer
186         // size in the raw packetConn.
187         const N = 200
188         readerStopped := make(chan struct{})
189         // The reader goroutine.
190         go func() {
191                 defer close(readerStopped)
192                 b := make([]byte, 500)
193                 for i := 0; i < N; i++ {
194                         n, _, err := l.ReadFrom(b)
195                         require.NoError(t, err)
196                         msgsReceived++
197                         var d int
198                         fmt.Sscan(string(b[:n]), &d)
199                         assert.Equal(t, i, d)
200                 }
201         }()
202         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
203         require.NoError(t, err)
204         for i := 0; i < N; i++ {
205                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
206                 require.NoError(t, err)
207                 time.Sleep(time.Millisecond)
208         }
209         select {
210         case <-readerStopped:
211         case <-time.After(time.Second):
212                 t.Fatal("reader timed out")
213         }
214         if msgsReceived != N {
215                 t.Fatalf("messages received: %d", msgsReceived)
216         }
217 }
218
219 func TestTwoClientsArbitraryPorts(t *testing.T) {
220         for i := 0; i < 2; i++ {
221                 cl, err := NewClient(TestingConfig())
222                 if err != nil {
223                         t.Fatal(err)
224                 }
225                 defer cl.Close()
226         }
227 }
228
229 func TestAddDropManyTorrents(t *testing.T) {
230         cl, err := NewClient(TestingConfig())
231         require.NoError(t, err)
232         defer cl.Close()
233         for i := range iter.N(1000) {
234                 var spec TorrentSpec
235                 binary.PutVarint(spec.InfoHash[:], int64(i))
236                 tt, new, err := cl.AddTorrentSpec(&spec)
237                 assert.NoError(t, err)
238                 assert.True(t, new)
239                 defer tt.Drop()
240         }
241 }
242
243 type FileCacheClientStorageFactoryParams struct {
244         Capacity    int64
245         SetCapacity bool
246         Wrapper     func(*filecache.Cache) storage.ClientImpl
247 }
248
249 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
250         return func(dataDir string) storage.ClientImpl {
251                 fc, err := filecache.NewCache(dataDir)
252                 if err != nil {
253                         panic(err)
254                 }
255                 if ps.SetCapacity {
256                         fc.SetCapacity(ps.Capacity)
257                 }
258                 return ps.Wrapper(fc)
259         }
260 }
261
262 type storageFactory func(string) storage.ClientImpl
263
264 func TestClientTransferDefault(t *testing.T) {
265         testClientTransfer(t, testClientTransferParams{
266                 ExportClientStatus: true,
267                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
268                         Wrapper: fileCachePieceResourceStorage,
269                 }),
270         })
271 }
272
273 func TestClientTransferRateLimitedUpload(t *testing.T) {
274         started := time.Now()
275         testClientTransfer(t, testClientTransferParams{
276                 // We are uploading 13 bytes (the length of the greeting torrent). The
277                 // chunks are 2 bytes in length. Then the smallest burst we can run
278                 // with is 2. Time taken is (13-burst)/rate.
279                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
280         })
281         require.True(t, time.Since(started) > time.Second)
282 }
283
284 func TestClientTransferRateLimitedDownload(t *testing.T) {
285         testClientTransfer(t, testClientTransferParams{
286                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
287         })
288 }
289
290 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
291         return storage.NewResourcePieces(fc.AsResourceProvider())
292 }
293
294 func TestClientTransferSmallCache(t *testing.T) {
295         testClientTransfer(t, testClientTransferParams{
296                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
297                         SetCapacity: true,
298                         // Going below the piece length means it can't complete a piece so
299                         // that it can be hashed.
300                         Capacity: 5,
301                         Wrapper:  fileCachePieceResourceStorage,
302                 }),
303                 SetReadahead: true,
304                 // Can't readahead too far or the cache will thrash and drop data we
305                 // thought we had.
306                 Readahead:          0,
307                 ExportClientStatus: true,
308         })
309 }
310
311 func TestClientTransferVarious(t *testing.T) {
312         // Leecher storage
313         for _, ls := range []storageFactory{
314                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
315                         Wrapper: fileCachePieceResourceStorage,
316                 }),
317                 storage.NewBoltDB,
318         } {
319                 // Seeder storage
320                 for _, ss := range []func(string) storage.ClientImpl{
321                         storage.NewFile,
322                         storage.NewMMap,
323                 } {
324                         for _, responsive := range []bool{false, true} {
325                                 testClientTransfer(t, testClientTransferParams{
326                                         Responsive:     responsive,
327                                         SeederStorage:  ss,
328                                         LeecherStorage: ls,
329                                 })
330                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
331                                         testClientTransfer(t, testClientTransferParams{
332                                                 SeederStorage:  ss,
333                                                 Responsive:     responsive,
334                                                 SetReadahead:   true,
335                                                 Readahead:      readahead,
336                                                 LeecherStorage: ls,
337                                         })
338                                 }
339                         }
340                 }
341         }
342 }
343
344 type testClientTransferParams struct {
345         Responsive                 bool
346         Readahead                  int64
347         SetReadahead               bool
348         ExportClientStatus         bool
349         LeecherStorage             func(string) storage.ClientImpl
350         SeederStorage              func(string) storage.ClientImpl
351         SeederUploadRateLimiter    *rate.Limiter
352         LeecherDownloadRateLimiter *rate.Limiter
353 }
354
355 // Creates a seeder and a leecher, and ensures the data transfers when a read
356 // is attempted on the leecher.
357 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
358         greetingTempDir, mi := testutil.GreetingTestTorrent()
359         defer os.RemoveAll(greetingTempDir)
360         // Create seeder and a Torrent.
361         cfg := TestingConfig()
362         cfg.Seed = true
363         cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
364         // cfg.ListenAddr = "localhost:4000"
365         if ps.SeederStorage != nil {
366                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
367                 defer cfg.DefaultStorage.Close()
368         } else {
369                 cfg.DataDir = greetingTempDir
370         }
371         seeder, err := NewClient(cfg)
372         require.NoError(t, err)
373         defer seeder.Close()
374         if ps.ExportClientStatus {
375                 testutil.ExportStatusWriter(seeder, "s")
376         }
377         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
378         seederTorrent.VerifyData()
379         // Create leecher and a Torrent.
380         leecherDataDir, err := ioutil.TempDir("", "")
381         require.NoError(t, err)
382         defer os.RemoveAll(leecherDataDir)
383         if ps.LeecherStorage == nil {
384                 cfg.DataDir = leecherDataDir
385         } else {
386                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
387         }
388         cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
389         // cfg.ListenAddr = "localhost:4001"
390         leecher, err := NewClient(cfg)
391         require.NoError(t, err)
392         defer leecher.Close()
393         if ps.ExportClientStatus {
394                 testutil.ExportStatusWriter(leecher, "l")
395         }
396         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
397                 ret = TorrentSpecFromMetaInfo(mi)
398                 ret.ChunkSize = 2
399                 return
400         }())
401         require.NoError(t, err)
402         assert.True(t, new)
403         // Now do some things with leecher and seeder.
404         addClientPeer(leecherGreeting, seeder)
405         r := leecherGreeting.NewReader()
406         defer r.Close()
407         if ps.Responsive {
408                 r.SetResponsive()
409         }
410         if ps.SetReadahead {
411                 r.SetReadahead(ps.Readahead)
412         }
413         assertReadAllGreeting(t, r)
414         // After one read through, we can assume certain torrent statistics.
415         // These are not a strict requirement. It is however interesting to
416         // follow.
417         // t.Logf("%#v", seederTorrent.Stats())
418         // assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
419         // assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
420         // assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
421         // assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
422         // Read through again for the cases where the torrent data size exceeds
423         // the size of the cache.
424         assertReadAllGreeting(t, r)
425 }
426
427 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
428         pos, err := r.Seek(0, io.SeekStart)
429         assert.NoError(t, err)
430         assert.EqualValues(t, 0, pos)
431         _greeting, err := ioutil.ReadAll(r)
432         assert.NoError(t, err)
433         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
434 }
435
436 // Check that after completing leeching, a leecher transitions to a seeding
437 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
438 func TestSeedAfterDownloading(t *testing.T) {
439         greetingTempDir, mi := testutil.GreetingTestTorrent()
440         defer os.RemoveAll(greetingTempDir)
441         cfg := TestingConfig()
442         cfg.Seed = true
443         cfg.DataDir = greetingTempDir
444         seeder, err := NewClient(cfg)
445         require.NoError(t, err)
446         defer seeder.Close()
447         testutil.ExportStatusWriter(seeder, "s")
448         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
449         seederTorrent.VerifyData()
450         cfg.DataDir, err = ioutil.TempDir("", "")
451         require.NoError(t, err)
452         defer os.RemoveAll(cfg.DataDir)
453         leecher, err := NewClient(cfg)
454         require.NoError(t, err)
455         defer leecher.Close()
456         testutil.ExportStatusWriter(leecher, "l")
457         cfg.Seed = false
458         // cfg.TorrentDataOpener = nil
459         cfg.DataDir, err = ioutil.TempDir("", "")
460         require.NoError(t, err)
461         defer os.RemoveAll(cfg.DataDir)
462         leecherLeecher, _ := NewClient(cfg)
463         defer leecherLeecher.Close()
464         testutil.ExportStatusWriter(leecherLeecher, "ll")
465         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
466                 ret = TorrentSpecFromMetaInfo(mi)
467                 ret.ChunkSize = 2
468                 return
469         }())
470         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
471                 ret = TorrentSpecFromMetaInfo(mi)
472                 ret.ChunkSize = 3
473                 return
474         }())
475         // Simultaneously DownloadAll in Leecher, and read the contents
476         // consecutively in LeecherLeecher. This non-deterministically triggered a
477         // case where the leecher wouldn't unchoke the LeecherLeecher.
478         var wg sync.WaitGroup
479         wg.Add(1)
480         go func() {
481                 defer wg.Done()
482                 r := llg.NewReader()
483                 defer r.Close()
484                 b, err := ioutil.ReadAll(r)
485                 require.NoError(t, err)
486                 assert.EqualValues(t, testutil.GreetingFileContents, b)
487         }()
488         addClientPeer(leecherGreeting, seeder)
489         addClientPeer(leecherGreeting, leecherLeecher)
490         wg.Add(1)
491         go func() {
492                 defer wg.Done()
493                 leecherGreeting.DownloadAll()
494                 leecher.WaitAll()
495         }()
496         wg.Wait()
497 }
498
499 func TestMergingTrackersByAddingSpecs(t *testing.T) {
500         cl, err := NewClient(TestingConfig())
501         require.NoError(t, err)
502         defer cl.Close()
503         spec := TorrentSpec{}
504         T, new, _ := cl.AddTorrentSpec(&spec)
505         if !new {
506                 t.FailNow()
507         }
508         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
509         _, new, _ = cl.AddTorrentSpec(&spec)
510         assert.False(t, new)
511         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
512         // Because trackers are disabled in TestingConfig.
513         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
514 }
515
516 type badStorage struct{}
517
518 var _ storage.ClientImpl = badStorage{}
519
520 func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
521         return bs, nil
522 }
523
524 func (bs badStorage) Close() error {
525         return nil
526 }
527
528 func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {
529         return badStoragePiece{p}
530 }
531
532 type badStoragePiece struct {
533         p metainfo.Piece
534 }
535
536 var _ storage.PieceImpl = badStoragePiece{}
537
538 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
539         return 0, nil
540 }
541
542 func (p badStoragePiece) Completion() storage.Completion {
543         return storage.Completion{Complete: true, Ok: true}
544 }
545
546 func (p badStoragePiece) MarkComplete() error {
547         return errors.New("psyyyyyyyche")
548 }
549
550 func (p badStoragePiece) MarkNotComplete() error {
551         return errors.New("psyyyyyyyche")
552 }
553
554 func (p badStoragePiece) randomlyTruncatedDataString() string {
555         return "hello, world\n"[:rand.Intn(14)]
556 }
557
558 func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
559         r := strings.NewReader(p.randomlyTruncatedDataString())
560         return r.ReadAt(b, off+p.p.Offset())
561 }
562
563 // We read from a piece which is marked completed, but is missing data.
564 func TestCompletedPieceWrongSize(t *testing.T) {
565         cfg := TestingConfig()
566         cfg.DefaultStorage = badStorage{}
567         cl, err := NewClient(cfg)
568         require.NoError(t, err)
569         defer cl.Close()
570         info := metainfo.Info{
571                 PieceLength: 15,
572                 Pieces:      make([]byte, 20),
573                 Files: []metainfo.FileInfo{
574                         {Path: []string{"greeting"}, Length: 13},
575                 },
576         }
577         b, err := bencode.Marshal(info)
578         require.NoError(t, err)
579         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
580                 InfoBytes: b,
581                 InfoHash:  metainfo.HashBytes(b),
582         })
583         require.NoError(t, err)
584         defer tt.Drop()
585         assert.True(t, new)
586         r := tt.NewReader()
587         defer r.Close()
588         b, err = ioutil.ReadAll(r)
589         assert.Len(t, b, 13)
590         assert.NoError(t, err)
591 }
592
593 func BenchmarkAddLargeTorrent(b *testing.B) {
594         cfg := TestingConfig()
595         cfg.DisableTCP = true
596         cfg.DisableUTP = true
597         cfg.ListenAddr = "redonk"
598         cl, err := NewClient(cfg)
599         require.NoError(b, err)
600         defer cl.Close()
601         for range iter.N(b.N) {
602                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
603                 if err != nil {
604                         b.Fatal(err)
605                 }
606                 t.Drop()
607         }
608 }
609
610 func TestResponsive(t *testing.T) {
611         seederDataDir, mi := testutil.GreetingTestTorrent()
612         defer os.RemoveAll(seederDataDir)
613         cfg := TestingConfig()
614         cfg.Seed = true
615         cfg.DataDir = seederDataDir
616         seeder, err := NewClient(cfg)
617         require.Nil(t, err)
618         defer seeder.Close()
619         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
620         seederTorrent.VerifyData()
621         leecherDataDir, err := ioutil.TempDir("", "")
622         require.Nil(t, err)
623         defer os.RemoveAll(leecherDataDir)
624         cfg = TestingConfig()
625         cfg.DataDir = leecherDataDir
626         leecher, err := NewClient(cfg)
627         require.Nil(t, err)
628         defer leecher.Close()
629         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
630                 ret = TorrentSpecFromMetaInfo(mi)
631                 ret.ChunkSize = 2
632                 return
633         }())
634         addClientPeer(leecherTorrent, seeder)
635         reader := leecherTorrent.NewReader()
636         defer reader.Close()
637         reader.SetReadahead(0)
638         reader.SetResponsive()
639         b := make([]byte, 2)
640         _, err = reader.Seek(3, io.SeekStart)
641         require.NoError(t, err)
642         _, err = io.ReadFull(reader, b)
643         assert.Nil(t, err)
644         assert.EqualValues(t, "lo", string(b))
645         _, err = reader.Seek(11, io.SeekStart)
646         require.NoError(t, err)
647         n, err := io.ReadFull(reader, b)
648         assert.Nil(t, err)
649         assert.EqualValues(t, 2, n)
650         assert.EqualValues(t, "d\n", string(b))
651 }
652
653 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
654         seederDataDir, mi := testutil.GreetingTestTorrent()
655         defer os.RemoveAll(seederDataDir)
656         cfg := TestingConfig()
657         cfg.Seed = true
658         cfg.DataDir = seederDataDir
659         seeder, err := NewClient(cfg)
660         require.Nil(t, err)
661         defer seeder.Close()
662         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
663         seederTorrent.VerifyData()
664         leecherDataDir, err := ioutil.TempDir("", "")
665         require.Nil(t, err)
666         defer os.RemoveAll(leecherDataDir)
667         cfg = TestingConfig()
668         cfg.DataDir = leecherDataDir
669         leecher, err := NewClient(cfg)
670         require.Nil(t, err)
671         defer leecher.Close()
672         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
673                 ret = TorrentSpecFromMetaInfo(mi)
674                 ret.ChunkSize = 2
675                 return
676         }())
677         addClientPeer(leecherTorrent, seeder)
678         reader := leecherTorrent.NewReader()
679         defer reader.Close()
680         reader.SetReadahead(0)
681         reader.SetResponsive()
682         b := make([]byte, 2)
683         _, err = reader.Seek(3, io.SeekStart)
684         require.NoError(t, err)
685         _, err = io.ReadFull(reader, b)
686         assert.Nil(t, err)
687         assert.EqualValues(t, "lo", string(b))
688         go leecherTorrent.Drop()
689         _, err = reader.Seek(11, io.SeekStart)
690         require.NoError(t, err)
691         n, err := reader.Read(b)
692         assert.EqualError(t, err, "torrent closed")
693         assert.EqualValues(t, 0, n)
694 }
695
696 func TestDHTInheritBlocklist(t *testing.T) {
697         ipl := iplist.New(nil)
698         require.NotNil(t, ipl)
699         cfg := TestingConfig()
700         cfg.IPBlocklist = ipl
701         cfg.NoDHT = false
702         cl, err := NewClient(cfg)
703         require.NoError(t, err)
704         defer cl.Close()
705         require.Equal(t, ipl, cl.DHT().IPBlocklist())
706 }
707
708 // Check that stuff is merged in subsequent AddTorrentSpec for the same
709 // infohash.
710 func TestAddTorrentSpecMerging(t *testing.T) {
711         cl, err := NewClient(TestingConfig())
712         require.NoError(t, err)
713         defer cl.Close()
714         dir, mi := testutil.GreetingTestTorrent()
715         defer os.RemoveAll(dir)
716         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
717                 InfoHash: mi.HashInfoBytes(),
718         })
719         require.NoError(t, err)
720         require.True(t, new)
721         require.Nil(t, tt.Info())
722         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
723         require.NoError(t, err)
724         require.False(t, new)
725         require.NotNil(t, tt.Info())
726 }
727
728 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
729         dir, mi := testutil.GreetingTestTorrent()
730         os.RemoveAll(dir)
731         cl, _ := NewClient(TestingConfig())
732         defer cl.Close()
733         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
734                 InfoHash: mi.HashInfoBytes(),
735         })
736         tt.Drop()
737         assert.EqualValues(t, 0, len(cl.Torrents()))
738         select {
739         case <-tt.GotInfo():
740                 t.FailNow()
741         default:
742         }
743 }
744
745 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
746         for i := range iter.N(info.NumPieces()) {
747                 p := info.Piece(i)
748                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
749         }
750 }
751
752 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
753         fileCacheDir, err := ioutil.TempDir("", "")
754         require.NoError(t, err)
755         defer os.RemoveAll(fileCacheDir)
756         fileCache, err := filecache.NewCache(fileCacheDir)
757         require.NoError(t, err)
758         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
759         defer os.RemoveAll(greetingDataTempDir)
760         filePieceStore := csf(fileCache)
761         defer filePieceStore.Close()
762         info, err := greetingMetainfo.UnmarshalInfo()
763         require.NoError(t, err)
764         ih := greetingMetainfo.HashInfoBytes()
765         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
766         require.NoError(t, err)
767         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
768         // require.Equal(t, len(testutil.GreetingFileContents), written)
769         // require.NoError(t, err)
770         for i := 0; i < info.NumPieces(); i++ {
771                 p := info.Piece(i)
772                 if alreadyCompleted {
773                         require.NoError(t, greetingData.Piece(p).MarkComplete())
774                 }
775         }
776         cfg := TestingConfig()
777         // TODO: Disable network option?
778         cfg.DisableTCP = true
779         cfg.DisableUTP = true
780         cfg.DefaultStorage = filePieceStore
781         cl, err := NewClient(cfg)
782         require.NoError(t, err)
783         defer cl.Close()
784         tt, err := cl.AddTorrent(greetingMetainfo)
785         require.NoError(t, err)
786         psrs := tt.PieceStateRuns()
787         assert.Len(t, psrs, 1)
788         assert.EqualValues(t, 3, psrs[0].Length)
789         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
790         if alreadyCompleted {
791                 r := tt.NewReader()
792                 b, err := ioutil.ReadAll(r)
793                 assert.NoError(t, err)
794                 assert.EqualValues(t, testutil.GreetingFileContents, b)
795         }
796 }
797
798 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
799         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
800 }
801
802 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
803         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
804 }
805
806 func TestAddMetainfoWithNodes(t *testing.T) {
807         cfg := TestingConfig()
808         cfg.ListenAddr = ":0"
809         cfg.NoDHT = false
810         // For now, we want to just jam the nodes into the table, without
811         // verifying them first. Also the DHT code doesn't support mixing secure
812         // and insecure nodes if security is enabled (yet).
813         cfg.DHTConfig.NoSecurity = true
814         cl, err := NewClient(cfg)
815         require.NoError(t, err)
816         defer cl.Close()
817         assert.EqualValues(t, 0, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
818         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
819         require.NoError(t, err)
820         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
821         // check if the announce-list is here instead. TODO: Add nodes.
822         assert.Len(t, tt.metainfo.AnnounceList, 5)
823         // There are 6 nodes in the torrent file.
824         assert.EqualValues(t, 6, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
825 }
826
827 type testDownloadCancelParams struct {
828         ExportClientStatus        bool
829         SetLeecherStorageCapacity bool
830         LeecherStorageCapacity    int64
831         Cancel                    bool
832 }
833
834 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
835         greetingTempDir, mi := testutil.GreetingTestTorrent()
836         defer os.RemoveAll(greetingTempDir)
837         cfg := TestingConfig()
838         cfg.Seed = true
839         cfg.DataDir = greetingTempDir
840         seeder, err := NewClient(cfg)
841         require.NoError(t, err)
842         defer seeder.Close()
843         if ps.ExportClientStatus {
844                 testutil.ExportStatusWriter(seeder, "s")
845         }
846         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
847         seederTorrent.VerifyData()
848         leecherDataDir, err := ioutil.TempDir("", "")
849         require.NoError(t, err)
850         defer os.RemoveAll(leecherDataDir)
851         fc, err := filecache.NewCache(leecherDataDir)
852         require.NoError(t, err)
853         if ps.SetLeecherStorageCapacity {
854                 fc.SetCapacity(ps.LeecherStorageCapacity)
855         }
856         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
857         cfg.DataDir = leecherDataDir
858         leecher, _ := NewClient(cfg)
859         defer leecher.Close()
860         if ps.ExportClientStatus {
861                 testutil.ExportStatusWriter(leecher, "l")
862         }
863         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
864                 ret = TorrentSpecFromMetaInfo(mi)
865                 ret.ChunkSize = 2
866                 return
867         }())
868         require.NoError(t, err)
869         assert.True(t, new)
870         psc := leecherGreeting.SubscribePieceStateChanges()
871         defer psc.Close()
872         leecherGreeting.DownloadAll()
873         if ps.Cancel {
874                 leecherGreeting.CancelPieces(0, leecherGreeting.NumPieces())
875         }
876         addClientPeer(leecherGreeting, seeder)
877         completes := make(map[int]bool, 3)
878 values:
879         for {
880                 // started := time.Now()
881                 select {
882                 case _v := <-psc.Values:
883                         // log.Print(time.Since(started))
884                         v := _v.(PieceStateChange)
885                         completes[v.Index] = v.Complete
886                 case <-time.After(100 * time.Millisecond):
887                         break values
888                 }
889         }
890         if ps.Cancel {
891                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
892         } else {
893                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
894         }
895
896 }
897
898 func TestTorrentDownloadAll(t *testing.T) {
899         testDownloadCancel(t, testDownloadCancelParams{})
900 }
901
902 func TestTorrentDownloadAllThenCancel(t *testing.T) {
903         testDownloadCancel(t, testDownloadCancelParams{
904                 Cancel: true,
905         })
906 }
907
908 // Ensure that it's an error for a peer to send an invalid have message.
909 func TestPeerInvalidHave(t *testing.T) {
910         cl, err := NewClient(TestingConfig())
911         require.NoError(t, err)
912         defer cl.Close()
913         info := metainfo.Info{
914                 PieceLength: 1,
915                 Pieces:      make([]byte, 20),
916                 Files:       []metainfo.FileInfo{{Length: 1}},
917         }
918         infoBytes, err := bencode.Marshal(info)
919         require.NoError(t, err)
920         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
921                 InfoBytes: infoBytes,
922                 InfoHash:  metainfo.HashBytes(infoBytes),
923                 Storage:   badStorage{},
924         })
925         require.NoError(t, err)
926         assert.True(t, _new)
927         defer tt.Drop()
928         cn := &connection{
929                 t: tt,
930         }
931         assert.NoError(t, cn.peerSentHave(0))
932         assert.Error(t, cn.peerSentHave(1))
933 }
934
935 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
936         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
937         defer os.RemoveAll(greetingTempDir)
938         cfg := TestingConfig()
939         cfg.DataDir = greetingTempDir
940         seeder, err := NewClient(TestingConfig())
941         require.NoError(t, err)
942         seeder.AddTorrentSpec(&TorrentSpec{
943                 InfoBytes: greetingMetainfo.InfoBytes,
944         })
945 }
946
947 func TestPrepareTrackerAnnounce(t *testing.T) {
948         cl := &Client{}
949         blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
950         require.NoError(t, err)
951         assert.False(t, blocked)
952         assert.EqualValues(t, "localhost:1234", host)
953         assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
954 }
955
956 // Check that when the listen port is 0, all the protocols listened on have
957 // the same port, and it isn't zero.
958 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
959         cl, err := NewClient(TestingConfig())
960         require.NoError(t, err)
961         defer cl.Close()
962         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
963         assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
964 }
965
966 func TestClientDynamicListenTCPOnly(t *testing.T) {
967         cfg := TestingConfig()
968         cfg.DisableUTP = true
969         cl, err := NewClient(cfg)
970         require.NoError(t, err)
971         defer cl.Close()
972         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
973         assert.Nil(t, cl.utpSock)
974 }
975
976 func TestClientDynamicListenUTPOnly(t *testing.T) {
977         cfg := TestingConfig()
978         cfg.DisableTCP = true
979         cl, err := NewClient(cfg)
980         require.NoError(t, err)
981         defer cl.Close()
982         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
983         assert.Nil(t, cl.tcpListener)
984 }
985
986 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
987         cfg := TestingConfig()
988         cfg.DisableTCP = true
989         cfg.DisableUTP = true
990         cl, err := NewClient(cfg)
991         require.NoError(t, err)
992         defer cl.Close()
993         assert.Nil(t, cl.ListenAddr())
994 }
995
996 func addClientPeer(t *Torrent, cl *Client) {
997         t.AddPeers([]Peer{
998                 {
999                         IP:   missinggo.AddrIP(cl.ListenAddr()),
1000                         Port: missinggo.AddrPort(cl.ListenAddr()),
1001                 },
1002         })
1003 }
1004
1005 func totalConns(tts []*Torrent) (ret int) {
1006         for _, tt := range tts {
1007                 tt.cl.mu.Lock()
1008                 ret += len(tt.conns)
1009                 tt.cl.mu.Unlock()
1010         }
1011         return
1012 }
1013
1014 func TestSetMaxEstablishedConn(t *testing.T) {
1015         var tts []*Torrent
1016         ih := testutil.GreetingMetaInfo().HashInfoBytes()
1017         for i := range iter.N(3) {
1018                 cl, err := NewClient(TestingConfig())
1019                 require.NoError(t, err)
1020                 defer cl.Close()
1021                 tt, _ := cl.AddTorrentInfoHash(ih)
1022                 tt.SetMaxEstablishedConns(2)
1023                 testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
1024                 tts = append(tts, tt)
1025         }
1026         addPeers := func() {
1027                 for i, tt := range tts {
1028                         for _, _tt := range tts[:i] {
1029                                 addClientPeer(tt, _tt.cl)
1030                         }
1031                 }
1032         }
1033         waitTotalConns := func(num int) {
1034                 for totalConns(tts) != num {
1035                         time.Sleep(time.Millisecond)
1036                 }
1037         }
1038         addPeers()
1039         waitTotalConns(6)
1040         tts[0].SetMaxEstablishedConns(1)
1041         waitTotalConns(4)
1042         tts[0].SetMaxEstablishedConns(0)
1043         waitTotalConns(2)
1044         tts[0].SetMaxEstablishedConns(1)
1045         addPeers()
1046         waitTotalConns(4)
1047         tts[0].SetMaxEstablishedConns(2)
1048         addPeers()
1049         waitTotalConns(6)
1050 }
1051
1052 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1053         os.MkdirAll(dir, 0770)
1054         file, err := os.Create(filepath.Join(dir, name))
1055         require.NoError(t, err)
1056         file.Write([]byte(name))
1057         file.Close()
1058         mi := metainfo.MetaInfo{}
1059         mi.SetDefaults()
1060         info := metainfo.Info{PieceLength: 256 * 1024}
1061         err = info.BuildFromFilePath(filepath.Join(dir, name))
1062         require.NoError(t, err)
1063         mi.InfoBytes, err = bencode.Marshal(info)
1064         require.NoError(t, err)
1065         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1066         tr, err := cl.AddTorrent(&mi)
1067         require.NoError(t, err)
1068         require.True(t, tr.Seeding())
1069         tr.VerifyData()
1070         return magnet
1071 }
1072
1073 // https://github.com/anacrolix/torrent/issues/114
1074 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1075         cfg := TestingConfig()
1076         cfg.DisableUTP = true
1077         cfg.Seed = true
1078         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1079         cfg.Debug = true
1080         cfg.ForceEncryption = true
1081         os.Mkdir(cfg.DataDir, 0755)
1082         server, err := NewClient(cfg)
1083         require.NoError(t, err)
1084         defer server.Close()
1085         testutil.ExportStatusWriter(server, "s")
1086         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1087         makeMagnet(t, server, cfg.DataDir, "test2")
1088         cfg = TestingConfig()
1089         cfg.DisableUTP = true
1090         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1091         cfg.Debug = true
1092         cfg.ForceEncryption = true
1093         client, err := NewClient(cfg)
1094         require.NoError(t, err)
1095         defer client.Close()
1096         testutil.ExportStatusWriter(client, "c")
1097         tr, err := client.AddMagnet(magnet1)
1098         require.NoError(t, err)
1099         tr.AddPeers([]Peer{{
1100                 IP:   missinggo.AddrIP(server.ListenAddr()),
1101                 Port: missinggo.AddrPort(server.ListenAddr()),
1102         }})
1103         <-tr.GotInfo()
1104         tr.DownloadAll()
1105         client.WaitAll()
1106 }
1107
1108 func TestClientAddressInUse(t *testing.T) {
1109         s, _ := NewUtpSocket("udp", ":50007")
1110         if s != nil {
1111                 defer s.Close()
1112         }
1113         cfg := TestingConfig()
1114         cfg.ListenAddr = ":50007"
1115         cl, err := NewClient(cfg)
1116         require.Error(t, err)
1117         require.Nil(t, cl)
1118 }