]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Fix mentions of TorrentDataOpener
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "math/rand"
10         "net"
11         "os"
12         "path/filepath"
13         "strings"
14         "sync"
15         "testing"
16         "time"
17
18         _ "github.com/anacrolix/envpprof"
19         "github.com/anacrolix/missinggo"
20         "github.com/anacrolix/missinggo/filecache"
21         "github.com/anacrolix/missinggo/pubsub"
22         "github.com/bradfitz/iter"
23         "github.com/stretchr/testify/assert"
24         "github.com/stretchr/testify/require"
25         "golang.org/x/time/rate"
26
27         "github.com/anacrolix/torrent/bencode"
28         "github.com/anacrolix/torrent/internal/testutil"
29         "github.com/anacrolix/torrent/iplist"
30         "github.com/anacrolix/torrent/metainfo"
31         "github.com/anacrolix/torrent/storage"
32 )
33
34 func TestingConfig() *Config {
35         return &Config{
36                 ListenAddr:      "localhost:0",
37                 NoDHT:           true,
38                 DataDir:         tempDir(),
39                 DisableTrackers: true,
40                 // Debug:           true,
41         }
42 }
43
44 func TestClientDefault(t *testing.T) {
45         cl, err := NewClient(TestingConfig())
46         require.NoError(t, err)
47         cl.Close()
48 }
49
50 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
51         cfg := TestingConfig()
52         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
53         require.NoError(t, err)
54         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
55         defer ci.Close()
56         cfg.DefaultStorage = ci
57         cl, err := NewClient(cfg)
58         require.NoError(t, err)
59         cl.Close()
60         // And again, https://github.com/anacrolix/torrent/issues/158
61         cl, err = NewClient(cfg)
62         require.NoError(t, err)
63         cl.Close()
64 }
65
66 func TestAddDropTorrent(t *testing.T) {
67         cl, err := NewClient(TestingConfig())
68         require.NoError(t, err)
69         defer cl.Close()
70         dir, mi := testutil.GreetingTestTorrent()
71         defer os.RemoveAll(dir)
72         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
73         require.NoError(t, err)
74         assert.True(t, new)
75         tt.SetMaxEstablishedConns(0)
76         tt.SetMaxEstablishedConns(1)
77         tt.Drop()
78 }
79
80 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
81         // TODO?
82         t.SkipNow()
83 }
84
85 func TestAddTorrentNoUsableURLs(t *testing.T) {
86         // TODO?
87         t.SkipNow()
88 }
89
90 func TestAddPeersToUnknownTorrent(t *testing.T) {
91         // TODO?
92         t.SkipNow()
93 }
94
95 func TestPieceHashSize(t *testing.T) {
96         assert.Equal(t, 20, pieceHash.Size())
97 }
98
99 func TestTorrentInitialState(t *testing.T) {
100         dir, mi := testutil.GreetingTestTorrent()
101         defer os.RemoveAll(dir)
102         tor := &Torrent{
103                 infoHash:          mi.HashInfoBytes(),
104                 pieceStateChanges: pubsub.NewPubSub(),
105         }
106         tor.chunkSize = 2
107         tor.storageOpener = storage.NewClient(storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()))
108         // Needed to lock for asynchronous piece verification.
109         tor.cl = new(Client)
110         tor.cl.mu.Lock()
111         err := tor.setInfoBytes(mi.InfoBytes)
112         tor.cl.mu.Unlock()
113         require.NoError(t, err)
114         require.Len(t, tor.pieces, 3)
115         tor.pendAllChunkSpecs(0)
116         tor.cl.mu.Lock()
117         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
118         tor.cl.mu.Unlock()
119         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
120 }
121
122 func TestUnmarshalPEXMsg(t *testing.T) {
123         var m peerExchangeMessage
124         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
125                 t.Fatal(err)
126         }
127         if len(m.Added) != 2 {
128                 t.FailNow()
129         }
130         if m.Added[0].Port != 0x506 {
131                 t.FailNow()
132         }
133 }
134
135 func TestReducedDialTimeout(t *testing.T) {
136         cfg := &Config{}
137         cfg.setDefaults()
138         for _, _case := range []struct {
139                 Max             time.Duration
140                 HalfOpenLimit   int
141                 PendingPeers    int
142                 ExpectedReduced time.Duration
143         }{
144                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
145                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
146                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
147                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
148                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
149                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
150         } {
151                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
152                 expected := _case.ExpectedReduced
153                 if expected < cfg.MinDialTimeout {
154                         expected = cfg.MinDialTimeout
155                 }
156                 if reduced != expected {
157                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
158                 }
159         }
160 }
161
162 func TestUTPRawConn(t *testing.T) {
163         l, err := NewUtpSocket("udp", "")
164         require.NoError(t, err)
165         defer l.Close()
166         go func() {
167                 for {
168                         _, err := l.Accept()
169                         if err != nil {
170                                 break
171                         }
172                 }
173         }()
174         // Connect a UTP peer to see if the RawConn will still work.
175         s, err := NewUtpSocket("udp", "")
176         require.NoError(t, err)
177         defer s.Close()
178         utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
179         require.NoError(t, err)
180         defer utpPeer.Close()
181         peer, err := net.ListenPacket("udp", ":0")
182         require.NoError(t, err)
183         defer peer.Close()
184
185         msgsReceived := 0
186         // How many messages to send. I've set this to double the channel buffer
187         // size in the raw packetConn.
188         const N = 200
189         readerStopped := make(chan struct{})
190         // The reader goroutine.
191         go func() {
192                 defer close(readerStopped)
193                 b := make([]byte, 500)
194                 for i := 0; i < N; i++ {
195                         n, _, err := l.ReadFrom(b)
196                         require.NoError(t, err)
197                         msgsReceived++
198                         var d int
199                         fmt.Sscan(string(b[:n]), &d)
200                         assert.Equal(t, i, d)
201                 }
202         }()
203         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
204         require.NoError(t, err)
205         for i := 0; i < N; i++ {
206                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
207                 require.NoError(t, err)
208                 time.Sleep(time.Millisecond)
209         }
210         select {
211         case <-readerStopped:
212         case <-time.After(time.Second):
213                 t.Fatal("reader timed out")
214         }
215         if msgsReceived != N {
216                 t.Fatalf("messages received: %d", msgsReceived)
217         }
218 }
219
220 func TestTwoClientsArbitraryPorts(t *testing.T) {
221         for i := 0; i < 2; i++ {
222                 cl, err := NewClient(TestingConfig())
223                 if err != nil {
224                         t.Fatal(err)
225                 }
226                 defer cl.Close()
227         }
228 }
229
230 func TestAddDropManyTorrents(t *testing.T) {
231         cl, err := NewClient(TestingConfig())
232         require.NoError(t, err)
233         defer cl.Close()
234         for i := range iter.N(1000) {
235                 var spec TorrentSpec
236                 binary.PutVarint(spec.InfoHash[:], int64(i))
237                 tt, new, err := cl.AddTorrentSpec(&spec)
238                 assert.NoError(t, err)
239                 assert.True(t, new)
240                 defer tt.Drop()
241         }
242 }
243
244 type FileCacheClientStorageFactoryParams struct {
245         Capacity    int64
246         SetCapacity bool
247         Wrapper     func(*filecache.Cache) storage.ClientImpl
248 }
249
250 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
251         return func(dataDir string) storage.ClientImpl {
252                 fc, err := filecache.NewCache(dataDir)
253                 if err != nil {
254                         panic(err)
255                 }
256                 if ps.SetCapacity {
257                         fc.SetCapacity(ps.Capacity)
258                 }
259                 return ps.Wrapper(fc)
260         }
261 }
262
263 type storageFactory func(string) storage.ClientImpl
264
265 func TestClientTransferDefault(t *testing.T) {
266         testClientTransfer(t, testClientTransferParams{
267                 ExportClientStatus: true,
268                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
269                         Wrapper: fileCachePieceResourceStorage,
270                 }),
271         })
272 }
273
274 func TestClientTransferRateLimitedUpload(t *testing.T) {
275         started := time.Now()
276         testClientTransfer(t, testClientTransferParams{
277                 // We are uploading 13 bytes (the length of the greeting torrent). The
278                 // chunks are 2 bytes in length. Then the smallest burst we can run
279                 // with is 2. Time taken is (13-burst)/rate.
280                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
281         })
282         require.True(t, time.Since(started) > time.Second)
283 }
284
285 func TestClientTransferRateLimitedDownload(t *testing.T) {
286         testClientTransfer(t, testClientTransferParams{
287                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
288         })
289 }
290
291 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
292         return storage.NewResourcePieces(fc.AsResourceProvider())
293 }
294
295 func TestClientTransferSmallCache(t *testing.T) {
296         testClientTransfer(t, testClientTransferParams{
297                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
298                         SetCapacity: true,
299                         // Going below the piece length means it can't complete a piece so
300                         // that it can be hashed.
301                         Capacity: 5,
302                         Wrapper:  fileCachePieceResourceStorage,
303                 }),
304                 SetReadahead: true,
305                 // Can't readahead too far or the cache will thrash and drop data we
306                 // thought we had.
307                 Readahead:          0,
308                 ExportClientStatus: true,
309         })
310 }
311
312 func TestClientTransferVarious(t *testing.T) {
313         // Leecher storage
314         for _, ls := range []storageFactory{
315                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
316                         Wrapper: fileCachePieceResourceStorage,
317                 }),
318                 storage.NewBoltDB,
319         } {
320                 // Seeder storage
321                 for _, ss := range []func(string) storage.ClientImpl{
322                         storage.NewFile,
323                         storage.NewMMap,
324                 } {
325                         for _, responsive := range []bool{false, true} {
326                                 testClientTransfer(t, testClientTransferParams{
327                                         Responsive:     responsive,
328                                         SeederStorage:  ss,
329                                         LeecherStorage: ls,
330                                 })
331                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
332                                         testClientTransfer(t, testClientTransferParams{
333                                                 SeederStorage:  ss,
334                                                 Responsive:     responsive,
335                                                 SetReadahead:   true,
336                                                 Readahead:      readahead,
337                                                 LeecherStorage: ls,
338                                         })
339                                 }
340                         }
341                 }
342         }
343 }
344
345 type testClientTransferParams struct {
346         Responsive                 bool
347         Readahead                  int64
348         SetReadahead               bool
349         ExportClientStatus         bool
350         LeecherStorage             func(string) storage.ClientImpl
351         SeederStorage              func(string) storage.ClientImpl
352         SeederUploadRateLimiter    *rate.Limiter
353         LeecherDownloadRateLimiter *rate.Limiter
354 }
355
356 // Creates a seeder and a leecher, and ensures the data transfers when a read
357 // is attempted on the leecher.
358 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
359         greetingTempDir, mi := testutil.GreetingTestTorrent()
360         defer os.RemoveAll(greetingTempDir)
361         // Create seeder and a Torrent.
362         cfg := TestingConfig()
363         cfg.Seed = true
364         cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
365         // cfg.ListenAddr = "localhost:4000"
366         if ps.SeederStorage != nil {
367                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
368                 defer cfg.DefaultStorage.Close()
369         } else {
370                 cfg.DataDir = greetingTempDir
371         }
372         seeder, err := NewClient(cfg)
373         require.NoError(t, err)
374         if ps.ExportClientStatus {
375                 testutil.ExportStatusWriter(seeder, "s")
376         }
377         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
378         // Run a Stats right after Closing the Client. This will trigger the Stats
379         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
380         defer seederTorrent.Stats()
381         defer seeder.Close()
382         seederTorrent.VerifyData()
383         // Create leecher and a Torrent.
384         leecherDataDir, err := ioutil.TempDir("", "")
385         require.NoError(t, err)
386         defer os.RemoveAll(leecherDataDir)
387         if ps.LeecherStorage == nil {
388                 cfg.DataDir = leecherDataDir
389         } else {
390                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
391         }
392         cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
393         // cfg.ListenAddr = "localhost:4001"
394         leecher, err := NewClient(cfg)
395         require.NoError(t, err)
396         defer leecher.Close()
397         if ps.ExportClientStatus {
398                 testutil.ExportStatusWriter(leecher, "l")
399         }
400         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
401                 ret = TorrentSpecFromMetaInfo(mi)
402                 ret.ChunkSize = 2
403                 return
404         }())
405         require.NoError(t, err)
406         assert.True(t, new)
407         // Now do some things with leecher and seeder.
408         addClientPeer(leecherGreeting, seeder)
409         r := leecherGreeting.NewReader()
410         defer r.Close()
411         if ps.Responsive {
412                 r.SetResponsive()
413         }
414         if ps.SetReadahead {
415                 r.SetReadahead(ps.Readahead)
416         }
417         assertReadAllGreeting(t, r)
418         // After one read through, we can assume certain torrent statistics.
419         // These are not a strict requirement. It is however interesting to
420         // follow.
421         // t.Logf("%#v", seederTorrent.Stats())
422         // assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
423         // assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
424         // assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
425         // assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
426         // Read through again for the cases where the torrent data size exceeds
427         // the size of the cache.
428         assertReadAllGreeting(t, r)
429 }
430
431 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
432         pos, err := r.Seek(0, io.SeekStart)
433         assert.NoError(t, err)
434         assert.EqualValues(t, 0, pos)
435         _greeting, err := ioutil.ReadAll(r)
436         assert.NoError(t, err)
437         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
438 }
439
440 // Check that after completing leeching, a leecher transitions to a seeding
441 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
442 func TestSeedAfterDownloading(t *testing.T) {
443         greetingTempDir, mi := testutil.GreetingTestTorrent()
444         defer os.RemoveAll(greetingTempDir)
445         cfg := TestingConfig()
446         cfg.Seed = true
447         cfg.DataDir = greetingTempDir
448         seeder, err := NewClient(cfg)
449         require.NoError(t, err)
450         defer seeder.Close()
451         testutil.ExportStatusWriter(seeder, "s")
452         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
453         seederTorrent.VerifyData()
454         cfg.DataDir, err = ioutil.TempDir("", "")
455         require.NoError(t, err)
456         defer os.RemoveAll(cfg.DataDir)
457         leecher, err := NewClient(cfg)
458         require.NoError(t, err)
459         defer leecher.Close()
460         testutil.ExportStatusWriter(leecher, "l")
461         cfg.Seed = false
462         cfg.DataDir, err = ioutil.TempDir("", "")
463         require.NoError(t, err)
464         defer os.RemoveAll(cfg.DataDir)
465         leecherLeecher, _ := NewClient(cfg)
466         defer leecherLeecher.Close()
467         testutil.ExportStatusWriter(leecherLeecher, "ll")
468         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
469                 ret = TorrentSpecFromMetaInfo(mi)
470                 ret.ChunkSize = 2
471                 return
472         }())
473         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
474                 ret = TorrentSpecFromMetaInfo(mi)
475                 ret.ChunkSize = 3
476                 return
477         }())
478         // Simultaneously DownloadAll in Leecher, and read the contents
479         // consecutively in LeecherLeecher. This non-deterministically triggered a
480         // case where the leecher wouldn't unchoke the LeecherLeecher.
481         var wg sync.WaitGroup
482         wg.Add(1)
483         go func() {
484                 defer wg.Done()
485                 r := llg.NewReader()
486                 defer r.Close()
487                 b, err := ioutil.ReadAll(r)
488                 require.NoError(t, err)
489                 assert.EqualValues(t, testutil.GreetingFileContents, b)
490         }()
491         addClientPeer(leecherGreeting, seeder)
492         addClientPeer(leecherGreeting, leecherLeecher)
493         wg.Add(1)
494         go func() {
495                 defer wg.Done()
496                 leecherGreeting.DownloadAll()
497                 leecher.WaitAll()
498         }()
499         wg.Wait()
500 }
501
502 func TestMergingTrackersByAddingSpecs(t *testing.T) {
503         cl, err := NewClient(TestingConfig())
504         require.NoError(t, err)
505         defer cl.Close()
506         spec := TorrentSpec{}
507         T, new, _ := cl.AddTorrentSpec(&spec)
508         if !new {
509                 t.FailNow()
510         }
511         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
512         _, new, _ = cl.AddTorrentSpec(&spec)
513         assert.False(t, new)
514         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
515         // Because trackers are disabled in TestingConfig.
516         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
517 }
518
519 type badStorage struct{}
520
521 var _ storage.ClientImpl = badStorage{}
522
523 func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
524         return bs, nil
525 }
526
527 func (bs badStorage) Close() error {
528         return nil
529 }
530
531 func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {
532         return badStoragePiece{p}
533 }
534
535 type badStoragePiece struct {
536         p metainfo.Piece
537 }
538
539 var _ storage.PieceImpl = badStoragePiece{}
540
541 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
542         return 0, nil
543 }
544
545 func (p badStoragePiece) Completion() storage.Completion {
546         return storage.Completion{Complete: true, Ok: true}
547 }
548
549 func (p badStoragePiece) MarkComplete() error {
550         return errors.New("psyyyyyyyche")
551 }
552
553 func (p badStoragePiece) MarkNotComplete() error {
554         return errors.New("psyyyyyyyche")
555 }
556
557 func (p badStoragePiece) randomlyTruncatedDataString() string {
558         return "hello, world\n"[:rand.Intn(14)]
559 }
560
561 func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
562         r := strings.NewReader(p.randomlyTruncatedDataString())
563         return r.ReadAt(b, off+p.p.Offset())
564 }
565
566 // We read from a piece which is marked completed, but is missing data.
567 func TestCompletedPieceWrongSize(t *testing.T) {
568         cfg := TestingConfig()
569         cfg.DefaultStorage = badStorage{}
570         cl, err := NewClient(cfg)
571         require.NoError(t, err)
572         defer cl.Close()
573         info := metainfo.Info{
574                 PieceLength: 15,
575                 Pieces:      make([]byte, 20),
576                 Files: []metainfo.FileInfo{
577                         {Path: []string{"greeting"}, Length: 13},
578                 },
579         }
580         b, err := bencode.Marshal(info)
581         require.NoError(t, err)
582         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
583                 InfoBytes: b,
584                 InfoHash:  metainfo.HashBytes(b),
585         })
586         require.NoError(t, err)
587         defer tt.Drop()
588         assert.True(t, new)
589         r := tt.NewReader()
590         defer r.Close()
591         b, err = ioutil.ReadAll(r)
592         assert.Len(t, b, 13)
593         assert.NoError(t, err)
594 }
595
596 func BenchmarkAddLargeTorrent(b *testing.B) {
597         cfg := TestingConfig()
598         cfg.DisableTCP = true
599         cfg.DisableUTP = true
600         cfg.ListenAddr = "redonk"
601         cl, err := NewClient(cfg)
602         require.NoError(b, err)
603         defer cl.Close()
604         for range iter.N(b.N) {
605                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
606                 if err != nil {
607                         b.Fatal(err)
608                 }
609                 t.Drop()
610         }
611 }
612
613 func TestResponsive(t *testing.T) {
614         seederDataDir, mi := testutil.GreetingTestTorrent()
615         defer os.RemoveAll(seederDataDir)
616         cfg := TestingConfig()
617         cfg.Seed = true
618         cfg.DataDir = seederDataDir
619         seeder, err := NewClient(cfg)
620         require.Nil(t, err)
621         defer seeder.Close()
622         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
623         seederTorrent.VerifyData()
624         leecherDataDir, err := ioutil.TempDir("", "")
625         require.Nil(t, err)
626         defer os.RemoveAll(leecherDataDir)
627         cfg = TestingConfig()
628         cfg.DataDir = leecherDataDir
629         leecher, err := NewClient(cfg)
630         require.Nil(t, err)
631         defer leecher.Close()
632         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
633                 ret = TorrentSpecFromMetaInfo(mi)
634                 ret.ChunkSize = 2
635                 return
636         }())
637         addClientPeer(leecherTorrent, seeder)
638         reader := leecherTorrent.NewReader()
639         defer reader.Close()
640         reader.SetReadahead(0)
641         reader.SetResponsive()
642         b := make([]byte, 2)
643         _, err = reader.Seek(3, io.SeekStart)
644         require.NoError(t, err)
645         _, err = io.ReadFull(reader, b)
646         assert.Nil(t, err)
647         assert.EqualValues(t, "lo", string(b))
648         _, err = reader.Seek(11, io.SeekStart)
649         require.NoError(t, err)
650         n, err := io.ReadFull(reader, b)
651         assert.Nil(t, err)
652         assert.EqualValues(t, 2, n)
653         assert.EqualValues(t, "d\n", string(b))
654 }
655
656 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
657         seederDataDir, mi := testutil.GreetingTestTorrent()
658         defer os.RemoveAll(seederDataDir)
659         cfg := TestingConfig()
660         cfg.Seed = true
661         cfg.DataDir = seederDataDir
662         seeder, err := NewClient(cfg)
663         require.Nil(t, err)
664         defer seeder.Close()
665         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
666         seederTorrent.VerifyData()
667         leecherDataDir, err := ioutil.TempDir("", "")
668         require.Nil(t, err)
669         defer os.RemoveAll(leecherDataDir)
670         cfg = TestingConfig()
671         cfg.DataDir = leecherDataDir
672         leecher, err := NewClient(cfg)
673         require.Nil(t, err)
674         defer leecher.Close()
675         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
676                 ret = TorrentSpecFromMetaInfo(mi)
677                 ret.ChunkSize = 2
678                 return
679         }())
680         addClientPeer(leecherTorrent, seeder)
681         reader := leecherTorrent.NewReader()
682         defer reader.Close()
683         reader.SetReadahead(0)
684         reader.SetResponsive()
685         b := make([]byte, 2)
686         _, err = reader.Seek(3, io.SeekStart)
687         require.NoError(t, err)
688         _, err = io.ReadFull(reader, b)
689         assert.Nil(t, err)
690         assert.EqualValues(t, "lo", string(b))
691         go leecherTorrent.Drop()
692         _, err = reader.Seek(11, io.SeekStart)
693         require.NoError(t, err)
694         n, err := reader.Read(b)
695         assert.EqualError(t, err, "torrent closed")
696         assert.EqualValues(t, 0, n)
697 }
698
699 func TestDHTInheritBlocklist(t *testing.T) {
700         ipl := iplist.New(nil)
701         require.NotNil(t, ipl)
702         cfg := TestingConfig()
703         cfg.IPBlocklist = ipl
704         cfg.NoDHT = false
705         cl, err := NewClient(cfg)
706         require.NoError(t, err)
707         defer cl.Close()
708         require.Equal(t, ipl, cl.DHT().IPBlocklist())
709 }
710
711 // Check that stuff is merged in subsequent AddTorrentSpec for the same
712 // infohash.
713 func TestAddTorrentSpecMerging(t *testing.T) {
714         cl, err := NewClient(TestingConfig())
715         require.NoError(t, err)
716         defer cl.Close()
717         dir, mi := testutil.GreetingTestTorrent()
718         defer os.RemoveAll(dir)
719         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
720                 InfoHash: mi.HashInfoBytes(),
721         })
722         require.NoError(t, err)
723         require.True(t, new)
724         require.Nil(t, tt.Info())
725         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
726         require.NoError(t, err)
727         require.False(t, new)
728         require.NotNil(t, tt.Info())
729 }
730
731 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
732         dir, mi := testutil.GreetingTestTorrent()
733         os.RemoveAll(dir)
734         cl, _ := NewClient(TestingConfig())
735         defer cl.Close()
736         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
737                 InfoHash: mi.HashInfoBytes(),
738         })
739         tt.Drop()
740         assert.EqualValues(t, 0, len(cl.Torrents()))
741         select {
742         case <-tt.GotInfo():
743                 t.FailNow()
744         default:
745         }
746 }
747
748 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
749         for i := range iter.N(info.NumPieces()) {
750                 p := info.Piece(i)
751                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
752         }
753 }
754
755 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
756         fileCacheDir, err := ioutil.TempDir("", "")
757         require.NoError(t, err)
758         defer os.RemoveAll(fileCacheDir)
759         fileCache, err := filecache.NewCache(fileCacheDir)
760         require.NoError(t, err)
761         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
762         defer os.RemoveAll(greetingDataTempDir)
763         filePieceStore := csf(fileCache)
764         defer filePieceStore.Close()
765         info, err := greetingMetainfo.UnmarshalInfo()
766         require.NoError(t, err)
767         ih := greetingMetainfo.HashInfoBytes()
768         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
769         require.NoError(t, err)
770         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
771         // require.Equal(t, len(testutil.GreetingFileContents), written)
772         // require.NoError(t, err)
773         for i := 0; i < info.NumPieces(); i++ {
774                 p := info.Piece(i)
775                 if alreadyCompleted {
776                         require.NoError(t, greetingData.Piece(p).MarkComplete())
777                 }
778         }
779         cfg := TestingConfig()
780         // TODO: Disable network option?
781         cfg.DisableTCP = true
782         cfg.DisableUTP = true
783         cfg.DefaultStorage = filePieceStore
784         cl, err := NewClient(cfg)
785         require.NoError(t, err)
786         defer cl.Close()
787         tt, err := cl.AddTorrent(greetingMetainfo)
788         require.NoError(t, err)
789         psrs := tt.PieceStateRuns()
790         assert.Len(t, psrs, 1)
791         assert.EqualValues(t, 3, psrs[0].Length)
792         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
793         if alreadyCompleted {
794                 r := tt.NewReader()
795                 b, err := ioutil.ReadAll(r)
796                 assert.NoError(t, err)
797                 assert.EqualValues(t, testutil.GreetingFileContents, b)
798         }
799 }
800
801 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
802         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
803 }
804
805 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
806         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
807 }
808
809 func TestAddMetainfoWithNodes(t *testing.T) {
810         cfg := TestingConfig()
811         cfg.ListenAddr = ":0"
812         cfg.NoDHT = false
813         // For now, we want to just jam the nodes into the table, without
814         // verifying them first. Also the DHT code doesn't support mixing secure
815         // and insecure nodes if security is enabled (yet).
816         cfg.DHTConfig.NoSecurity = true
817         cl, err := NewClient(cfg)
818         require.NoError(t, err)
819         defer cl.Close()
820         assert.EqualValues(t, 0, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
821         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
822         require.NoError(t, err)
823         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
824         // check if the announce-list is here instead. TODO: Add nodes.
825         assert.Len(t, tt.metainfo.AnnounceList, 5)
826         // There are 6 nodes in the torrent file.
827         assert.EqualValues(t, 6, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
828 }
829
830 type testDownloadCancelParams struct {
831         ExportClientStatus        bool
832         SetLeecherStorageCapacity bool
833         LeecherStorageCapacity    int64
834         Cancel                    bool
835 }
836
837 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
838         greetingTempDir, mi := testutil.GreetingTestTorrent()
839         defer os.RemoveAll(greetingTempDir)
840         cfg := TestingConfig()
841         cfg.Seed = true
842         cfg.DataDir = greetingTempDir
843         seeder, err := NewClient(cfg)
844         require.NoError(t, err)
845         defer seeder.Close()
846         if ps.ExportClientStatus {
847                 testutil.ExportStatusWriter(seeder, "s")
848         }
849         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
850         seederTorrent.VerifyData()
851         leecherDataDir, err := ioutil.TempDir("", "")
852         require.NoError(t, err)
853         defer os.RemoveAll(leecherDataDir)
854         fc, err := filecache.NewCache(leecherDataDir)
855         require.NoError(t, err)
856         if ps.SetLeecherStorageCapacity {
857                 fc.SetCapacity(ps.LeecherStorageCapacity)
858         }
859         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
860         cfg.DataDir = leecherDataDir
861         leecher, _ := NewClient(cfg)
862         defer leecher.Close()
863         if ps.ExportClientStatus {
864                 testutil.ExportStatusWriter(leecher, "l")
865         }
866         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
867                 ret = TorrentSpecFromMetaInfo(mi)
868                 ret.ChunkSize = 2
869                 return
870         }())
871         require.NoError(t, err)
872         assert.True(t, new)
873         psc := leecherGreeting.SubscribePieceStateChanges()
874         defer psc.Close()
875         leecherGreeting.DownloadAll()
876         if ps.Cancel {
877                 leecherGreeting.CancelPieces(0, leecherGreeting.NumPieces())
878         }
879         addClientPeer(leecherGreeting, seeder)
880         completes := make(map[int]bool, 3)
881 values:
882         for {
883                 // started := time.Now()
884                 select {
885                 case _v := <-psc.Values:
886                         // log.Print(time.Since(started))
887                         v := _v.(PieceStateChange)
888                         completes[v.Index] = v.Complete
889                 case <-time.After(100 * time.Millisecond):
890                         break values
891                 }
892         }
893         if ps.Cancel {
894                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
895         } else {
896                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
897         }
898
899 }
900
901 func TestTorrentDownloadAll(t *testing.T) {
902         testDownloadCancel(t, testDownloadCancelParams{})
903 }
904
905 func TestTorrentDownloadAllThenCancel(t *testing.T) {
906         testDownloadCancel(t, testDownloadCancelParams{
907                 Cancel: true,
908         })
909 }
910
911 // Ensure that it's an error for a peer to send an invalid have message.
912 func TestPeerInvalidHave(t *testing.T) {
913         cl, err := NewClient(TestingConfig())
914         require.NoError(t, err)
915         defer cl.Close()
916         info := metainfo.Info{
917                 PieceLength: 1,
918                 Pieces:      make([]byte, 20),
919                 Files:       []metainfo.FileInfo{{Length: 1}},
920         }
921         infoBytes, err := bencode.Marshal(info)
922         require.NoError(t, err)
923         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
924                 InfoBytes: infoBytes,
925                 InfoHash:  metainfo.HashBytes(infoBytes),
926                 Storage:   badStorage{},
927         })
928         require.NoError(t, err)
929         assert.True(t, _new)
930         defer tt.Drop()
931         cn := &connection{
932                 t: tt,
933         }
934         assert.NoError(t, cn.peerSentHave(0))
935         assert.Error(t, cn.peerSentHave(1))
936 }
937
938 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
939         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
940         defer os.RemoveAll(greetingTempDir)
941         cfg := TestingConfig()
942         cfg.DataDir = greetingTempDir
943         seeder, err := NewClient(TestingConfig())
944         require.NoError(t, err)
945         seeder.AddTorrentSpec(&TorrentSpec{
946                 InfoBytes: greetingMetainfo.InfoBytes,
947         })
948 }
949
950 func TestPrepareTrackerAnnounce(t *testing.T) {
951         cl := &Client{}
952         blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
953         require.NoError(t, err)
954         assert.False(t, blocked)
955         assert.EqualValues(t, "localhost:1234", host)
956         assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
957 }
958
959 // Check that when the listen port is 0, all the protocols listened on have
960 // the same port, and it isn't zero.
961 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
962         cl, err := NewClient(TestingConfig())
963         require.NoError(t, err)
964         defer cl.Close()
965         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
966         assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
967 }
968
969 func TestClientDynamicListenTCPOnly(t *testing.T) {
970         cfg := TestingConfig()
971         cfg.DisableUTP = true
972         cl, err := NewClient(cfg)
973         require.NoError(t, err)
974         defer cl.Close()
975         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
976         assert.Nil(t, cl.utpSock)
977 }
978
979 func TestClientDynamicListenUTPOnly(t *testing.T) {
980         cfg := TestingConfig()
981         cfg.DisableTCP = true
982         cl, err := NewClient(cfg)
983         require.NoError(t, err)
984         defer cl.Close()
985         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
986         assert.Nil(t, cl.tcpListener)
987 }
988
989 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
990         cfg := TestingConfig()
991         cfg.DisableTCP = true
992         cfg.DisableUTP = true
993         cl, err := NewClient(cfg)
994         require.NoError(t, err)
995         defer cl.Close()
996         assert.Nil(t, cl.ListenAddr())
997 }
998
999 func addClientPeer(t *Torrent, cl *Client) {
1000         t.AddPeers([]Peer{
1001                 {
1002                         IP:   missinggo.AddrIP(cl.ListenAddr()),
1003                         Port: missinggo.AddrPort(cl.ListenAddr()),
1004                 },
1005         })
1006 }
1007
1008 func totalConns(tts []*Torrent) (ret int) {
1009         for _, tt := range tts {
1010                 tt.cl.mu.Lock()
1011                 ret += len(tt.conns)
1012                 tt.cl.mu.Unlock()
1013         }
1014         return
1015 }
1016
1017 func TestSetMaxEstablishedConn(t *testing.T) {
1018         var tts []*Torrent
1019         ih := testutil.GreetingMetaInfo().HashInfoBytes()
1020         for i := range iter.N(3) {
1021                 cl, err := NewClient(TestingConfig())
1022                 require.NoError(t, err)
1023                 defer cl.Close()
1024                 tt, _ := cl.AddTorrentInfoHash(ih)
1025                 tt.SetMaxEstablishedConns(2)
1026                 testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
1027                 tts = append(tts, tt)
1028         }
1029         addPeers := func() {
1030                 for i, tt := range tts {
1031                         for _, _tt := range tts[:i] {
1032                                 addClientPeer(tt, _tt.cl)
1033                         }
1034                 }
1035         }
1036         waitTotalConns := func(num int) {
1037                 for totalConns(tts) != num {
1038                         time.Sleep(time.Millisecond)
1039                 }
1040         }
1041         addPeers()
1042         waitTotalConns(6)
1043         tts[0].SetMaxEstablishedConns(1)
1044         waitTotalConns(4)
1045         tts[0].SetMaxEstablishedConns(0)
1046         waitTotalConns(2)
1047         tts[0].SetMaxEstablishedConns(1)
1048         addPeers()
1049         waitTotalConns(4)
1050         tts[0].SetMaxEstablishedConns(2)
1051         addPeers()
1052         waitTotalConns(6)
1053 }
1054
1055 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1056         os.MkdirAll(dir, 0770)
1057         file, err := os.Create(filepath.Join(dir, name))
1058         require.NoError(t, err)
1059         file.Write([]byte(name))
1060         file.Close()
1061         mi := metainfo.MetaInfo{}
1062         mi.SetDefaults()
1063         info := metainfo.Info{PieceLength: 256 * 1024}
1064         err = info.BuildFromFilePath(filepath.Join(dir, name))
1065         require.NoError(t, err)
1066         mi.InfoBytes, err = bencode.Marshal(info)
1067         require.NoError(t, err)
1068         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1069         tr, err := cl.AddTorrent(&mi)
1070         require.NoError(t, err)
1071         require.True(t, tr.Seeding())
1072         tr.VerifyData()
1073         return magnet
1074 }
1075
1076 // https://github.com/anacrolix/torrent/issues/114
1077 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1078         cfg := TestingConfig()
1079         cfg.DisableUTP = true
1080         cfg.Seed = true
1081         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1082         cfg.Debug = true
1083         cfg.ForceEncryption = true
1084         os.Mkdir(cfg.DataDir, 0755)
1085         server, err := NewClient(cfg)
1086         require.NoError(t, err)
1087         defer server.Close()
1088         testutil.ExportStatusWriter(server, "s")
1089         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1090         makeMagnet(t, server, cfg.DataDir, "test2")
1091         cfg = TestingConfig()
1092         cfg.DisableUTP = true
1093         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1094         cfg.Debug = true
1095         cfg.ForceEncryption = true
1096         client, err := NewClient(cfg)
1097         require.NoError(t, err)
1098         defer client.Close()
1099         testutil.ExportStatusWriter(client, "c")
1100         tr, err := client.AddMagnet(magnet1)
1101         require.NoError(t, err)
1102         tr.AddPeers([]Peer{{
1103                 IP:   missinggo.AddrIP(server.ListenAddr()),
1104                 Port: missinggo.AddrPort(server.ListenAddr()),
1105         }})
1106         <-tr.GotInfo()
1107         tr.DownloadAll()
1108         client.WaitAll()
1109 }
1110
1111 func TestClientAddressInUse(t *testing.T) {
1112         s, _ := NewUtpSocket("udp", ":50007")
1113         if s != nil {
1114                 defer s.Close()
1115         }
1116         cfg := TestingConfig()
1117         cfg.ListenAddr = ":50007"
1118         cl, err := NewClient(cfg)
1119         require.Error(t, err)
1120         require.Nil(t, cl)
1121 }