]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
More development of the new logging interface
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "math/rand"
10         "net"
11         "os"
12         "path/filepath"
13         "strings"
14         "sync"
15         "testing"
16         "time"
17
18         _ "github.com/anacrolix/envpprof"
19         "github.com/anacrolix/missinggo"
20         "github.com/anacrolix/missinggo/filecache"
21         "github.com/bradfitz/iter"
22         "github.com/stretchr/testify/assert"
23         "github.com/stretchr/testify/require"
24         "golang.org/x/time/rate"
25
26         "github.com/anacrolix/torrent/bencode"
27         "github.com/anacrolix/torrent/internal/testutil"
28         "github.com/anacrolix/torrent/iplist"
29         "github.com/anacrolix/torrent/metainfo"
30         "github.com/anacrolix/torrent/storage"
31 )
32
33 func TestingConfig() *Config {
34         return &Config{
35                 ListenAddr:              "localhost:0",
36                 NoDHT:                   true,
37                 DataDir:                 tempDir(),
38                 DisableTrackers:         true,
39                 NoDefaultPortForwarding: true,
40                 // Debug:           true,
41         }
42 }
43
44 func TestClientDefault(t *testing.T) {
45         cl, err := NewClient(TestingConfig())
46         require.NoError(t, err)
47         cl.Close()
48 }
49
50 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
51         cfg := TestingConfig()
52         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
53         require.NoError(t, err)
54         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
55         defer ci.Close()
56         cfg.DefaultStorage = ci
57         cl, err := NewClient(cfg)
58         require.NoError(t, err)
59         cl.Close()
60         // And again, https://github.com/anacrolix/torrent/issues/158
61         cl, err = NewClient(cfg)
62         require.NoError(t, err)
63         cl.Close()
64 }
65
66 func TestAddDropTorrent(t *testing.T) {
67         cl, err := NewClient(TestingConfig())
68         require.NoError(t, err)
69         defer cl.Close()
70         dir, mi := testutil.GreetingTestTorrent()
71         defer os.RemoveAll(dir)
72         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
73         require.NoError(t, err)
74         assert.True(t, new)
75         tt.SetMaxEstablishedConns(0)
76         tt.SetMaxEstablishedConns(1)
77         tt.Drop()
78 }
79
80 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
81         // TODO?
82         t.SkipNow()
83 }
84
85 func TestAddTorrentNoUsableURLs(t *testing.T) {
86         // TODO?
87         t.SkipNow()
88 }
89
90 func TestAddPeersToUnknownTorrent(t *testing.T) {
91         // TODO?
92         t.SkipNow()
93 }
94
95 func TestPieceHashSize(t *testing.T) {
96         assert.Equal(t, 20, pieceHash.Size())
97 }
98
99 func TestTorrentInitialState(t *testing.T) {
100         dir, mi := testutil.GreetingTestTorrent()
101         defer os.RemoveAll(dir)
102         cl := &Client{}
103         cl.initLogger()
104         tor := cl.newTorrent(
105                 mi.HashInfoBytes(),
106                 storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
107         )
108         tor.setChunkSize(2)
109         tor.cl.mu.Lock()
110         err := tor.setInfoBytes(mi.InfoBytes)
111         tor.cl.mu.Unlock()
112         require.NoError(t, err)
113         require.Len(t, tor.pieces, 3)
114         tor.pendAllChunkSpecs(0)
115         tor.cl.mu.Lock()
116         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
117         tor.cl.mu.Unlock()
118         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
119 }
120
121 func TestUnmarshalPEXMsg(t *testing.T) {
122         var m peerExchangeMessage
123         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
124                 t.Fatal(err)
125         }
126         if len(m.Added) != 2 {
127                 t.FailNow()
128         }
129         if m.Added[0].Port != 0x506 {
130                 t.FailNow()
131         }
132 }
133
134 func TestReducedDialTimeout(t *testing.T) {
135         cfg := &Config{}
136         cfg.setDefaults()
137         for _, _case := range []struct {
138                 Max             time.Duration
139                 HalfOpenLimit   int
140                 PendingPeers    int
141                 ExpectedReduced time.Duration
142         }{
143                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
144                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
145                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
146                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
147                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
148                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
149         } {
150                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
151                 expected := _case.ExpectedReduced
152                 if expected < cfg.MinDialTimeout {
153                         expected = cfg.MinDialTimeout
154                 }
155                 if reduced != expected {
156                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
157                 }
158         }
159 }
160
161 func TestUTPRawConn(t *testing.T) {
162         l, err := NewUtpSocket("udp", "")
163         require.NoError(t, err)
164         defer l.Close()
165         go func() {
166                 for {
167                         _, err := l.Accept()
168                         if err != nil {
169                                 break
170                         }
171                 }
172         }()
173         // Connect a UTP peer to see if the RawConn will still work.
174         s, err := NewUtpSocket("udp", "")
175         require.NoError(t, err)
176         defer s.Close()
177         utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
178         require.NoError(t, err)
179         defer utpPeer.Close()
180         peer, err := net.ListenPacket("udp", ":0")
181         require.NoError(t, err)
182         defer peer.Close()
183
184         msgsReceived := 0
185         // How many messages to send. I've set this to double the channel buffer
186         // size in the raw packetConn.
187         const N = 200
188         readerStopped := make(chan struct{})
189         // The reader goroutine.
190         go func() {
191                 defer close(readerStopped)
192                 b := make([]byte, 500)
193                 for i := 0; i < N; i++ {
194                         n, _, err := l.ReadFrom(b)
195                         require.NoError(t, err)
196                         msgsReceived++
197                         var d int
198                         fmt.Sscan(string(b[:n]), &d)
199                         assert.Equal(t, i, d)
200                 }
201         }()
202         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
203         require.NoError(t, err)
204         for i := 0; i < N; i++ {
205                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
206                 require.NoError(t, err)
207                 time.Sleep(time.Millisecond)
208         }
209         select {
210         case <-readerStopped:
211         case <-time.After(time.Second):
212                 t.Fatal("reader timed out")
213         }
214         if msgsReceived != N {
215                 t.Fatalf("messages received: %d", msgsReceived)
216         }
217 }
218
219 func TestTwoClientsArbitraryPorts(t *testing.T) {
220         for i := 0; i < 2; i++ {
221                 cl, err := NewClient(TestingConfig())
222                 if err != nil {
223                         t.Fatal(err)
224                 }
225                 defer cl.Close()
226         }
227 }
228
229 func TestAddDropManyTorrents(t *testing.T) {
230         cl, err := NewClient(TestingConfig())
231         require.NoError(t, err)
232         defer cl.Close()
233         for i := range iter.N(1000) {
234                 var spec TorrentSpec
235                 binary.PutVarint(spec.InfoHash[:], int64(i))
236                 tt, new, err := cl.AddTorrentSpec(&spec)
237                 assert.NoError(t, err)
238                 assert.True(t, new)
239                 defer tt.Drop()
240         }
241 }
242
243 type FileCacheClientStorageFactoryParams struct {
244         Capacity    int64
245         SetCapacity bool
246         Wrapper     func(*filecache.Cache) storage.ClientImpl
247 }
248
249 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
250         return func(dataDir string) storage.ClientImpl {
251                 fc, err := filecache.NewCache(dataDir)
252                 if err != nil {
253                         panic(err)
254                 }
255                 if ps.SetCapacity {
256                         fc.SetCapacity(ps.Capacity)
257                 }
258                 return ps.Wrapper(fc)
259         }
260 }
261
262 type storageFactory func(string) storage.ClientImpl
263
264 func TestClientTransferDefault(t *testing.T) {
265         testClientTransfer(t, testClientTransferParams{
266                 ExportClientStatus: true,
267                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
268                         Wrapper: fileCachePieceResourceStorage,
269                 }),
270         })
271 }
272
273 func TestClientTransferRateLimitedUpload(t *testing.T) {
274         started := time.Now()
275         testClientTransfer(t, testClientTransferParams{
276                 // We are uploading 13 bytes (the length of the greeting torrent). The
277                 // chunks are 2 bytes in length. Then the smallest burst we can run
278                 // with is 2. Time taken is (13-burst)/rate.
279                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
280         })
281         require.True(t, time.Since(started) > time.Second)
282 }
283
284 func TestClientTransferRateLimitedDownload(t *testing.T) {
285         testClientTransfer(t, testClientTransferParams{
286                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
287         })
288 }
289
290 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
291         return storage.NewResourcePieces(fc.AsResourceProvider())
292 }
293
294 func TestClientTransferSmallCache(t *testing.T) {
295         testClientTransfer(t, testClientTransferParams{
296                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
297                         SetCapacity: true,
298                         // Going below the piece length means it can't complete a piece so
299                         // that it can be hashed.
300                         Capacity: 5,
301                         Wrapper:  fileCachePieceResourceStorage,
302                 }),
303                 SetReadahead: true,
304                 // Can't readahead too far or the cache will thrash and drop data we
305                 // thought we had.
306                 Readahead:          0,
307                 ExportClientStatus: true,
308         })
309 }
310
311 func TestClientTransferVarious(t *testing.T) {
312         // Leecher storage
313         for _, ls := range []storageFactory{
314                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
315                         Wrapper: fileCachePieceResourceStorage,
316                 }),
317                 storage.NewBoltDB,
318         } {
319                 // Seeder storage
320                 for _, ss := range []func(string) storage.ClientImpl{
321                         storage.NewFile,
322                         storage.NewMMap,
323                 } {
324                         for _, responsive := range []bool{false, true} {
325                                 testClientTransfer(t, testClientTransferParams{
326                                         Responsive:     responsive,
327                                         SeederStorage:  ss,
328                                         LeecherStorage: ls,
329                                 })
330                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
331                                         testClientTransfer(t, testClientTransferParams{
332                                                 SeederStorage:  ss,
333                                                 Responsive:     responsive,
334                                                 SetReadahead:   true,
335                                                 Readahead:      readahead,
336                                                 LeecherStorage: ls,
337                                         })
338                                 }
339                         }
340                 }
341         }
342 }
343
344 type testClientTransferParams struct {
345         Responsive                 bool
346         Readahead                  int64
347         SetReadahead               bool
348         ExportClientStatus         bool
349         LeecherStorage             func(string) storage.ClientImpl
350         SeederStorage              func(string) storage.ClientImpl
351         SeederUploadRateLimiter    *rate.Limiter
352         LeecherDownloadRateLimiter *rate.Limiter
353 }
354
355 // Creates a seeder and a leecher, and ensures the data transfers when a read
356 // is attempted on the leecher.
357 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
358         greetingTempDir, mi := testutil.GreetingTestTorrent()
359         defer os.RemoveAll(greetingTempDir)
360         // Create seeder and a Torrent.
361         cfg := TestingConfig()
362         cfg.Seed = true
363         cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
364         // cfg.ListenAddr = "localhost:4000"
365         if ps.SeederStorage != nil {
366                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
367                 defer cfg.DefaultStorage.Close()
368         } else {
369                 cfg.DataDir = greetingTempDir
370         }
371         seeder, err := NewClient(cfg)
372         require.NoError(t, err)
373         if ps.ExportClientStatus {
374                 testutil.ExportStatusWriter(seeder, "s")
375         }
376         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
377         // Run a Stats right after Closing the Client. This will trigger the Stats
378         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
379         defer seederTorrent.Stats()
380         defer seeder.Close()
381         seederTorrent.VerifyData()
382         // Create leecher and a Torrent.
383         leecherDataDir, err := ioutil.TempDir("", "")
384         require.NoError(t, err)
385         defer os.RemoveAll(leecherDataDir)
386         if ps.LeecherStorage == nil {
387                 cfg.DataDir = leecherDataDir
388         } else {
389                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
390         }
391         cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
392         // cfg.ListenAddr = "localhost:4001"
393         leecher, err := NewClient(cfg)
394         require.NoError(t, err)
395         defer leecher.Close()
396         if ps.ExportClientStatus {
397                 testutil.ExportStatusWriter(leecher, "l")
398         }
399         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
400                 ret = TorrentSpecFromMetaInfo(mi)
401                 ret.ChunkSize = 2
402                 return
403         }())
404         require.NoError(t, err)
405         assert.True(t, new)
406         // Now do some things with leecher and seeder.
407         addClientPeer(leecherGreeting, seeder)
408         r := leecherGreeting.NewReader()
409         defer r.Close()
410         if ps.Responsive {
411                 r.SetResponsive()
412         }
413         if ps.SetReadahead {
414                 r.SetReadahead(ps.Readahead)
415         }
416         assertReadAllGreeting(t, r)
417         // After one read through, we can assume certain torrent statistics.
418         // These are not a strict requirement. It is however interesting to
419         // follow.
420         // t.Logf("%#v", seederTorrent.Stats())
421         // assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
422         // assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
423         // assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
424         // assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
425         // Read through again for the cases where the torrent data size exceeds
426         // the size of the cache.
427         assertReadAllGreeting(t, r)
428 }
429
430 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
431         pos, err := r.Seek(0, io.SeekStart)
432         assert.NoError(t, err)
433         assert.EqualValues(t, 0, pos)
434         _greeting, err := ioutil.ReadAll(r)
435         assert.NoError(t, err)
436         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
437 }
438
439 // Check that after completing leeching, a leecher transitions to a seeding
440 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
441 func TestSeedAfterDownloading(t *testing.T) {
442         greetingTempDir, mi := testutil.GreetingTestTorrent()
443         defer os.RemoveAll(greetingTempDir)
444         cfg := TestingConfig()
445         cfg.Seed = true
446         cfg.DataDir = greetingTempDir
447         seeder, err := NewClient(cfg)
448         require.NoError(t, err)
449         defer seeder.Close()
450         testutil.ExportStatusWriter(seeder, "s")
451         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
452         seederTorrent.VerifyData()
453         cfg.DataDir, err = ioutil.TempDir("", "")
454         require.NoError(t, err)
455         defer os.RemoveAll(cfg.DataDir)
456         leecher, err := NewClient(cfg)
457         require.NoError(t, err)
458         defer leecher.Close()
459         testutil.ExportStatusWriter(leecher, "l")
460         cfg.Seed = false
461         cfg.DataDir, err = ioutil.TempDir("", "")
462         require.NoError(t, err)
463         defer os.RemoveAll(cfg.DataDir)
464         leecherLeecher, _ := NewClient(cfg)
465         defer leecherLeecher.Close()
466         testutil.ExportStatusWriter(leecherLeecher, "ll")
467         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
468                 ret = TorrentSpecFromMetaInfo(mi)
469                 ret.ChunkSize = 2
470                 return
471         }())
472         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
473                 ret = TorrentSpecFromMetaInfo(mi)
474                 ret.ChunkSize = 3
475                 return
476         }())
477         // Simultaneously DownloadAll in Leecher, and read the contents
478         // consecutively in LeecherLeecher. This non-deterministically triggered a
479         // case where the leecher wouldn't unchoke the LeecherLeecher.
480         var wg sync.WaitGroup
481         wg.Add(1)
482         go func() {
483                 defer wg.Done()
484                 r := llg.NewReader()
485                 defer r.Close()
486                 b, err := ioutil.ReadAll(r)
487                 require.NoError(t, err)
488                 assert.EqualValues(t, testutil.GreetingFileContents, b)
489         }()
490         addClientPeer(leecherGreeting, seeder)
491         addClientPeer(leecherGreeting, leecherLeecher)
492         wg.Add(1)
493         go func() {
494                 defer wg.Done()
495                 leecherGreeting.DownloadAll()
496                 leecher.WaitAll()
497         }()
498         wg.Wait()
499 }
500
501 func TestMergingTrackersByAddingSpecs(t *testing.T) {
502         cl, err := NewClient(TestingConfig())
503         require.NoError(t, err)
504         defer cl.Close()
505         spec := TorrentSpec{}
506         T, new, _ := cl.AddTorrentSpec(&spec)
507         if !new {
508                 t.FailNow()
509         }
510         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
511         _, new, _ = cl.AddTorrentSpec(&spec)
512         assert.False(t, new)
513         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
514         // Because trackers are disabled in TestingConfig.
515         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
516 }
517
518 type badStorage struct{}
519
520 var _ storage.ClientImpl = badStorage{}
521
522 func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
523         return bs, nil
524 }
525
526 func (bs badStorage) Close() error {
527         return nil
528 }
529
530 func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {
531         return badStoragePiece{p}
532 }
533
534 type badStoragePiece struct {
535         p metainfo.Piece
536 }
537
538 var _ storage.PieceImpl = badStoragePiece{}
539
540 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
541         return 0, nil
542 }
543
544 func (p badStoragePiece) Completion() storage.Completion {
545         return storage.Completion{Complete: true, Ok: true}
546 }
547
548 func (p badStoragePiece) MarkComplete() error {
549         return errors.New("psyyyyyyyche")
550 }
551
552 func (p badStoragePiece) MarkNotComplete() error {
553         return errors.New("psyyyyyyyche")
554 }
555
556 func (p badStoragePiece) randomlyTruncatedDataString() string {
557         return "hello, world\n"[:rand.Intn(14)]
558 }
559
560 func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
561         r := strings.NewReader(p.randomlyTruncatedDataString())
562         return r.ReadAt(b, off+p.p.Offset())
563 }
564
565 // We read from a piece which is marked completed, but is missing data.
566 func TestCompletedPieceWrongSize(t *testing.T) {
567         cfg := TestingConfig()
568         cfg.DefaultStorage = badStorage{}
569         cl, err := NewClient(cfg)
570         require.NoError(t, err)
571         defer cl.Close()
572         info := metainfo.Info{
573                 PieceLength: 15,
574                 Pieces:      make([]byte, 20),
575                 Files: []metainfo.FileInfo{
576                         {Path: []string{"greeting"}, Length: 13},
577                 },
578         }
579         b, err := bencode.Marshal(info)
580         require.NoError(t, err)
581         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
582                 InfoBytes: b,
583                 InfoHash:  metainfo.HashBytes(b),
584         })
585         require.NoError(t, err)
586         defer tt.Drop()
587         assert.True(t, new)
588         r := tt.NewReader()
589         defer r.Close()
590         b, err = ioutil.ReadAll(r)
591         assert.Len(t, b, 13)
592         assert.NoError(t, err)
593 }
594
595 func BenchmarkAddLargeTorrent(b *testing.B) {
596         cfg := TestingConfig()
597         cfg.DisableTCP = true
598         cfg.DisableUTP = true
599         cfg.ListenAddr = "redonk"
600         cl, err := NewClient(cfg)
601         require.NoError(b, err)
602         defer cl.Close()
603         for range iter.N(b.N) {
604                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
605                 if err != nil {
606                         b.Fatal(err)
607                 }
608                 t.Drop()
609         }
610 }
611
612 func TestResponsive(t *testing.T) {
613         seederDataDir, mi := testutil.GreetingTestTorrent()
614         defer os.RemoveAll(seederDataDir)
615         cfg := TestingConfig()
616         cfg.Seed = true
617         cfg.DataDir = seederDataDir
618         seeder, err := NewClient(cfg)
619         require.Nil(t, err)
620         defer seeder.Close()
621         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
622         seederTorrent.VerifyData()
623         leecherDataDir, err := ioutil.TempDir("", "")
624         require.Nil(t, err)
625         defer os.RemoveAll(leecherDataDir)
626         cfg = TestingConfig()
627         cfg.DataDir = leecherDataDir
628         leecher, err := NewClient(cfg)
629         require.Nil(t, err)
630         defer leecher.Close()
631         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
632                 ret = TorrentSpecFromMetaInfo(mi)
633                 ret.ChunkSize = 2
634                 return
635         }())
636         addClientPeer(leecherTorrent, seeder)
637         reader := leecherTorrent.NewReader()
638         defer reader.Close()
639         reader.SetReadahead(0)
640         reader.SetResponsive()
641         b := make([]byte, 2)
642         _, err = reader.Seek(3, io.SeekStart)
643         require.NoError(t, err)
644         _, err = io.ReadFull(reader, b)
645         assert.Nil(t, err)
646         assert.EqualValues(t, "lo", string(b))
647         _, err = reader.Seek(11, io.SeekStart)
648         require.NoError(t, err)
649         n, err := io.ReadFull(reader, b)
650         assert.Nil(t, err)
651         assert.EqualValues(t, 2, n)
652         assert.EqualValues(t, "d\n", string(b))
653 }
654
655 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
656         seederDataDir, mi := testutil.GreetingTestTorrent()
657         defer os.RemoveAll(seederDataDir)
658         cfg := TestingConfig()
659         cfg.Seed = true
660         cfg.DataDir = seederDataDir
661         seeder, err := NewClient(cfg)
662         require.Nil(t, err)
663         defer seeder.Close()
664         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
665         seederTorrent.VerifyData()
666         leecherDataDir, err := ioutil.TempDir("", "")
667         require.Nil(t, err)
668         defer os.RemoveAll(leecherDataDir)
669         cfg = TestingConfig()
670         cfg.DataDir = leecherDataDir
671         leecher, err := NewClient(cfg)
672         require.Nil(t, err)
673         defer leecher.Close()
674         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
675                 ret = TorrentSpecFromMetaInfo(mi)
676                 ret.ChunkSize = 2
677                 return
678         }())
679         addClientPeer(leecherTorrent, seeder)
680         reader := leecherTorrent.NewReader()
681         defer reader.Close()
682         reader.SetReadahead(0)
683         reader.SetResponsive()
684         b := make([]byte, 2)
685         _, err = reader.Seek(3, io.SeekStart)
686         require.NoError(t, err)
687         _, err = io.ReadFull(reader, b)
688         assert.Nil(t, err)
689         assert.EqualValues(t, "lo", string(b))
690         go leecherTorrent.Drop()
691         _, err = reader.Seek(11, io.SeekStart)
692         require.NoError(t, err)
693         n, err := reader.Read(b)
694         assert.EqualError(t, err, "torrent closed")
695         assert.EqualValues(t, 0, n)
696 }
697
698 func TestDHTInheritBlocklist(t *testing.T) {
699         ipl := iplist.New(nil)
700         require.NotNil(t, ipl)
701         cfg := TestingConfig()
702         cfg.IPBlocklist = ipl
703         cfg.NoDHT = false
704         cl, err := NewClient(cfg)
705         require.NoError(t, err)
706         defer cl.Close()
707         require.Equal(t, ipl, cl.DHT().IPBlocklist())
708 }
709
710 // Check that stuff is merged in subsequent AddTorrentSpec for the same
711 // infohash.
712 func TestAddTorrentSpecMerging(t *testing.T) {
713         cl, err := NewClient(TestingConfig())
714         require.NoError(t, err)
715         defer cl.Close()
716         dir, mi := testutil.GreetingTestTorrent()
717         defer os.RemoveAll(dir)
718         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
719                 InfoHash: mi.HashInfoBytes(),
720         })
721         require.NoError(t, err)
722         require.True(t, new)
723         require.Nil(t, tt.Info())
724         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
725         require.NoError(t, err)
726         require.False(t, new)
727         require.NotNil(t, tt.Info())
728 }
729
730 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
731         dir, mi := testutil.GreetingTestTorrent()
732         os.RemoveAll(dir)
733         cl, _ := NewClient(TestingConfig())
734         defer cl.Close()
735         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
736                 InfoHash: mi.HashInfoBytes(),
737         })
738         tt.Drop()
739         assert.EqualValues(t, 0, len(cl.Torrents()))
740         select {
741         case <-tt.GotInfo():
742                 t.FailNow()
743         default:
744         }
745 }
746
747 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
748         for i := range iter.N(info.NumPieces()) {
749                 p := info.Piece(i)
750                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
751         }
752 }
753
754 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
755         fileCacheDir, err := ioutil.TempDir("", "")
756         require.NoError(t, err)
757         defer os.RemoveAll(fileCacheDir)
758         fileCache, err := filecache.NewCache(fileCacheDir)
759         require.NoError(t, err)
760         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
761         defer os.RemoveAll(greetingDataTempDir)
762         filePieceStore := csf(fileCache)
763         defer filePieceStore.Close()
764         info, err := greetingMetainfo.UnmarshalInfo()
765         require.NoError(t, err)
766         ih := greetingMetainfo.HashInfoBytes()
767         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
768         require.NoError(t, err)
769         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
770         // require.Equal(t, len(testutil.GreetingFileContents), written)
771         // require.NoError(t, err)
772         for i := 0; i < info.NumPieces(); i++ {
773                 p := info.Piece(i)
774                 if alreadyCompleted {
775                         require.NoError(t, greetingData.Piece(p).MarkComplete())
776                 }
777         }
778         cfg := TestingConfig()
779         // TODO: Disable network option?
780         cfg.DisableTCP = true
781         cfg.DisableUTP = true
782         cfg.DefaultStorage = filePieceStore
783         cl, err := NewClient(cfg)
784         require.NoError(t, err)
785         defer cl.Close()
786         tt, err := cl.AddTorrent(greetingMetainfo)
787         require.NoError(t, err)
788         psrs := tt.PieceStateRuns()
789         assert.Len(t, psrs, 1)
790         assert.EqualValues(t, 3, psrs[0].Length)
791         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
792         if alreadyCompleted {
793                 r := tt.NewReader()
794                 b, err := ioutil.ReadAll(r)
795                 assert.NoError(t, err)
796                 assert.EqualValues(t, testutil.GreetingFileContents, b)
797         }
798 }
799
800 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
801         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
802 }
803
804 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
805         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
806 }
807
808 func TestAddMetainfoWithNodes(t *testing.T) {
809         cfg := TestingConfig()
810         cfg.ListenAddr = ":0"
811         cfg.NoDHT = false
812         // For now, we want to just jam the nodes into the table, without
813         // verifying them first. Also the DHT code doesn't support mixing secure
814         // and insecure nodes if security is enabled (yet).
815         cfg.DHTConfig.NoSecurity = true
816         cl, err := NewClient(cfg)
817         require.NoError(t, err)
818         defer cl.Close()
819         assert.EqualValues(t, 0, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
820         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
821         require.NoError(t, err)
822         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
823         // check if the announce-list is here instead. TODO: Add nodes.
824         assert.Len(t, tt.metainfo.AnnounceList, 5)
825         // There are 6 nodes in the torrent file.
826         assert.EqualValues(t, 6, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
827 }
828
829 type testDownloadCancelParams struct {
830         ExportClientStatus        bool
831         SetLeecherStorageCapacity bool
832         LeecherStorageCapacity    int64
833         Cancel                    bool
834 }
835
836 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
837         greetingTempDir, mi := testutil.GreetingTestTorrent()
838         defer os.RemoveAll(greetingTempDir)
839         cfg := TestingConfig()
840         cfg.Seed = true
841         cfg.DataDir = greetingTempDir
842         seeder, err := NewClient(cfg)
843         require.NoError(t, err)
844         defer seeder.Close()
845         if ps.ExportClientStatus {
846                 testutil.ExportStatusWriter(seeder, "s")
847         }
848         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
849         seederTorrent.VerifyData()
850         leecherDataDir, err := ioutil.TempDir("", "")
851         require.NoError(t, err)
852         defer os.RemoveAll(leecherDataDir)
853         fc, err := filecache.NewCache(leecherDataDir)
854         require.NoError(t, err)
855         if ps.SetLeecherStorageCapacity {
856                 fc.SetCapacity(ps.LeecherStorageCapacity)
857         }
858         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
859         cfg.DataDir = leecherDataDir
860         leecher, _ := NewClient(cfg)
861         defer leecher.Close()
862         if ps.ExportClientStatus {
863                 testutil.ExportStatusWriter(leecher, "l")
864         }
865         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
866                 ret = TorrentSpecFromMetaInfo(mi)
867                 ret.ChunkSize = 2
868                 return
869         }())
870         require.NoError(t, err)
871         assert.True(t, new)
872         psc := leecherGreeting.SubscribePieceStateChanges()
873         defer psc.Close()
874
875         leecherGreeting.cl.mu.Lock()
876         leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
877         if ps.Cancel {
878                 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
879         }
880         leecherGreeting.cl.mu.Unlock()
881
882         addClientPeer(leecherGreeting, seeder)
883         completes := make(map[int]bool, 3)
884 values:
885         for {
886                 // started := time.Now()
887                 select {
888                 case _v := <-psc.Values:
889                         // log.Print(time.Since(started))
890                         v := _v.(PieceStateChange)
891                         completes[v.Index] = v.Complete
892                 case <-time.After(100 * time.Millisecond):
893                         break values
894                 }
895         }
896         if ps.Cancel {
897                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
898         } else {
899                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
900         }
901
902 }
903
904 func TestTorrentDownloadAll(t *testing.T) {
905         testDownloadCancel(t, testDownloadCancelParams{})
906 }
907
908 func TestTorrentDownloadAllThenCancel(t *testing.T) {
909         testDownloadCancel(t, testDownloadCancelParams{
910                 Cancel: true,
911         })
912 }
913
914 // Ensure that it's an error for a peer to send an invalid have message.
915 func TestPeerInvalidHave(t *testing.T) {
916         cl, err := NewClient(TestingConfig())
917         require.NoError(t, err)
918         defer cl.Close()
919         info := metainfo.Info{
920                 PieceLength: 1,
921                 Pieces:      make([]byte, 20),
922                 Files:       []metainfo.FileInfo{{Length: 1}},
923         }
924         infoBytes, err := bencode.Marshal(info)
925         require.NoError(t, err)
926         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
927                 InfoBytes: infoBytes,
928                 InfoHash:  metainfo.HashBytes(infoBytes),
929                 Storage:   badStorage{},
930         })
931         require.NoError(t, err)
932         assert.True(t, _new)
933         defer tt.Drop()
934         cn := &connection{
935                 t: tt,
936         }
937         assert.NoError(t, cn.peerSentHave(0))
938         assert.Error(t, cn.peerSentHave(1))
939 }
940
941 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
942         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
943         defer os.RemoveAll(greetingTempDir)
944         cfg := TestingConfig()
945         cfg.DataDir = greetingTempDir
946         seeder, err := NewClient(TestingConfig())
947         require.NoError(t, err)
948         seeder.AddTorrentSpec(&TorrentSpec{
949                 InfoBytes: greetingMetainfo.InfoBytes,
950         })
951 }
952
953 func TestPrepareTrackerAnnounce(t *testing.T) {
954         cl := &Client{}
955         blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
956         require.NoError(t, err)
957         assert.False(t, blocked)
958         assert.EqualValues(t, "localhost:1234", host)
959         assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
960 }
961
962 // Check that when the listen port is 0, all the protocols listened on have
963 // the same port, and it isn't zero.
964 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
965         cl, err := NewClient(TestingConfig())
966         require.NoError(t, err)
967         defer cl.Close()
968         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
969         assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
970 }
971
972 func TestClientDynamicListenTCPOnly(t *testing.T) {
973         cfg := TestingConfig()
974         cfg.DisableUTP = true
975         cl, err := NewClient(cfg)
976         require.NoError(t, err)
977         defer cl.Close()
978         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
979         assert.Nil(t, cl.utpSock)
980 }
981
982 func TestClientDynamicListenUTPOnly(t *testing.T) {
983         cfg := TestingConfig()
984         cfg.DisableTCP = true
985         cl, err := NewClient(cfg)
986         require.NoError(t, err)
987         defer cl.Close()
988         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
989         assert.Nil(t, cl.tcpListener)
990 }
991
992 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
993         cfg := TestingConfig()
994         cfg.DisableTCP = true
995         cfg.DisableUTP = true
996         cl, err := NewClient(cfg)
997         require.NoError(t, err)
998         defer cl.Close()
999         assert.Nil(t, cl.ListenAddr())
1000 }
1001
1002 func addClientPeer(t *Torrent, cl *Client) {
1003         t.AddPeers([]Peer{
1004                 {
1005                         IP:   missinggo.AddrIP(cl.ListenAddr()),
1006                         Port: missinggo.AddrPort(cl.ListenAddr()),
1007                 },
1008         })
1009 }
1010
1011 func totalConns(tts []*Torrent) (ret int) {
1012         for _, tt := range tts {
1013                 tt.cl.mu.Lock()
1014                 ret += len(tt.conns)
1015                 tt.cl.mu.Unlock()
1016         }
1017         return
1018 }
1019
1020 func TestSetMaxEstablishedConn(t *testing.T) {
1021         var tts []*Torrent
1022         ih := testutil.GreetingMetaInfo().HashInfoBytes()
1023         for i := range iter.N(3) {
1024                 cl, err := NewClient(TestingConfig())
1025                 require.NoError(t, err)
1026                 defer cl.Close()
1027                 tt, _ := cl.AddTorrentInfoHash(ih)
1028                 tt.SetMaxEstablishedConns(2)
1029                 testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
1030                 tts = append(tts, tt)
1031         }
1032         addPeers := func() {
1033                 for i, tt := range tts {
1034                         for _, _tt := range tts[:i] {
1035                                 addClientPeer(tt, _tt.cl)
1036                         }
1037                 }
1038         }
1039         waitTotalConns := func(num int) {
1040                 for totalConns(tts) != num {
1041                         time.Sleep(time.Millisecond)
1042                 }
1043         }
1044         addPeers()
1045         waitTotalConns(6)
1046         tts[0].SetMaxEstablishedConns(1)
1047         waitTotalConns(4)
1048         tts[0].SetMaxEstablishedConns(0)
1049         waitTotalConns(2)
1050         tts[0].SetMaxEstablishedConns(1)
1051         addPeers()
1052         waitTotalConns(4)
1053         tts[0].SetMaxEstablishedConns(2)
1054         addPeers()
1055         waitTotalConns(6)
1056 }
1057
1058 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1059         os.MkdirAll(dir, 0770)
1060         file, err := os.Create(filepath.Join(dir, name))
1061         require.NoError(t, err)
1062         file.Write([]byte(name))
1063         file.Close()
1064         mi := metainfo.MetaInfo{}
1065         mi.SetDefaults()
1066         info := metainfo.Info{PieceLength: 256 * 1024}
1067         err = info.BuildFromFilePath(filepath.Join(dir, name))
1068         require.NoError(t, err)
1069         mi.InfoBytes, err = bencode.Marshal(info)
1070         require.NoError(t, err)
1071         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1072         tr, err := cl.AddTorrent(&mi)
1073         require.NoError(t, err)
1074         require.True(t, tr.Seeding())
1075         tr.VerifyData()
1076         return magnet
1077 }
1078
1079 // https://github.com/anacrolix/torrent/issues/114
1080 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1081         cfg := TestingConfig()
1082         cfg.DisableUTP = true
1083         cfg.Seed = true
1084         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1085         cfg.ForceEncryption = true
1086         os.Mkdir(cfg.DataDir, 0755)
1087         server, err := NewClient(cfg)
1088         require.NoError(t, err)
1089         defer server.Close()
1090         testutil.ExportStatusWriter(server, "s")
1091         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1092         makeMagnet(t, server, cfg.DataDir, "test2")
1093         cfg = TestingConfig()
1094         cfg.DisableUTP = true
1095         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1096         cfg.ForceEncryption = true
1097         client, err := NewClient(cfg)
1098         require.NoError(t, err)
1099         defer client.Close()
1100         testutil.ExportStatusWriter(client, "c")
1101         tr, err := client.AddMagnet(magnet1)
1102         require.NoError(t, err)
1103         tr.AddPeers([]Peer{{
1104                 IP:   missinggo.AddrIP(server.ListenAddr()),
1105                 Port: missinggo.AddrPort(server.ListenAddr()),
1106         }})
1107         <-tr.GotInfo()
1108         tr.DownloadAll()
1109         client.WaitAll()
1110 }
1111
1112 func TestClientAddressInUse(t *testing.T) {
1113         s, _ := NewUtpSocket("udp", ":50007")
1114         if s != nil {
1115                 defer s.Close()
1116         }
1117         cfg := TestingConfig()
1118         cfg.ListenAddr = ":50007"
1119         cl, err := NewClient(cfg)
1120         require.Error(t, err)
1121         require.Nil(t, cl)
1122 }