]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Use require in some places in TestUTPRawConn
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "log"
10         "math/rand"
11         "net"
12         "os"
13         "path/filepath"
14         "strings"
15         "sync"
16         "testing"
17         "time"
18
19         _ "github.com/anacrolix/envpprof"
20         "github.com/anacrolix/missinggo"
21         "github.com/anacrolix/missinggo/filecache"
22         "github.com/anacrolix/missinggo/pubsub"
23         "github.com/bradfitz/iter"
24         "github.com/stretchr/testify/assert"
25         "github.com/stretchr/testify/require"
26         "golang.org/x/time/rate"
27
28         "github.com/anacrolix/torrent/bencode"
29         "github.com/anacrolix/torrent/internal/testutil"
30         "github.com/anacrolix/torrent/iplist"
31         "github.com/anacrolix/torrent/metainfo"
32         "github.com/anacrolix/torrent/storage"
33 )
34
35 func init() {
36         log.SetFlags(log.LstdFlags | log.Llongfile)
37 }
38
39 func TestingConfig() *Config {
40         return &Config{
41                 ListenAddr:      "localhost:0",
42                 NoDHT:           true,
43                 DisableTrackers: true,
44                 DataDir: func() string {
45                         ret, err := ioutil.TempDir("", "")
46                         if err != nil {
47                                 panic(err)
48                         }
49                         return ret
50                 }(),
51                 Debug: true,
52         }
53 }
54
55 func TestClientDefault(t *testing.T) {
56         cl, err := NewClient(TestingConfig())
57         require.NoError(t, err)
58         cl.Close()
59 }
60
61 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
62         cfg := TestingConfig()
63         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
64         require.NoError(t, err)
65         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
66         defer ci.Close()
67         cfg.DefaultStorage = ci
68         cl, err := NewClient(cfg)
69         require.NoError(t, err)
70         cl.Close()
71         // And again, https://github.com/anacrolix/torrent/issues/158
72         cl, err = NewClient(cfg)
73         require.NoError(t, err)
74         cl.Close()
75 }
76
77 func TestAddDropTorrent(t *testing.T) {
78         cl, err := NewClient(TestingConfig())
79         require.NoError(t, err)
80         defer cl.Close()
81         dir, mi := testutil.GreetingTestTorrent()
82         defer os.RemoveAll(dir)
83         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
84         require.NoError(t, err)
85         assert.True(t, new)
86         tt.SetMaxEstablishedConns(0)
87         tt.SetMaxEstablishedConns(1)
88         tt.Drop()
89 }
90
91 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
92         t.SkipNow()
93 }
94
95 func TestAddTorrentNoUsableURLs(t *testing.T) {
96         t.SkipNow()
97 }
98
99 func TestAddPeersToUnknownTorrent(t *testing.T) {
100         t.SkipNow()
101 }
102
103 func TestPieceHashSize(t *testing.T) {
104         if pieceHash.Size() != 20 {
105                 t.FailNow()
106         }
107 }
108
109 func TestTorrentInitialState(t *testing.T) {
110         dir, mi := testutil.GreetingTestTorrent()
111         defer os.RemoveAll(dir)
112         tor := &Torrent{
113                 infoHash:          mi.HashInfoBytes(),
114                 pieceStateChanges: pubsub.NewPubSub(),
115         }
116         tor.chunkSize = 2
117         tor.storageOpener = storage.NewClient(storage.NewFileWithCompletion("/dev/null", storage.NewMapPieceCompletion()))
118         // Needed to lock for asynchronous piece verification.
119         tor.cl = new(Client)
120         err := tor.setInfoBytes(mi.InfoBytes)
121         require.NoError(t, err)
122         require.Len(t, tor.pieces, 3)
123         tor.pendAllChunkSpecs(0)
124         tor.cl.mu.Lock()
125         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
126         tor.cl.mu.Unlock()
127         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
128 }
129
130 func TestUnmarshalPEXMsg(t *testing.T) {
131         var m peerExchangeMessage
132         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
133                 t.Fatal(err)
134         }
135         if len(m.Added) != 2 {
136                 t.FailNow()
137         }
138         if m.Added[0].Port != 0x506 {
139                 t.FailNow()
140         }
141 }
142
143 func TestReducedDialTimeout(t *testing.T) {
144         for _, _case := range []struct {
145                 Max             time.Duration
146                 HalfOpenLimit   int
147                 PendingPeers    int
148                 ExpectedReduced time.Duration
149         }{
150                 {nominalDialTimeout, 40, 0, nominalDialTimeout},
151                 {nominalDialTimeout, 40, 1, nominalDialTimeout},
152                 {nominalDialTimeout, 40, 39, nominalDialTimeout},
153                 {nominalDialTimeout, 40, 40, nominalDialTimeout / 2},
154                 {nominalDialTimeout, 40, 80, nominalDialTimeout / 3},
155                 {nominalDialTimeout, 40, 4000, nominalDialTimeout / 101},
156         } {
157                 reduced := reducedDialTimeout(_case.Max, _case.HalfOpenLimit, _case.PendingPeers)
158                 expected := _case.ExpectedReduced
159                 if expected < minDialTimeout {
160                         expected = minDialTimeout
161                 }
162                 if reduced != expected {
163                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
164                 }
165         }
166 }
167
168 func TestUTPRawConn(t *testing.T) {
169         l, err := NewUtpSocket("udp", "")
170         require.NoError(t, err)
171         defer l.Close()
172         go func() {
173                 for {
174                         _, err := l.Accept()
175                         if err != nil {
176                                 break
177                         }
178                 }
179         }()
180         // Connect a UTP peer to see if the RawConn will still work.
181         s, err := NewUtpSocket("udp", "")
182         require.NoError(t, err)
183         defer s.Close()
184         utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
185         require.NoError(t, err)
186         defer utpPeer.Close()
187         peer, err := net.ListenPacket("udp", ":0")
188         require.NoError(t, err)
189         defer peer.Close()
190
191         msgsReceived := 0
192         // How many messages to send. I've set this to double the channel buffer
193         // size in the raw packetConn.
194         const N = 200
195         readerStopped := make(chan struct{})
196         // The reader goroutine.
197         go func() {
198                 defer close(readerStopped)
199                 b := make([]byte, 500)
200                 for i := 0; i < N; i++ {
201                         n, _, err := l.ReadFrom(b)
202                         if err != nil {
203                                 t.Fatalf("error reading from raw conn: %s", err)
204                         }
205                         msgsReceived++
206                         var d int
207                         fmt.Sscan(string(b[:n]), &d)
208                         if d != i {
209                                 log.Printf("got wrong number: expected %d, got %d", i, d)
210                         }
211                 }
212         }()
213         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
214         require.NoError(t, err)
215         for i := 0; i < N; i++ {
216                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
217                 if err != nil {
218                         t.Fatal(err)
219                 }
220                 time.Sleep(time.Microsecond)
221         }
222         select {
223         case <-readerStopped:
224         case <-time.After(time.Second):
225                 t.Fatal("reader timed out")
226         }
227         if msgsReceived != N {
228                 t.Fatalf("messages received: %d", msgsReceived)
229         }
230 }
231
232 func TestTwoClientsArbitraryPorts(t *testing.T) {
233         for i := 0; i < 2; i++ {
234                 cl, err := NewClient(TestingConfig())
235                 if err != nil {
236                         t.Fatal(err)
237                 }
238                 defer cl.Close()
239         }
240 }
241
242 func TestAddDropManyTorrents(t *testing.T) {
243         cl, err := NewClient(TestingConfig())
244         require.NoError(t, err)
245         defer cl.Close()
246         for i := range iter.N(1000) {
247                 var spec TorrentSpec
248                 binary.PutVarint(spec.InfoHash[:], int64(i))
249                 tt, new, err := cl.AddTorrentSpec(&spec)
250                 assert.NoError(t, err)
251                 assert.True(t, new)
252                 defer tt.Drop()
253         }
254 }
255
256 type FileCacheClientStorageFactoryParams struct {
257         Capacity    int64
258         SetCapacity bool
259         Wrapper     func(*filecache.Cache) storage.ClientImpl
260 }
261
262 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
263         return func(dataDir string) storage.ClientImpl {
264                 fc, err := filecache.NewCache(dataDir)
265                 if err != nil {
266                         panic(err)
267                 }
268                 if ps.SetCapacity {
269                         fc.SetCapacity(ps.Capacity)
270                 }
271                 return ps.Wrapper(fc)
272         }
273 }
274
275 type storageFactory func(string) storage.ClientImpl
276
277 func TestClientTransferDefault(t *testing.T) {
278         testClientTransfer(t, testClientTransferParams{
279                 ExportClientStatus: true,
280                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
281                         Wrapper: fileCachePieceResourceStorage,
282                 }),
283         })
284 }
285
286 func TestClientTransferRateLimitedUpload(t *testing.T) {
287         started := time.Now()
288         testClientTransfer(t, testClientTransferParams{
289                 // We are uploading 13 bytes (the length of the greeting torrent). The
290                 // chunks are 2 bytes in length. Then the smallest burst we can run
291                 // with is 2. Time taken is (13-burst)/rate.
292                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
293         })
294         require.True(t, time.Since(started) > time.Second)
295 }
296
297 func TestClientTransferRateLimitedDownload(t *testing.T) {
298         testClientTransfer(t, testClientTransferParams{
299                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
300         })
301 }
302
303 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
304         return storage.NewResourcePieces(fc.AsResourceProvider())
305 }
306
307 func TestClientTransferSmallCache(t *testing.T) {
308         testClientTransfer(t, testClientTransferParams{
309                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
310                         SetCapacity: true,
311                         // Going below the piece length means it can't complete a piece so
312                         // that it can be hashed.
313                         Capacity: 5,
314                         Wrapper:  fileCachePieceResourceStorage,
315                 }),
316                 SetReadahead: true,
317                 // Can't readahead too far or the cache will thrash and drop data we
318                 // thought we had.
319                 Readahead:          0,
320                 ExportClientStatus: true,
321         })
322 }
323
324 func TestClientTransferVarious(t *testing.T) {
325         // Leecher storage
326         for _, ls := range []storageFactory{
327                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
328                         Wrapper: fileCachePieceResourceStorage,
329                 }),
330                 storage.NewBoltDB,
331         } {
332                 // Seeder storage
333                 for _, ss := range []func(string) storage.ClientImpl{
334                         storage.NewFile,
335                         storage.NewMMap,
336                 } {
337                         for _, responsive := range []bool{false, true} {
338                                 testClientTransfer(t, testClientTransferParams{
339                                         Responsive:     responsive,
340                                         SeederStorage:  ss,
341                                         LeecherStorage: ls,
342                                 })
343                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
344                                         testClientTransfer(t, testClientTransferParams{
345                                                 SeederStorage:  ss,
346                                                 Responsive:     responsive,
347                                                 SetReadahead:   true,
348                                                 Readahead:      readahead,
349                                                 LeecherStorage: ls,
350                                         })
351                                 }
352                         }
353                 }
354         }
355 }
356
357 type testClientTransferParams struct {
358         Responsive                 bool
359         Readahead                  int64
360         SetReadahead               bool
361         ExportClientStatus         bool
362         LeecherStorage             func(string) storage.ClientImpl
363         SeederStorage              func(string) storage.ClientImpl
364         SeederUploadRateLimiter    *rate.Limiter
365         LeecherDownloadRateLimiter *rate.Limiter
366 }
367
368 // Creates a seeder and a leecher, and ensures the data transfers when a read
369 // is attempted on the leecher.
370 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
371         greetingTempDir, mi := testutil.GreetingTestTorrent()
372         defer os.RemoveAll(greetingTempDir)
373         // Create seeder and a Torrent.
374         cfg := TestingConfig()
375         cfg.Seed = true
376         cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
377         // cfg.ListenAddr = "localhost:4000"
378         if ps.SeederStorage != nil {
379                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
380                 defer cfg.DefaultStorage.Close()
381         } else {
382                 cfg.DataDir = greetingTempDir
383         }
384         seeder, err := NewClient(cfg)
385         require.NoError(t, err)
386         defer seeder.Close()
387         if ps.ExportClientStatus {
388                 testutil.ExportStatusWriter(seeder, "s")
389         }
390         // seederTorrent, new, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
391         _, new, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
392         require.NoError(t, err)
393         assert.True(t, new)
394         // Create leecher and a Torrent.
395         leecherDataDir, err := ioutil.TempDir("", "")
396         require.NoError(t, err)
397         defer os.RemoveAll(leecherDataDir)
398         if ps.LeecherStorage == nil {
399                 cfg.DataDir = leecherDataDir
400         } else {
401                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
402         }
403         cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
404         // cfg.ListenAddr = "localhost:4001"
405         leecher, err := NewClient(cfg)
406         require.NoError(t, err)
407         defer leecher.Close()
408         if ps.ExportClientStatus {
409                 testutil.ExportStatusWriter(leecher, "l")
410         }
411         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
412                 ret = TorrentSpecFromMetaInfo(mi)
413                 ret.ChunkSize = 2
414                 return
415         }())
416         require.NoError(t, err)
417         assert.True(t, new)
418         // Now do some things with leecher and seeder.
419         addClientPeer(leecherGreeting, seeder)
420         r := leecherGreeting.NewReader()
421         defer r.Close()
422         if ps.Responsive {
423                 r.SetResponsive()
424         }
425         if ps.SetReadahead {
426                 r.SetReadahead(ps.Readahead)
427         }
428         assertReadAllGreeting(t, r)
429         // After one read through, we can assume certain torrent statistics.
430         // These are not a strict requirement. It is however interesting to
431         // follow.
432         // t.Logf("%#v", seederTorrent.Stats())
433         // assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
434         // assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
435         // assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
436         // assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
437         // Read through again for the cases where the torrent data size exceeds
438         // the size of the cache.
439         assertReadAllGreeting(t, r)
440 }
441
442 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
443         pos, err := r.Seek(0, os.SEEK_SET)
444         assert.NoError(t, err)
445         assert.EqualValues(t, 0, pos)
446         _greeting, err := ioutil.ReadAll(r)
447         assert.NoError(t, err)
448         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
449 }
450
451 // Check that after completing leeching, a leecher transitions to a seeding
452 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
453 func TestSeedAfterDownloading(t *testing.T) {
454         greetingTempDir, mi := testutil.GreetingTestTorrent()
455         defer os.RemoveAll(greetingTempDir)
456         cfg := TestingConfig()
457         cfg.Seed = true
458         cfg.DataDir = greetingTempDir
459         seeder, err := NewClient(cfg)
460         require.NoError(t, err)
461         defer seeder.Close()
462         testutil.ExportStatusWriter(seeder, "s")
463         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
464         cfg.DataDir, err = ioutil.TempDir("", "")
465         require.NoError(t, err)
466         defer os.RemoveAll(cfg.DataDir)
467         leecher, err := NewClient(cfg)
468         require.NoError(t, err)
469         defer leecher.Close()
470         testutil.ExportStatusWriter(leecher, "l")
471         cfg.Seed = false
472         // cfg.TorrentDataOpener = nil
473         cfg.DataDir, err = ioutil.TempDir("", "")
474         require.NoError(t, err)
475         defer os.RemoveAll(cfg.DataDir)
476         leecherLeecher, _ := NewClient(cfg)
477         defer leecherLeecher.Close()
478         testutil.ExportStatusWriter(leecherLeecher, "ll")
479         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
480                 ret = TorrentSpecFromMetaInfo(mi)
481                 ret.ChunkSize = 2
482                 return
483         }())
484         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
485                 ret = TorrentSpecFromMetaInfo(mi)
486                 ret.ChunkSize = 3
487                 return
488         }())
489         // Simultaneously DownloadAll in Leecher, and read the contents
490         // consecutively in LeecherLeecher. This non-deterministically triggered a
491         // case where the leecher wouldn't unchoke the LeecherLeecher.
492         var wg sync.WaitGroup
493         wg.Add(1)
494         go func() {
495                 defer wg.Done()
496                 r := llg.NewReader()
497                 defer r.Close()
498                 b, err := ioutil.ReadAll(r)
499                 require.NoError(t, err)
500                 assert.EqualValues(t, testutil.GreetingFileContents, b)
501         }()
502         addClientPeer(leecherGreeting, seeder)
503         addClientPeer(leecherGreeting, leecherLeecher)
504         wg.Add(1)
505         go func() {
506                 defer wg.Done()
507                 leecherGreeting.DownloadAll()
508                 leecher.WaitAll()
509         }()
510         wg.Wait()
511 }
512
513 func TestMergingTrackersByAddingSpecs(t *testing.T) {
514         cl, err := NewClient(TestingConfig())
515         require.NoError(t, err)
516         defer cl.Close()
517         spec := TorrentSpec{}
518         T, new, _ := cl.AddTorrentSpec(&spec)
519         if !new {
520                 t.FailNow()
521         }
522         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
523         _, new, _ = cl.AddTorrentSpec(&spec)
524         assert.False(t, new)
525         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
526         // Because trackers are disabled in TestingConfig.
527         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
528 }
529
530 type badStorage struct{}
531
532 func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
533         return bs, nil
534 }
535
536 func (bs badStorage) Close() error {
537         return nil
538 }
539
540 func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {
541         return badStoragePiece{p}
542 }
543
544 type badStoragePiece struct {
545         p metainfo.Piece
546 }
547
548 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
549         return 0, nil
550 }
551
552 func (p badStoragePiece) GetIsComplete() bool {
553         return true
554 }
555
556 func (p badStoragePiece) MarkComplete() error {
557         return errors.New("psyyyyyyyche")
558 }
559
560 func (p badStoragePiece) MarkNotComplete() error {
561         return errors.New("psyyyyyyyche")
562 }
563
564 func (p badStoragePiece) randomlyTruncatedDataString() string {
565         return "hello, world\n"[:rand.Intn(14)]
566 }
567
568 func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
569         r := strings.NewReader(p.randomlyTruncatedDataString())
570         return r.ReadAt(b, off+p.p.Offset())
571 }
572
573 // We read from a piece which is marked completed, but is missing data.
574 func TestCompletedPieceWrongSize(t *testing.T) {
575         cfg := TestingConfig()
576         cfg.DefaultStorage = badStorage{}
577         cl, err := NewClient(cfg)
578         require.NoError(t, err)
579         defer cl.Close()
580         info := metainfo.Info{
581                 PieceLength: 15,
582                 Pieces:      make([]byte, 20),
583                 Files: []metainfo.FileInfo{
584                         {Path: []string{"greeting"}, Length: 13},
585                 },
586         }
587         b, err := bencode.Marshal(info)
588         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
589                 InfoBytes: b,
590                 InfoHash:  metainfo.HashBytes(b),
591         })
592         require.NoError(t, err)
593         defer tt.Drop()
594         assert.True(t, new)
595         r := tt.NewReader()
596         defer r.Close()
597         b, err = ioutil.ReadAll(r)
598         assert.Len(t, b, 13)
599         assert.NoError(t, err)
600 }
601
602 func BenchmarkAddLargeTorrent(b *testing.B) {
603         cfg := TestingConfig()
604         cfg.DisableTCP = true
605         cfg.DisableUTP = true
606         cfg.ListenAddr = "redonk"
607         cl, err := NewClient(cfg)
608         require.NoError(b, err)
609         defer cl.Close()
610         for range iter.N(b.N) {
611                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
612                 if err != nil {
613                         b.Fatal(err)
614                 }
615                 t.Drop()
616         }
617 }
618
619 func TestResponsive(t *testing.T) {
620         seederDataDir, mi := testutil.GreetingTestTorrent()
621         defer os.RemoveAll(seederDataDir)
622         cfg := TestingConfig()
623         cfg.Seed = true
624         cfg.DataDir = seederDataDir
625         seeder, err := NewClient(cfg)
626         require.Nil(t, err)
627         defer seeder.Close()
628         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
629         leecherDataDir, err := ioutil.TempDir("", "")
630         require.Nil(t, err)
631         defer os.RemoveAll(leecherDataDir)
632         cfg = TestingConfig()
633         cfg.DataDir = leecherDataDir
634         leecher, err := NewClient(cfg)
635         require.Nil(t, err)
636         defer leecher.Close()
637         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
638                 ret = TorrentSpecFromMetaInfo(mi)
639                 ret.ChunkSize = 2
640                 return
641         }())
642         addClientPeer(leecherTorrent, seeder)
643         reader := leecherTorrent.NewReader()
644         defer reader.Close()
645         reader.SetReadahead(0)
646         reader.SetResponsive()
647         b := make([]byte, 2)
648         _, err = reader.Seek(3, os.SEEK_SET)
649         require.NoError(t, err)
650         _, err = io.ReadFull(reader, b)
651         assert.Nil(t, err)
652         assert.EqualValues(t, "lo", string(b))
653         _, err = reader.Seek(11, os.SEEK_SET)
654         require.NoError(t, err)
655         n, err := io.ReadFull(reader, b)
656         assert.Nil(t, err)
657         assert.EqualValues(t, 2, n)
658         assert.EqualValues(t, "d\n", string(b))
659 }
660
661 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
662         seederDataDir, mi := testutil.GreetingTestTorrent()
663         defer os.RemoveAll(seederDataDir)
664         cfg := TestingConfig()
665         cfg.Seed = true
666         cfg.DataDir = seederDataDir
667         seeder, err := NewClient(cfg)
668         require.Nil(t, err)
669         defer seeder.Close()
670         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
671         leecherDataDir, err := ioutil.TempDir("", "")
672         require.Nil(t, err)
673         defer os.RemoveAll(leecherDataDir)
674         cfg = TestingConfig()
675         cfg.DataDir = leecherDataDir
676         leecher, err := NewClient(cfg)
677         require.Nil(t, err)
678         defer leecher.Close()
679         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
680                 ret = TorrentSpecFromMetaInfo(mi)
681                 ret.ChunkSize = 2
682                 return
683         }())
684         addClientPeer(leecherTorrent, seeder)
685         reader := leecherTorrent.NewReader()
686         defer reader.Close()
687         reader.SetReadahead(0)
688         reader.SetResponsive()
689         b := make([]byte, 2)
690         _, err = reader.Seek(3, os.SEEK_SET)
691         require.NoError(t, err)
692         _, err = io.ReadFull(reader, b)
693         assert.Nil(t, err)
694         assert.EqualValues(t, "lo", string(b))
695         go leecherTorrent.Drop()
696         _, err = reader.Seek(11, os.SEEK_SET)
697         require.NoError(t, err)
698         n, err := reader.Read(b)
699         assert.EqualError(t, err, "torrent closed")
700         assert.EqualValues(t, 0, n)
701 }
702
703 func TestDHTInheritBlocklist(t *testing.T) {
704         ipl := iplist.New(nil)
705         require.NotNil(t, ipl)
706         cfg := TestingConfig()
707         cfg.IPBlocklist = ipl
708         cfg.NoDHT = false
709         cl, err := NewClient(cfg)
710         require.NoError(t, err)
711         defer cl.Close()
712         require.Equal(t, ipl, cl.DHT().IPBlocklist())
713 }
714
715 // Check that stuff is merged in subsequent AddTorrentSpec for the same
716 // infohash.
717 func TestAddTorrentSpecMerging(t *testing.T) {
718         cl, err := NewClient(TestingConfig())
719         require.NoError(t, err)
720         defer cl.Close()
721         dir, mi := testutil.GreetingTestTorrent()
722         defer os.RemoveAll(dir)
723         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
724                 InfoHash: mi.HashInfoBytes(),
725         })
726         require.NoError(t, err)
727         require.True(t, new)
728         require.Nil(t, tt.Info())
729         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
730         require.NoError(t, err)
731         require.False(t, new)
732         require.NotNil(t, tt.Info())
733 }
734
735 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
736         dir, mi := testutil.GreetingTestTorrent()
737         os.RemoveAll(dir)
738         cl, _ := NewClient(TestingConfig())
739         defer cl.Close()
740         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
741                 InfoHash: mi.HashInfoBytes(),
742         })
743         tt.Drop()
744         assert.EqualValues(t, 0, len(cl.Torrents()))
745         select {
746         case <-tt.GotInfo():
747                 t.FailNow()
748         default:
749         }
750 }
751
752 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
753         for i := range iter.N(info.NumPieces()) {
754                 p := info.Piece(i)
755                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
756         }
757 }
758
759 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
760         fileCacheDir, err := ioutil.TempDir("", "")
761         require.NoError(t, err)
762         defer os.RemoveAll(fileCacheDir)
763         fileCache, err := filecache.NewCache(fileCacheDir)
764         require.NoError(t, err)
765         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
766         defer os.RemoveAll(greetingDataTempDir)
767         filePieceStore := csf(fileCache)
768         info, err := greetingMetainfo.UnmarshalInfo()
769         require.NoError(t, err)
770         ih := greetingMetainfo.HashInfoBytes()
771         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
772         require.NoError(t, err)
773         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
774         // require.Equal(t, len(testutil.GreetingFileContents), written)
775         // require.NoError(t, err)
776         for i := 0; i < info.NumPieces(); i++ {
777                 p := info.Piece(i)
778                 if alreadyCompleted {
779                         err := greetingData.Piece(p).MarkComplete()
780                         assert.NoError(t, err)
781                 }
782         }
783         cfg := TestingConfig()
784         // TODO: Disable network option?
785         cfg.DisableTCP = true
786         cfg.DisableUTP = true
787         cfg.DefaultStorage = filePieceStore
788         cl, err := NewClient(cfg)
789         require.NoError(t, err)
790         defer cl.Close()
791         tt, err := cl.AddTorrent(greetingMetainfo)
792         require.NoError(t, err)
793         psrs := tt.PieceStateRuns()
794         assert.Len(t, psrs, 1)
795         assert.EqualValues(t, 3, psrs[0].Length)
796         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
797         if alreadyCompleted {
798                 r := tt.NewReader()
799                 b, err := ioutil.ReadAll(r)
800                 assert.NoError(t, err)
801                 assert.EqualValues(t, testutil.GreetingFileContents, b)
802         }
803 }
804
805 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
806         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
807 }
808
809 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
810         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
811 }
812
813 func TestAddMetainfoWithNodes(t *testing.T) {
814         cfg := TestingConfig()
815         cfg.ListenAddr = ":0"
816         cfg.NoDHT = false
817         // For now, we want to just jam the nodes into the table, without
818         // verifying them first. Also the DHT code doesn't support mixing secure
819         // and insecure nodes if security is enabled (yet).
820         cfg.DHTConfig.NoSecurity = true
821         cl, err := NewClient(cfg)
822         require.NoError(t, err)
823         defer cl.Close()
824         assert.EqualValues(t, 0, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
825         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
826         require.NoError(t, err)
827         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
828         // check if the announce-list is here instead. TODO: Add nodes.
829         assert.Len(t, tt.metainfo.AnnounceList, 5)
830         // There are 6 nodes in the torrent file.
831         assert.EqualValues(t, 6, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
832 }
833
834 type testDownloadCancelParams struct {
835         ExportClientStatus        bool
836         SetLeecherStorageCapacity bool
837         LeecherStorageCapacity    int64
838         Cancel                    bool
839 }
840
841 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
842         greetingTempDir, mi := testutil.GreetingTestTorrent()
843         defer os.RemoveAll(greetingTempDir)
844         cfg := TestingConfig()
845         cfg.Seed = true
846         cfg.DataDir = greetingTempDir
847         seeder, err := NewClient(cfg)
848         require.NoError(t, err)
849         defer seeder.Close()
850         if ps.ExportClientStatus {
851                 testutil.ExportStatusWriter(seeder, "s")
852         }
853         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
854         leecherDataDir, err := ioutil.TempDir("", "")
855         require.NoError(t, err)
856         defer os.RemoveAll(leecherDataDir)
857         fc, err := filecache.NewCache(leecherDataDir)
858         require.NoError(t, err)
859         if ps.SetLeecherStorageCapacity {
860                 fc.SetCapacity(ps.LeecherStorageCapacity)
861         }
862         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
863         cfg.DataDir = leecherDataDir
864         leecher, _ := NewClient(cfg)
865         defer leecher.Close()
866         if ps.ExportClientStatus {
867                 testutil.ExportStatusWriter(leecher, "l")
868         }
869         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
870                 ret = TorrentSpecFromMetaInfo(mi)
871                 ret.ChunkSize = 2
872                 return
873         }())
874         require.NoError(t, err)
875         assert.True(t, new)
876         psc := leecherGreeting.SubscribePieceStateChanges()
877         defer psc.Close()
878         leecherGreeting.DownloadAll()
879         if ps.Cancel {
880                 leecherGreeting.CancelPieces(0, leecherGreeting.NumPieces())
881         }
882         addClientPeer(leecherGreeting, seeder)
883         completes := make(map[int]bool, 3)
884 values:
885         for {
886                 // started := time.Now()
887                 select {
888                 case _v := <-psc.Values:
889                         // log.Print(time.Since(started))
890                         v := _v.(PieceStateChange)
891                         completes[v.Index] = v.Complete
892                 case <-time.After(100 * time.Millisecond):
893                         break values
894                 }
895         }
896         if ps.Cancel {
897                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
898         } else {
899                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
900         }
901
902 }
903
904 func TestTorrentDownloadAll(t *testing.T) {
905         testDownloadCancel(t, testDownloadCancelParams{})
906 }
907
908 func TestTorrentDownloadAllThenCancel(t *testing.T) {
909         testDownloadCancel(t, testDownloadCancelParams{
910                 Cancel: true,
911         })
912 }
913
914 // Ensure that it's an error for a peer to send an invalid have message.
915 func TestPeerInvalidHave(t *testing.T) {
916         cl, err := NewClient(TestingConfig())
917         require.NoError(t, err)
918         defer cl.Close()
919         info := metainfo.Info{
920                 PieceLength: 1,
921                 Pieces:      make([]byte, 20),
922                 Files:       []metainfo.FileInfo{{Length: 1}},
923         }
924         infoBytes, err := bencode.Marshal(info)
925         require.NoError(t, err)
926         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
927                 InfoBytes: infoBytes,
928                 InfoHash:  metainfo.HashBytes(infoBytes),
929         })
930         require.NoError(t, err)
931         assert.True(t, _new)
932         defer tt.Drop()
933         cn := &connection{
934                 t: tt,
935         }
936         assert.NoError(t, cn.peerSentHave(0))
937         assert.Error(t, cn.peerSentHave(1))
938 }
939
940 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
941         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
942         defer os.RemoveAll(greetingTempDir)
943         cfg := TestingConfig()
944         cfg.DataDir = greetingTempDir
945         seeder, err := NewClient(TestingConfig())
946         require.NoError(t, err)
947         seeder.AddTorrentSpec(&TorrentSpec{
948                 InfoBytes: greetingMetainfo.InfoBytes,
949         })
950 }
951
952 func TestPrepareTrackerAnnounce(t *testing.T) {
953         cl := &Client{}
954         blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
955         require.NoError(t, err)
956         assert.False(t, blocked)
957         assert.EqualValues(t, "localhost:1234", host)
958         assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
959 }
960
961 // Check that when the listen port is 0, all the protocols listened on have
962 // the same port, and it isn't zero.
963 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
964         cl, err := NewClient(TestingConfig())
965         require.NoError(t, err)
966         defer cl.Close()
967         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
968         assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
969 }
970
971 func TestClientDynamicListenTCPOnly(t *testing.T) {
972         cfg := TestingConfig()
973         cfg.DisableUTP = true
974         cl, err := NewClient(cfg)
975         require.NoError(t, err)
976         defer cl.Close()
977         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
978         assert.Nil(t, cl.utpSock)
979 }
980
981 func TestClientDynamicListenUTPOnly(t *testing.T) {
982         cfg := TestingConfig()
983         cfg.DisableTCP = true
984         cl, err := NewClient(cfg)
985         require.NoError(t, err)
986         defer cl.Close()
987         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
988         assert.Nil(t, cl.tcpListener)
989 }
990
991 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
992         cfg := TestingConfig()
993         cfg.DisableTCP = true
994         cfg.DisableUTP = true
995         cl, err := NewClient(cfg)
996         require.NoError(t, err)
997         defer cl.Close()
998         assert.Nil(t, cl.ListenAddr())
999 }
1000
1001 func addClientPeer(t *Torrent, cl *Client) {
1002         t.AddPeers([]Peer{
1003                 {
1004                         IP:   missinggo.AddrIP(cl.ListenAddr()),
1005                         Port: missinggo.AddrPort(cl.ListenAddr()),
1006                 },
1007         })
1008 }
1009
1010 func totalConns(tts []*Torrent) (ret int) {
1011         for _, tt := range tts {
1012                 tt.cl.mu.Lock()
1013                 ret += len(tt.conns)
1014                 tt.cl.mu.Unlock()
1015         }
1016         return
1017 }
1018
1019 func TestSetMaxEstablishedConn(t *testing.T) {
1020         var tts []*Torrent
1021         ih := testutil.GreetingMetaInfo().HashInfoBytes()
1022         for i := range iter.N(3) {
1023                 cl, err := NewClient(TestingConfig())
1024                 require.NoError(t, err)
1025                 defer cl.Close()
1026                 tt, _ := cl.AddTorrentInfoHash(ih)
1027                 tt.SetMaxEstablishedConns(2)
1028                 testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
1029                 tts = append(tts, tt)
1030         }
1031         addPeers := func() {
1032                 for i, tt := range tts {
1033                         for _, _tt := range tts[:i] {
1034                                 addClientPeer(tt, _tt.cl)
1035                         }
1036                 }
1037         }
1038         waitTotalConns := func(num int) {
1039                 for totalConns(tts) != num {
1040                         time.Sleep(time.Millisecond)
1041                 }
1042         }
1043         addPeers()
1044         waitTotalConns(6)
1045         tts[0].SetMaxEstablishedConns(1)
1046         waitTotalConns(4)
1047         tts[0].SetMaxEstablishedConns(0)
1048         waitTotalConns(2)
1049         tts[0].SetMaxEstablishedConns(1)
1050         addPeers()
1051         waitTotalConns(4)
1052         tts[0].SetMaxEstablishedConns(2)
1053         addPeers()
1054         waitTotalConns(6)
1055 }
1056
1057 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1058         os.MkdirAll(dir, 0770)
1059         file, err := os.Create(filepath.Join(dir, name))
1060         require.NoError(t, err)
1061         file.Write([]byte(name))
1062         file.Close()
1063         mi := metainfo.MetaInfo{}
1064         mi.SetDefaults()
1065         info := metainfo.Info{PieceLength: 256 * 1024}
1066         err = info.BuildFromFilePath(filepath.Join(dir, name))
1067         require.NoError(t, err)
1068         mi.InfoBytes, err = bencode.Marshal(info)
1069         require.NoError(t, err)
1070         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1071         tr, err := cl.AddTorrent(&mi)
1072         require.NoError(t, err)
1073         assert.True(t, tr.Seeding())
1074         return magnet
1075 }
1076
1077 // https://github.com/anacrolix/torrent/issues/114
1078 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1079         cfg := TestingConfig()
1080         cfg.DisableUTP = true
1081         cfg.Seed = true
1082         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1083         cfg.Debug = true
1084         cfg.ForceEncryption = true
1085         os.Mkdir(cfg.DataDir, 0755)
1086         server, err := NewClient(cfg)
1087         require.NoError(t, err)
1088         defer server.Close()
1089         testutil.ExportStatusWriter(server, "s")
1090         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1091         makeMagnet(t, server, cfg.DataDir, "test2")
1092         cfg = TestingConfig()
1093         cfg.DisableUTP = true
1094         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1095         cfg.Debug = true
1096         cfg.ForceEncryption = true
1097         client, err := NewClient(cfg)
1098         require.NoError(t, err)
1099         defer client.Close()
1100         testutil.ExportStatusWriter(client, "c")
1101         tr, err := client.AddMagnet(magnet1)
1102         require.NoError(t, err)
1103         tr.AddPeers([]Peer{{
1104                 IP:   missinggo.AddrIP(server.ListenAddr()),
1105                 Port: missinggo.AddrPort(server.ListenAddr()),
1106         }})
1107         <-tr.GotInfo()
1108         tr.DownloadAll()
1109         client.WaitAll()
1110 }