]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
fc008b99e399a9a334571ff8db6afa8af35cb078
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "math/rand"
10         "net"
11         "os"
12         "path/filepath"
13         "strings"
14         "sync"
15         "testing"
16         "time"
17
18         _ "github.com/anacrolix/envpprof"
19         "github.com/anacrolix/missinggo"
20         "github.com/anacrolix/missinggo/filecache"
21         "github.com/anacrolix/missinggo/pubsub"
22         "github.com/bradfitz/iter"
23         "github.com/stretchr/testify/assert"
24         "github.com/stretchr/testify/require"
25         "golang.org/x/time/rate"
26
27         "github.com/anacrolix/torrent/bencode"
28         "github.com/anacrolix/torrent/internal/testutil"
29         "github.com/anacrolix/torrent/iplist"
30         "github.com/anacrolix/torrent/metainfo"
31         "github.com/anacrolix/torrent/storage"
32 )
33
34 func TestingConfig() *Config {
35         return &Config{
36                 ListenAddr: "localhost:0",
37                 NoDHT:      true,
38                 DataDir: func() string {
39                         ret, err := ioutil.TempDir(tempDir, "")
40                         if err != nil {
41                                 panic(err)
42                         }
43                         return ret
44                 }(),
45                 DisableTrackers: true,
46                 Debug:           true,
47         }
48 }
49
50 func TestClientDefault(t *testing.T) {
51         cl, err := NewClient(TestingConfig())
52         require.NoError(t, err)
53         cl.Close()
54 }
55
56 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
57         cfg := TestingConfig()
58         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
59         require.NoError(t, err)
60         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
61         defer ci.Close()
62         cfg.DefaultStorage = ci
63         cl, err := NewClient(cfg)
64         require.NoError(t, err)
65         cl.Close()
66         // And again, https://github.com/anacrolix/torrent/issues/158
67         cl, err = NewClient(cfg)
68         require.NoError(t, err)
69         cl.Close()
70 }
71
72 func TestAddDropTorrent(t *testing.T) {
73         cl, err := NewClient(TestingConfig())
74         require.NoError(t, err)
75         defer cl.Close()
76         dir, mi := testutil.GreetingTestTorrent()
77         defer os.RemoveAll(dir)
78         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
79         require.NoError(t, err)
80         assert.True(t, new)
81         tt.SetMaxEstablishedConns(0)
82         tt.SetMaxEstablishedConns(1)
83         tt.Drop()
84 }
85
86 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
87         t.SkipNow()
88 }
89
90 func TestAddTorrentNoUsableURLs(t *testing.T) {
91         t.SkipNow()
92 }
93
94 func TestAddPeersToUnknownTorrent(t *testing.T) {
95         t.SkipNow()
96 }
97
98 func TestPieceHashSize(t *testing.T) {
99         if pieceHash.Size() != 20 {
100                 t.FailNow()
101         }
102 }
103
104 func TestTorrentInitialState(t *testing.T) {
105         dir, mi := testutil.GreetingTestTorrent()
106         defer os.RemoveAll(dir)
107         tor := &Torrent{
108                 infoHash:          mi.HashInfoBytes(),
109                 pieceStateChanges: pubsub.NewPubSub(),
110         }
111         tor.chunkSize = 2
112         tor.storageOpener = storage.NewClient(storage.NewFileWithCompletion("/dev/null", storage.NewMapPieceCompletion()))
113         // Needed to lock for asynchronous piece verification.
114         tor.cl = new(Client)
115         tor.cl.mu.Lock()
116         err := tor.setInfoBytes(mi.InfoBytes)
117         tor.cl.mu.Unlock()
118         require.NoError(t, err)
119         require.Len(t, tor.pieces, 3)
120         tor.pendAllChunkSpecs(0)
121         tor.cl.mu.Lock()
122         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
123         tor.cl.mu.Unlock()
124         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
125 }
126
127 func TestUnmarshalPEXMsg(t *testing.T) {
128         var m peerExchangeMessage
129         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
130                 t.Fatal(err)
131         }
132         if len(m.Added) != 2 {
133                 t.FailNow()
134         }
135         if m.Added[0].Port != 0x506 {
136                 t.FailNow()
137         }
138 }
139
140 func TestReducedDialTimeout(t *testing.T) {
141         for _, _case := range []struct {
142                 Max             time.Duration
143                 HalfOpenLimit   int
144                 PendingPeers    int
145                 ExpectedReduced time.Duration
146         }{
147                 {nominalDialTimeout, 40, 0, nominalDialTimeout},
148                 {nominalDialTimeout, 40, 1, nominalDialTimeout},
149                 {nominalDialTimeout, 40, 39, nominalDialTimeout},
150                 {nominalDialTimeout, 40, 40, nominalDialTimeout / 2},
151                 {nominalDialTimeout, 40, 80, nominalDialTimeout / 3},
152                 {nominalDialTimeout, 40, 4000, nominalDialTimeout / 101},
153         } {
154                 reduced := reducedDialTimeout(_case.Max, _case.HalfOpenLimit, _case.PendingPeers)
155                 expected := _case.ExpectedReduced
156                 if expected < minDialTimeout {
157                         expected = minDialTimeout
158                 }
159                 if reduced != expected {
160                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
161                 }
162         }
163 }
164
165 func TestUTPRawConn(t *testing.T) {
166         l, err := NewUtpSocket("udp", "")
167         require.NoError(t, err)
168         defer l.Close()
169         go func() {
170                 for {
171                         _, err := l.Accept()
172                         if err != nil {
173                                 break
174                         }
175                 }
176         }()
177         // Connect a UTP peer to see if the RawConn will still work.
178         s, err := NewUtpSocket("udp", "")
179         require.NoError(t, err)
180         defer s.Close()
181         utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
182         require.NoError(t, err)
183         defer utpPeer.Close()
184         peer, err := net.ListenPacket("udp", ":0")
185         require.NoError(t, err)
186         defer peer.Close()
187
188         msgsReceived := 0
189         // How many messages to send. I've set this to double the channel buffer
190         // size in the raw packetConn.
191         const N = 200
192         readerStopped := make(chan struct{})
193         // The reader goroutine.
194         go func() {
195                 defer close(readerStopped)
196                 b := make([]byte, 500)
197                 for i := 0; i < N; i++ {
198                         n, _, err := l.ReadFrom(b)
199                         require.NoError(t, err)
200                         msgsReceived++
201                         var d int
202                         fmt.Sscan(string(b[:n]), &d)
203                         assert.Equal(t, i, d)
204                 }
205         }()
206         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
207         require.NoError(t, err)
208         for i := 0; i < N; i++ {
209                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
210                 require.NoError(t, err)
211                 time.Sleep(time.Millisecond)
212         }
213         select {
214         case <-readerStopped:
215         case <-time.After(time.Second):
216                 t.Fatal("reader timed out")
217         }
218         if msgsReceived != N {
219                 t.Fatalf("messages received: %d", msgsReceived)
220         }
221 }
222
223 func TestTwoClientsArbitraryPorts(t *testing.T) {
224         for i := 0; i < 2; i++ {
225                 cl, err := NewClient(TestingConfig())
226                 if err != nil {
227                         t.Fatal(err)
228                 }
229                 defer cl.Close()
230         }
231 }
232
233 func TestAddDropManyTorrents(t *testing.T) {
234         cl, err := NewClient(TestingConfig())
235         require.NoError(t, err)
236         defer cl.Close()
237         for i := range iter.N(1000) {
238                 var spec TorrentSpec
239                 binary.PutVarint(spec.InfoHash[:], int64(i))
240                 tt, new, err := cl.AddTorrentSpec(&spec)
241                 assert.NoError(t, err)
242                 assert.True(t, new)
243                 defer tt.Drop()
244         }
245 }
246
247 type FileCacheClientStorageFactoryParams struct {
248         Capacity    int64
249         SetCapacity bool
250         Wrapper     func(*filecache.Cache) storage.ClientImpl
251 }
252
253 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
254         return func(dataDir string) storage.ClientImpl {
255                 fc, err := filecache.NewCache(dataDir)
256                 if err != nil {
257                         panic(err)
258                 }
259                 if ps.SetCapacity {
260                         fc.SetCapacity(ps.Capacity)
261                 }
262                 return ps.Wrapper(fc)
263         }
264 }
265
266 type storageFactory func(string) storage.ClientImpl
267
268 func TestClientTransferDefault(t *testing.T) {
269         testClientTransfer(t, testClientTransferParams{
270                 ExportClientStatus: true,
271                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
272                         Wrapper: fileCachePieceResourceStorage,
273                 }),
274         })
275 }
276
277 func TestClientTransferRateLimitedUpload(t *testing.T) {
278         started := time.Now()
279         testClientTransfer(t, testClientTransferParams{
280                 // We are uploading 13 bytes (the length of the greeting torrent). The
281                 // chunks are 2 bytes in length. Then the smallest burst we can run
282                 // with is 2. Time taken is (13-burst)/rate.
283                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
284         })
285         require.True(t, time.Since(started) > time.Second)
286 }
287
288 func TestClientTransferRateLimitedDownload(t *testing.T) {
289         testClientTransfer(t, testClientTransferParams{
290                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
291         })
292 }
293
294 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
295         return storage.NewResourcePieces(fc.AsResourceProvider())
296 }
297
298 func TestClientTransferSmallCache(t *testing.T) {
299         testClientTransfer(t, testClientTransferParams{
300                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
301                         SetCapacity: true,
302                         // Going below the piece length means it can't complete a piece so
303                         // that it can be hashed.
304                         Capacity: 5,
305                         Wrapper:  fileCachePieceResourceStorage,
306                 }),
307                 SetReadahead: true,
308                 // Can't readahead too far or the cache will thrash and drop data we
309                 // thought we had.
310                 Readahead:          0,
311                 ExportClientStatus: true,
312         })
313 }
314
315 func TestClientTransferVarious(t *testing.T) {
316         // Leecher storage
317         for _, ls := range []storageFactory{
318                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
319                         Wrapper: fileCachePieceResourceStorage,
320                 }),
321                 storage.NewBoltDB,
322         } {
323                 // Seeder storage
324                 for _, ss := range []func(string) storage.ClientImpl{
325                         storage.NewFile,
326                         storage.NewMMap,
327                 } {
328                         for _, responsive := range []bool{false, true} {
329                                 testClientTransfer(t, testClientTransferParams{
330                                         Responsive:     responsive,
331                                         SeederStorage:  ss,
332                                         LeecherStorage: ls,
333                                 })
334                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
335                                         testClientTransfer(t, testClientTransferParams{
336                                                 SeederStorage:  ss,
337                                                 Responsive:     responsive,
338                                                 SetReadahead:   true,
339                                                 Readahead:      readahead,
340                                                 LeecherStorage: ls,
341                                         })
342                                 }
343                         }
344                 }
345         }
346 }
347
348 type testClientTransferParams struct {
349         Responsive                 bool
350         Readahead                  int64
351         SetReadahead               bool
352         ExportClientStatus         bool
353         LeecherStorage             func(string) storage.ClientImpl
354         SeederStorage              func(string) storage.ClientImpl
355         SeederUploadRateLimiter    *rate.Limiter
356         LeecherDownloadRateLimiter *rate.Limiter
357 }
358
359 // Creates a seeder and a leecher, and ensures the data transfers when a read
360 // is attempted on the leecher.
361 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
362         greetingTempDir, mi := testutil.GreetingTestTorrent()
363         defer os.RemoveAll(greetingTempDir)
364         // Create seeder and a Torrent.
365         cfg := TestingConfig()
366         cfg.Seed = true
367         cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
368         // cfg.ListenAddr = "localhost:4000"
369         if ps.SeederStorage != nil {
370                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
371                 defer cfg.DefaultStorage.Close()
372         } else {
373                 cfg.DataDir = greetingTempDir
374         }
375         seeder, err := NewClient(cfg)
376         require.NoError(t, err)
377         defer seeder.Close()
378         if ps.ExportClientStatus {
379                 testutil.ExportStatusWriter(seeder, "s")
380         }
381         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
382         seederTorrent.VerifyData()
383         // Create leecher and a Torrent.
384         leecherDataDir, err := ioutil.TempDir("", "")
385         require.NoError(t, err)
386         defer os.RemoveAll(leecherDataDir)
387         if ps.LeecherStorage == nil {
388                 cfg.DataDir = leecherDataDir
389         } else {
390                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
391         }
392         cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
393         // cfg.ListenAddr = "localhost:4001"
394         leecher, err := NewClient(cfg)
395         require.NoError(t, err)
396         defer leecher.Close()
397         if ps.ExportClientStatus {
398                 testutil.ExportStatusWriter(leecher, "l")
399         }
400         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
401                 ret = TorrentSpecFromMetaInfo(mi)
402                 ret.ChunkSize = 2
403                 return
404         }())
405         require.NoError(t, err)
406         assert.True(t, new)
407         // Now do some things with leecher and seeder.
408         addClientPeer(leecherGreeting, seeder)
409         r := leecherGreeting.NewReader()
410         defer r.Close()
411         if ps.Responsive {
412                 r.SetResponsive()
413         }
414         if ps.SetReadahead {
415                 r.SetReadahead(ps.Readahead)
416         }
417         assertReadAllGreeting(t, r)
418         // After one read through, we can assume certain torrent statistics.
419         // These are not a strict requirement. It is however interesting to
420         // follow.
421         // t.Logf("%#v", seederTorrent.Stats())
422         // assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
423         // assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
424         // assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
425         // assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
426         // Read through again for the cases where the torrent data size exceeds
427         // the size of the cache.
428         assertReadAllGreeting(t, r)
429 }
430
431 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
432         pos, err := r.Seek(0, os.SEEK_SET)
433         assert.NoError(t, err)
434         assert.EqualValues(t, 0, pos)
435         _greeting, err := ioutil.ReadAll(r)
436         assert.NoError(t, err)
437         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
438 }
439
440 // Check that after completing leeching, a leecher transitions to a seeding
441 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
442 func TestSeedAfterDownloading(t *testing.T) {
443         greetingTempDir, mi := testutil.GreetingTestTorrent()
444         defer os.RemoveAll(greetingTempDir)
445         cfg := TestingConfig()
446         cfg.Seed = true
447         cfg.DataDir = greetingTempDir
448         seeder, err := NewClient(cfg)
449         require.NoError(t, err)
450         defer seeder.Close()
451         testutil.ExportStatusWriter(seeder, "s")
452         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
453         seederTorrent.VerifyData()
454         cfg.DataDir, err = ioutil.TempDir("", "")
455         require.NoError(t, err)
456         defer os.RemoveAll(cfg.DataDir)
457         leecher, err := NewClient(cfg)
458         require.NoError(t, err)
459         defer leecher.Close()
460         testutil.ExportStatusWriter(leecher, "l")
461         cfg.Seed = false
462         // cfg.TorrentDataOpener = nil
463         cfg.DataDir, err = ioutil.TempDir("", "")
464         require.NoError(t, err)
465         defer os.RemoveAll(cfg.DataDir)
466         leecherLeecher, _ := NewClient(cfg)
467         defer leecherLeecher.Close()
468         testutil.ExportStatusWriter(leecherLeecher, "ll")
469         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
470                 ret = TorrentSpecFromMetaInfo(mi)
471                 ret.ChunkSize = 2
472                 return
473         }())
474         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
475                 ret = TorrentSpecFromMetaInfo(mi)
476                 ret.ChunkSize = 3
477                 return
478         }())
479         // Simultaneously DownloadAll in Leecher, and read the contents
480         // consecutively in LeecherLeecher. This non-deterministically triggered a
481         // case where the leecher wouldn't unchoke the LeecherLeecher.
482         var wg sync.WaitGroup
483         wg.Add(1)
484         go func() {
485                 defer wg.Done()
486                 r := llg.NewReader()
487                 defer r.Close()
488                 b, err := ioutil.ReadAll(r)
489                 require.NoError(t, err)
490                 assert.EqualValues(t, testutil.GreetingFileContents, b)
491         }()
492         addClientPeer(leecherGreeting, seeder)
493         addClientPeer(leecherGreeting, leecherLeecher)
494         wg.Add(1)
495         go func() {
496                 defer wg.Done()
497                 leecherGreeting.DownloadAll()
498                 leecher.WaitAll()
499         }()
500         wg.Wait()
501 }
502
503 func TestMergingTrackersByAddingSpecs(t *testing.T) {
504         cl, err := NewClient(TestingConfig())
505         require.NoError(t, err)
506         defer cl.Close()
507         spec := TorrentSpec{}
508         T, new, _ := cl.AddTorrentSpec(&spec)
509         if !new {
510                 t.FailNow()
511         }
512         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
513         _, new, _ = cl.AddTorrentSpec(&spec)
514         assert.False(t, new)
515         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
516         // Because trackers are disabled in TestingConfig.
517         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
518 }
519
520 type badStorage struct{}
521
522 var _ storage.ClientImpl = badStorage{}
523
524 func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
525         return bs, nil
526 }
527
528 func (bs badStorage) Close() error {
529         return nil
530 }
531
532 func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {
533         return badStoragePiece{p}
534 }
535
536 type badStoragePiece struct {
537         p metainfo.Piece
538 }
539
540 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
541         return 0, nil
542 }
543
544 func (p badStoragePiece) GetIsComplete() bool {
545         return true
546 }
547
548 func (p badStoragePiece) MarkComplete() error {
549         return errors.New("psyyyyyyyche")
550 }
551
552 func (p badStoragePiece) MarkNotComplete() error {
553         return errors.New("psyyyyyyyche")
554 }
555
556 func (p badStoragePiece) randomlyTruncatedDataString() string {
557         return "hello, world\n"[:rand.Intn(14)]
558 }
559
560 func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
561         r := strings.NewReader(p.randomlyTruncatedDataString())
562         return r.ReadAt(b, off+p.p.Offset())
563 }
564
565 // We read from a piece which is marked completed, but is missing data.
566 func TestCompletedPieceWrongSize(t *testing.T) {
567         cfg := TestingConfig()
568         cfg.DefaultStorage = badStorage{}
569         cl, err := NewClient(cfg)
570         require.NoError(t, err)
571         defer cl.Close()
572         info := metainfo.Info{
573                 PieceLength: 15,
574                 Pieces:      make([]byte, 20),
575                 Files: []metainfo.FileInfo{
576                         {Path: []string{"greeting"}, Length: 13},
577                 },
578         }
579         b, err := bencode.Marshal(info)
580         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
581                 InfoBytes: b,
582                 InfoHash:  metainfo.HashBytes(b),
583         })
584         require.NoError(t, err)
585         defer tt.Drop()
586         assert.True(t, new)
587         r := tt.NewReader()
588         defer r.Close()
589         b, err = ioutil.ReadAll(r)
590         assert.Len(t, b, 13)
591         assert.NoError(t, err)
592 }
593
594 func BenchmarkAddLargeTorrent(b *testing.B) {
595         cfg := TestingConfig()
596         cfg.DisableTCP = true
597         cfg.DisableUTP = true
598         cfg.ListenAddr = "redonk"
599         cl, err := NewClient(cfg)
600         require.NoError(b, err)
601         defer cl.Close()
602         for range iter.N(b.N) {
603                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
604                 if err != nil {
605                         b.Fatal(err)
606                 }
607                 t.Drop()
608         }
609 }
610
611 func TestResponsive(t *testing.T) {
612         seederDataDir, mi := testutil.GreetingTestTorrent()
613         defer os.RemoveAll(seederDataDir)
614         cfg := TestingConfig()
615         cfg.Seed = true
616         cfg.DataDir = seederDataDir
617         seeder, err := NewClient(cfg)
618         require.Nil(t, err)
619         defer seeder.Close()
620         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
621         seederTorrent.VerifyData()
622         leecherDataDir, err := ioutil.TempDir("", "")
623         require.Nil(t, err)
624         defer os.RemoveAll(leecherDataDir)
625         cfg = TestingConfig()
626         cfg.DataDir = leecherDataDir
627         leecher, err := NewClient(cfg)
628         require.Nil(t, err)
629         defer leecher.Close()
630         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
631                 ret = TorrentSpecFromMetaInfo(mi)
632                 ret.ChunkSize = 2
633                 return
634         }())
635         addClientPeer(leecherTorrent, seeder)
636         reader := leecherTorrent.NewReader()
637         defer reader.Close()
638         reader.SetReadahead(0)
639         reader.SetResponsive()
640         b := make([]byte, 2)
641         _, err = reader.Seek(3, os.SEEK_SET)
642         require.NoError(t, err)
643         _, err = io.ReadFull(reader, b)
644         assert.Nil(t, err)
645         assert.EqualValues(t, "lo", string(b))
646         _, err = reader.Seek(11, os.SEEK_SET)
647         require.NoError(t, err)
648         n, err := io.ReadFull(reader, b)
649         assert.Nil(t, err)
650         assert.EqualValues(t, 2, n)
651         assert.EqualValues(t, "d\n", string(b))
652 }
653
654 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
655         seederDataDir, mi := testutil.GreetingTestTorrent()
656         defer os.RemoveAll(seederDataDir)
657         cfg := TestingConfig()
658         cfg.Seed = true
659         cfg.DataDir = seederDataDir
660         seeder, err := NewClient(cfg)
661         require.Nil(t, err)
662         defer seeder.Close()
663         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
664         seederTorrent.VerifyData()
665         leecherDataDir, err := ioutil.TempDir("", "")
666         require.Nil(t, err)
667         defer os.RemoveAll(leecherDataDir)
668         cfg = TestingConfig()
669         cfg.DataDir = leecherDataDir
670         leecher, err := NewClient(cfg)
671         require.Nil(t, err)
672         defer leecher.Close()
673         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
674                 ret = TorrentSpecFromMetaInfo(mi)
675                 ret.ChunkSize = 2
676                 return
677         }())
678         addClientPeer(leecherTorrent, seeder)
679         reader := leecherTorrent.NewReader()
680         defer reader.Close()
681         reader.SetReadahead(0)
682         reader.SetResponsive()
683         b := make([]byte, 2)
684         _, err = reader.Seek(3, os.SEEK_SET)
685         require.NoError(t, err)
686         _, err = io.ReadFull(reader, b)
687         assert.Nil(t, err)
688         assert.EqualValues(t, "lo", string(b))
689         go leecherTorrent.Drop()
690         _, err = reader.Seek(11, os.SEEK_SET)
691         require.NoError(t, err)
692         n, err := reader.Read(b)
693         assert.EqualError(t, err, "torrent closed")
694         assert.EqualValues(t, 0, n)
695 }
696
697 func TestDHTInheritBlocklist(t *testing.T) {
698         ipl := iplist.New(nil)
699         require.NotNil(t, ipl)
700         cfg := TestingConfig()
701         cfg.IPBlocklist = ipl
702         cfg.NoDHT = false
703         cl, err := NewClient(cfg)
704         require.NoError(t, err)
705         defer cl.Close()
706         require.Equal(t, ipl, cl.DHT().IPBlocklist())
707 }
708
709 // Check that stuff is merged in subsequent AddTorrentSpec for the same
710 // infohash.
711 func TestAddTorrentSpecMerging(t *testing.T) {
712         cl, err := NewClient(TestingConfig())
713         require.NoError(t, err)
714         defer cl.Close()
715         dir, mi := testutil.GreetingTestTorrent()
716         defer os.RemoveAll(dir)
717         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
718                 InfoHash: mi.HashInfoBytes(),
719         })
720         require.NoError(t, err)
721         require.True(t, new)
722         require.Nil(t, tt.Info())
723         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
724         require.NoError(t, err)
725         require.False(t, new)
726         require.NotNil(t, tt.Info())
727 }
728
729 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
730         dir, mi := testutil.GreetingTestTorrent()
731         os.RemoveAll(dir)
732         cl, _ := NewClient(TestingConfig())
733         defer cl.Close()
734         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
735                 InfoHash: mi.HashInfoBytes(),
736         })
737         tt.Drop()
738         assert.EqualValues(t, 0, len(cl.Torrents()))
739         select {
740         case <-tt.GotInfo():
741                 t.FailNow()
742         default:
743         }
744 }
745
746 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
747         for i := range iter.N(info.NumPieces()) {
748                 p := info.Piece(i)
749                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
750         }
751 }
752
753 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
754         fileCacheDir, err := ioutil.TempDir("", "")
755         require.NoError(t, err)
756         defer os.RemoveAll(fileCacheDir)
757         fileCache, err := filecache.NewCache(fileCacheDir)
758         require.NoError(t, err)
759         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
760         defer os.RemoveAll(greetingDataTempDir)
761         filePieceStore := csf(fileCache)
762         defer filePieceStore.Close()
763         info, err := greetingMetainfo.UnmarshalInfo()
764         require.NoError(t, err)
765         ih := greetingMetainfo.HashInfoBytes()
766         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
767         require.NoError(t, err)
768         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
769         // require.Equal(t, len(testutil.GreetingFileContents), written)
770         // require.NoError(t, err)
771         for i := 0; i < info.NumPieces(); i++ {
772                 p := info.Piece(i)
773                 if alreadyCompleted {
774                         require.NoError(t, greetingData.Piece(p).MarkComplete())
775                 }
776         }
777         cfg := TestingConfig()
778         // TODO: Disable network option?
779         cfg.DisableTCP = true
780         cfg.DisableUTP = true
781         cfg.DefaultStorage = filePieceStore
782         cl, err := NewClient(cfg)
783         require.NoError(t, err)
784         defer cl.Close()
785         tt, err := cl.AddTorrent(greetingMetainfo)
786         require.NoError(t, err)
787         psrs := tt.PieceStateRuns()
788         assert.Len(t, psrs, 1)
789         assert.EqualValues(t, 3, psrs[0].Length)
790         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
791         if alreadyCompleted {
792                 r := tt.NewReader()
793                 b, err := ioutil.ReadAll(r)
794                 assert.NoError(t, err)
795                 assert.EqualValues(t, testutil.GreetingFileContents, b)
796         }
797 }
798
799 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
800         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
801 }
802
803 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
804         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
805 }
806
807 func TestAddMetainfoWithNodes(t *testing.T) {
808         cfg := TestingConfig()
809         cfg.ListenAddr = ":0"
810         cfg.NoDHT = false
811         // For now, we want to just jam the nodes into the table, without
812         // verifying them first. Also the DHT code doesn't support mixing secure
813         // and insecure nodes if security is enabled (yet).
814         cfg.DHTConfig.NoSecurity = true
815         cl, err := NewClient(cfg)
816         require.NoError(t, err)
817         defer cl.Close()
818         assert.EqualValues(t, 0, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
819         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
820         require.NoError(t, err)
821         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
822         // check if the announce-list is here instead. TODO: Add nodes.
823         assert.Len(t, tt.metainfo.AnnounceList, 5)
824         // There are 6 nodes in the torrent file.
825         assert.EqualValues(t, 6, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
826 }
827
828 type testDownloadCancelParams struct {
829         ExportClientStatus        bool
830         SetLeecherStorageCapacity bool
831         LeecherStorageCapacity    int64
832         Cancel                    bool
833 }
834
835 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
836         greetingTempDir, mi := testutil.GreetingTestTorrent()
837         defer os.RemoveAll(greetingTempDir)
838         cfg := TestingConfig()
839         cfg.Seed = true
840         cfg.DataDir = greetingTempDir
841         seeder, err := NewClient(cfg)
842         require.NoError(t, err)
843         defer seeder.Close()
844         if ps.ExportClientStatus {
845                 testutil.ExportStatusWriter(seeder, "s")
846         }
847         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
848         seederTorrent.VerifyData()
849         leecherDataDir, err := ioutil.TempDir("", "")
850         require.NoError(t, err)
851         defer os.RemoveAll(leecherDataDir)
852         fc, err := filecache.NewCache(leecherDataDir)
853         require.NoError(t, err)
854         if ps.SetLeecherStorageCapacity {
855                 fc.SetCapacity(ps.LeecherStorageCapacity)
856         }
857         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
858         cfg.DataDir = leecherDataDir
859         leecher, _ := NewClient(cfg)
860         defer leecher.Close()
861         if ps.ExportClientStatus {
862                 testutil.ExportStatusWriter(leecher, "l")
863         }
864         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
865                 ret = TorrentSpecFromMetaInfo(mi)
866                 ret.ChunkSize = 2
867                 return
868         }())
869         require.NoError(t, err)
870         assert.True(t, new)
871         psc := leecherGreeting.SubscribePieceStateChanges()
872         defer psc.Close()
873         leecherGreeting.DownloadAll()
874         if ps.Cancel {
875                 leecherGreeting.CancelPieces(0, leecherGreeting.NumPieces())
876         }
877         addClientPeer(leecherGreeting, seeder)
878         completes := make(map[int]bool, 3)
879 values:
880         for {
881                 // started := time.Now()
882                 select {
883                 case _v := <-psc.Values:
884                         // log.Print(time.Since(started))
885                         v := _v.(PieceStateChange)
886                         completes[v.Index] = v.Complete
887                 case <-time.After(100 * time.Millisecond):
888                         break values
889                 }
890         }
891         if ps.Cancel {
892                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
893         } else {
894                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
895         }
896
897 }
898
899 func TestTorrentDownloadAll(t *testing.T) {
900         testDownloadCancel(t, testDownloadCancelParams{})
901 }
902
903 func TestTorrentDownloadAllThenCancel(t *testing.T) {
904         testDownloadCancel(t, testDownloadCancelParams{
905                 Cancel: true,
906         })
907 }
908
909 // Ensure that it's an error for a peer to send an invalid have message.
910 func TestPeerInvalidHave(t *testing.T) {
911         cl, err := NewClient(TestingConfig())
912         require.NoError(t, err)
913         defer cl.Close()
914         info := metainfo.Info{
915                 PieceLength: 1,
916                 Pieces:      make([]byte, 20),
917                 Files:       []metainfo.FileInfo{{Length: 1}},
918         }
919         infoBytes, err := bencode.Marshal(info)
920         require.NoError(t, err)
921         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
922                 InfoBytes: infoBytes,
923                 InfoHash:  metainfo.HashBytes(infoBytes),
924                 Storage:   badStorage{},
925         })
926         require.NoError(t, err)
927         assert.True(t, _new)
928         defer tt.Drop()
929         cn := &connection{
930                 t: tt,
931         }
932         assert.NoError(t, cn.peerSentHave(0))
933         assert.Error(t, cn.peerSentHave(1))
934 }
935
936 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
937         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
938         defer os.RemoveAll(greetingTempDir)
939         cfg := TestingConfig()
940         cfg.DataDir = greetingTempDir
941         seeder, err := NewClient(TestingConfig())
942         require.NoError(t, err)
943         seeder.AddTorrentSpec(&TorrentSpec{
944                 InfoBytes: greetingMetainfo.InfoBytes,
945         })
946 }
947
948 func TestPrepareTrackerAnnounce(t *testing.T) {
949         cl := &Client{}
950         blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
951         require.NoError(t, err)
952         assert.False(t, blocked)
953         assert.EqualValues(t, "localhost:1234", host)
954         assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
955 }
956
957 // Check that when the listen port is 0, all the protocols listened on have
958 // the same port, and it isn't zero.
959 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
960         cl, err := NewClient(TestingConfig())
961         require.NoError(t, err)
962         defer cl.Close()
963         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
964         assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
965 }
966
967 func TestClientDynamicListenTCPOnly(t *testing.T) {
968         cfg := TestingConfig()
969         cfg.DisableUTP = true
970         cl, err := NewClient(cfg)
971         require.NoError(t, err)
972         defer cl.Close()
973         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
974         assert.Nil(t, cl.utpSock)
975 }
976
977 func TestClientDynamicListenUTPOnly(t *testing.T) {
978         cfg := TestingConfig()
979         cfg.DisableTCP = true
980         cl, err := NewClient(cfg)
981         require.NoError(t, err)
982         defer cl.Close()
983         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
984         assert.Nil(t, cl.tcpListener)
985 }
986
987 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
988         cfg := TestingConfig()
989         cfg.DisableTCP = true
990         cfg.DisableUTP = true
991         cl, err := NewClient(cfg)
992         require.NoError(t, err)
993         defer cl.Close()
994         assert.Nil(t, cl.ListenAddr())
995 }
996
997 func addClientPeer(t *Torrent, cl *Client) {
998         t.AddPeers([]Peer{
999                 {
1000                         IP:   missinggo.AddrIP(cl.ListenAddr()),
1001                         Port: missinggo.AddrPort(cl.ListenAddr()),
1002                 },
1003         })
1004 }
1005
1006 func totalConns(tts []*Torrent) (ret int) {
1007         for _, tt := range tts {
1008                 tt.cl.mu.Lock()
1009                 ret += len(tt.conns)
1010                 tt.cl.mu.Unlock()
1011         }
1012         return
1013 }
1014
1015 func TestSetMaxEstablishedConn(t *testing.T) {
1016         var tts []*Torrent
1017         ih := testutil.GreetingMetaInfo().HashInfoBytes()
1018         for i := range iter.N(3) {
1019                 cl, err := NewClient(TestingConfig())
1020                 require.NoError(t, err)
1021                 defer cl.Close()
1022                 tt, _ := cl.AddTorrentInfoHash(ih)
1023                 tt.SetMaxEstablishedConns(2)
1024                 testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
1025                 tts = append(tts, tt)
1026         }
1027         addPeers := func() {
1028                 for i, tt := range tts {
1029                         for _, _tt := range tts[:i] {
1030                                 addClientPeer(tt, _tt.cl)
1031                         }
1032                 }
1033         }
1034         waitTotalConns := func(num int) {
1035                 for totalConns(tts) != num {
1036                         time.Sleep(time.Millisecond)
1037                 }
1038         }
1039         addPeers()
1040         waitTotalConns(6)
1041         tts[0].SetMaxEstablishedConns(1)
1042         waitTotalConns(4)
1043         tts[0].SetMaxEstablishedConns(0)
1044         waitTotalConns(2)
1045         tts[0].SetMaxEstablishedConns(1)
1046         addPeers()
1047         waitTotalConns(4)
1048         tts[0].SetMaxEstablishedConns(2)
1049         addPeers()
1050         waitTotalConns(6)
1051 }
1052
1053 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1054         os.MkdirAll(dir, 0770)
1055         file, err := os.Create(filepath.Join(dir, name))
1056         require.NoError(t, err)
1057         file.Write([]byte(name))
1058         file.Close()
1059         mi := metainfo.MetaInfo{}
1060         mi.SetDefaults()
1061         info := metainfo.Info{PieceLength: 256 * 1024}
1062         err = info.BuildFromFilePath(filepath.Join(dir, name))
1063         require.NoError(t, err)
1064         mi.InfoBytes, err = bencode.Marshal(info)
1065         require.NoError(t, err)
1066         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1067         tr, err := cl.AddTorrent(&mi)
1068         require.NoError(t, err)
1069         require.True(t, tr.Seeding())
1070         tr.VerifyData()
1071         return magnet
1072 }
1073
1074 // https://github.com/anacrolix/torrent/issues/114
1075 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1076         cfg := TestingConfig()
1077         cfg.DisableUTP = true
1078         cfg.Seed = true
1079         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1080         cfg.Debug = true
1081         cfg.ForceEncryption = true
1082         os.Mkdir(cfg.DataDir, 0755)
1083         server, err := NewClient(cfg)
1084         require.NoError(t, err)
1085         defer server.Close()
1086         testutil.ExportStatusWriter(server, "s")
1087         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1088         makeMagnet(t, server, cfg.DataDir, "test2")
1089         cfg = TestingConfig()
1090         cfg.DisableUTP = true
1091         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1092         cfg.Debug = true
1093         cfg.ForceEncryption = true
1094         client, err := NewClient(cfg)
1095         require.NoError(t, err)
1096         defer client.Close()
1097         testutil.ExportStatusWriter(client, "c")
1098         tr, err := client.AddMagnet(magnet1)
1099         require.NoError(t, err)
1100         tr.AddPeers([]Peer{{
1101                 IP:   missinggo.AddrIP(server.ListenAddr()),
1102                 Port: missinggo.AddrPort(server.ListenAddr()),
1103         }})
1104         <-tr.GotInfo()
1105         tr.DownloadAll()
1106         client.WaitAll()
1107 }
1108
1109 func TestClientAddressInUse(t *testing.T) {
1110         s, _ := NewUtpSocket("udp", ":50007")
1111         if s != nil {
1112                 defer s.Close()
1113         }
1114         cfg := TestingConfig()
1115         cfg.ListenAddr = ":50007"
1116         cl, err := NewClient(cfg)
1117         require.Error(t, err)
1118         require.Nil(t, cl)
1119 }