]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
f589186c89dacc8e8ffc9b1426986067b598ae13
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "math/rand"
10         "net"
11         "os"
12         "path/filepath"
13         "strings"
14         "sync"
15         "testing"
16         "time"
17
18         _ "github.com/anacrolix/envpprof"
19         "github.com/anacrolix/missinggo"
20         "github.com/anacrolix/missinggo/filecache"
21         "github.com/anacrolix/missinggo/pubsub"
22         "github.com/bradfitz/iter"
23         "github.com/stretchr/testify/assert"
24         "github.com/stretchr/testify/require"
25         "golang.org/x/time/rate"
26
27         "github.com/anacrolix/torrent/bencode"
28         "github.com/anacrolix/torrent/internal/testutil"
29         "github.com/anacrolix/torrent/iplist"
30         "github.com/anacrolix/torrent/metainfo"
31         "github.com/anacrolix/torrent/storage"
32 )
33
34 func TestingConfig() *Config {
35         return &Config{
36                 ListenAddr:      "localhost:0",
37                 NoDHT:           true,
38                 DataDir:         tempDir(),
39                 DisableTrackers: true,
40                 // Debug:           true,
41         }
42 }
43
44 func TestClientDefault(t *testing.T) {
45         cl, err := NewClient(TestingConfig())
46         require.NoError(t, err)
47         cl.Close()
48 }
49
50 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
51         cfg := TestingConfig()
52         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
53         require.NoError(t, err)
54         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
55         defer ci.Close()
56         cfg.DefaultStorage = ci
57         cl, err := NewClient(cfg)
58         require.NoError(t, err)
59         cl.Close()
60         // And again, https://github.com/anacrolix/torrent/issues/158
61         cl, err = NewClient(cfg)
62         require.NoError(t, err)
63         cl.Close()
64 }
65
66 func TestAddDropTorrent(t *testing.T) {
67         cl, err := NewClient(TestingConfig())
68         require.NoError(t, err)
69         defer cl.Close()
70         dir, mi := testutil.GreetingTestTorrent()
71         defer os.RemoveAll(dir)
72         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
73         require.NoError(t, err)
74         assert.True(t, new)
75         tt.SetMaxEstablishedConns(0)
76         tt.SetMaxEstablishedConns(1)
77         tt.Drop()
78 }
79
80 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
81         t.SkipNow()
82 }
83
84 func TestAddTorrentNoUsableURLs(t *testing.T) {
85         t.SkipNow()
86 }
87
88 func TestAddPeersToUnknownTorrent(t *testing.T) {
89         t.SkipNow()
90 }
91
92 func TestPieceHashSize(t *testing.T) {
93         if pieceHash.Size() != 20 {
94                 t.FailNow()
95         }
96 }
97
98 func TestTorrentInitialState(t *testing.T) {
99         dir, mi := testutil.GreetingTestTorrent()
100         defer os.RemoveAll(dir)
101         tor := &Torrent{
102                 infoHash:          mi.HashInfoBytes(),
103                 pieceStateChanges: pubsub.NewPubSub(),
104         }
105         tor.chunkSize = 2
106         tor.storageOpener = storage.NewClient(storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()))
107         // Needed to lock for asynchronous piece verification.
108         tor.cl = new(Client)
109         tor.cl.mu.Lock()
110         err := tor.setInfoBytes(mi.InfoBytes)
111         tor.cl.mu.Unlock()
112         require.NoError(t, err)
113         require.Len(t, tor.pieces, 3)
114         tor.pendAllChunkSpecs(0)
115         tor.cl.mu.Lock()
116         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
117         tor.cl.mu.Unlock()
118         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
119 }
120
121 func TestUnmarshalPEXMsg(t *testing.T) {
122         var m peerExchangeMessage
123         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
124                 t.Fatal(err)
125         }
126         if len(m.Added) != 2 {
127                 t.FailNow()
128         }
129         if m.Added[0].Port != 0x506 {
130                 t.FailNow()
131         }
132 }
133
134 func TestReducedDialTimeout(t *testing.T) {
135         for _, _case := range []struct {
136                 Max             time.Duration
137                 HalfOpenLimit   int
138                 PendingPeers    int
139                 ExpectedReduced time.Duration
140         }{
141                 {nominalDialTimeout, 40, 0, nominalDialTimeout},
142                 {nominalDialTimeout, 40, 1, nominalDialTimeout},
143                 {nominalDialTimeout, 40, 39, nominalDialTimeout},
144                 {nominalDialTimeout, 40, 40, nominalDialTimeout / 2},
145                 {nominalDialTimeout, 40, 80, nominalDialTimeout / 3},
146                 {nominalDialTimeout, 40, 4000, nominalDialTimeout / 101},
147         } {
148                 reduced := reducedDialTimeout(_case.Max, _case.HalfOpenLimit, _case.PendingPeers)
149                 expected := _case.ExpectedReduced
150                 if expected < minDialTimeout {
151                         expected = minDialTimeout
152                 }
153                 if reduced != expected {
154                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
155                 }
156         }
157 }
158
159 func TestUTPRawConn(t *testing.T) {
160         l, err := NewUtpSocket("udp", "")
161         require.NoError(t, err)
162         defer l.Close()
163         go func() {
164                 for {
165                         _, err := l.Accept()
166                         if err != nil {
167                                 break
168                         }
169                 }
170         }()
171         // Connect a UTP peer to see if the RawConn will still work.
172         s, err := NewUtpSocket("udp", "")
173         require.NoError(t, err)
174         defer s.Close()
175         utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
176         require.NoError(t, err)
177         defer utpPeer.Close()
178         peer, err := net.ListenPacket("udp", ":0")
179         require.NoError(t, err)
180         defer peer.Close()
181
182         msgsReceived := 0
183         // How many messages to send. I've set this to double the channel buffer
184         // size in the raw packetConn.
185         const N = 200
186         readerStopped := make(chan struct{})
187         // The reader goroutine.
188         go func() {
189                 defer close(readerStopped)
190                 b := make([]byte, 500)
191                 for i := 0; i < N; i++ {
192                         n, _, err := l.ReadFrom(b)
193                         require.NoError(t, err)
194                         msgsReceived++
195                         var d int
196                         fmt.Sscan(string(b[:n]), &d)
197                         assert.Equal(t, i, d)
198                 }
199         }()
200         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
201         require.NoError(t, err)
202         for i := 0; i < N; i++ {
203                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
204                 require.NoError(t, err)
205                 time.Sleep(time.Millisecond)
206         }
207         select {
208         case <-readerStopped:
209         case <-time.After(time.Second):
210                 t.Fatal("reader timed out")
211         }
212         if msgsReceived != N {
213                 t.Fatalf("messages received: %d", msgsReceived)
214         }
215 }
216
217 func TestTwoClientsArbitraryPorts(t *testing.T) {
218         for i := 0; i < 2; i++ {
219                 cl, err := NewClient(TestingConfig())
220                 if err != nil {
221                         t.Fatal(err)
222                 }
223                 defer cl.Close()
224         }
225 }
226
227 func TestAddDropManyTorrents(t *testing.T) {
228         cl, err := NewClient(TestingConfig())
229         require.NoError(t, err)
230         defer cl.Close()
231         for i := range iter.N(1000) {
232                 var spec TorrentSpec
233                 binary.PutVarint(spec.InfoHash[:], int64(i))
234                 tt, new, err := cl.AddTorrentSpec(&spec)
235                 assert.NoError(t, err)
236                 assert.True(t, new)
237                 defer tt.Drop()
238         }
239 }
240
241 type FileCacheClientStorageFactoryParams struct {
242         Capacity    int64
243         SetCapacity bool
244         Wrapper     func(*filecache.Cache) storage.ClientImpl
245 }
246
247 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
248         return func(dataDir string) storage.ClientImpl {
249                 fc, err := filecache.NewCache(dataDir)
250                 if err != nil {
251                         panic(err)
252                 }
253                 if ps.SetCapacity {
254                         fc.SetCapacity(ps.Capacity)
255                 }
256                 return ps.Wrapper(fc)
257         }
258 }
259
260 type storageFactory func(string) storage.ClientImpl
261
262 func TestClientTransferDefault(t *testing.T) {
263         testClientTransfer(t, testClientTransferParams{
264                 ExportClientStatus: true,
265                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
266                         Wrapper: fileCachePieceResourceStorage,
267                 }),
268         })
269 }
270
271 func TestClientTransferRateLimitedUpload(t *testing.T) {
272         started := time.Now()
273         testClientTransfer(t, testClientTransferParams{
274                 // We are uploading 13 bytes (the length of the greeting torrent). The
275                 // chunks are 2 bytes in length. Then the smallest burst we can run
276                 // with is 2. Time taken is (13-burst)/rate.
277                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
278         })
279         require.True(t, time.Since(started) > time.Second)
280 }
281
282 func TestClientTransferRateLimitedDownload(t *testing.T) {
283         testClientTransfer(t, testClientTransferParams{
284                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
285         })
286 }
287
288 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
289         return storage.NewResourcePieces(fc.AsResourceProvider())
290 }
291
292 func TestClientTransferSmallCache(t *testing.T) {
293         testClientTransfer(t, testClientTransferParams{
294                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
295                         SetCapacity: true,
296                         // Going below the piece length means it can't complete a piece so
297                         // that it can be hashed.
298                         Capacity: 5,
299                         Wrapper:  fileCachePieceResourceStorage,
300                 }),
301                 SetReadahead: true,
302                 // Can't readahead too far or the cache will thrash and drop data we
303                 // thought we had.
304                 Readahead:          0,
305                 ExportClientStatus: true,
306         })
307 }
308
309 func TestClientTransferVarious(t *testing.T) {
310         // Leecher storage
311         for _, ls := range []storageFactory{
312                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
313                         Wrapper: fileCachePieceResourceStorage,
314                 }),
315                 storage.NewBoltDB,
316         } {
317                 // Seeder storage
318                 for _, ss := range []func(string) storage.ClientImpl{
319                         storage.NewFile,
320                         storage.NewMMap,
321                 } {
322                         for _, responsive := range []bool{false, true} {
323                                 testClientTransfer(t, testClientTransferParams{
324                                         Responsive:     responsive,
325                                         SeederStorage:  ss,
326                                         LeecherStorage: ls,
327                                 })
328                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
329                                         testClientTransfer(t, testClientTransferParams{
330                                                 SeederStorage:  ss,
331                                                 Responsive:     responsive,
332                                                 SetReadahead:   true,
333                                                 Readahead:      readahead,
334                                                 LeecherStorage: ls,
335                                         })
336                                 }
337                         }
338                 }
339         }
340 }
341
342 type testClientTransferParams struct {
343         Responsive                 bool
344         Readahead                  int64
345         SetReadahead               bool
346         ExportClientStatus         bool
347         LeecherStorage             func(string) storage.ClientImpl
348         SeederStorage              func(string) storage.ClientImpl
349         SeederUploadRateLimiter    *rate.Limiter
350         LeecherDownloadRateLimiter *rate.Limiter
351 }
352
353 // Creates a seeder and a leecher, and ensures the data transfers when a read
354 // is attempted on the leecher.
355 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
356         greetingTempDir, mi := testutil.GreetingTestTorrent()
357         defer os.RemoveAll(greetingTempDir)
358         // Create seeder and a Torrent.
359         cfg := TestingConfig()
360         cfg.Seed = true
361         cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
362         // cfg.ListenAddr = "localhost:4000"
363         if ps.SeederStorage != nil {
364                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
365                 defer cfg.DefaultStorage.Close()
366         } else {
367                 cfg.DataDir = greetingTempDir
368         }
369         seeder, err := NewClient(cfg)
370         require.NoError(t, err)
371         defer seeder.Close()
372         if ps.ExportClientStatus {
373                 testutil.ExportStatusWriter(seeder, "s")
374         }
375         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
376         seederTorrent.VerifyData()
377         // Create leecher and a Torrent.
378         leecherDataDir, err := ioutil.TempDir("", "")
379         require.NoError(t, err)
380         defer os.RemoveAll(leecherDataDir)
381         if ps.LeecherStorage == nil {
382                 cfg.DataDir = leecherDataDir
383         } else {
384                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
385         }
386         cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
387         // cfg.ListenAddr = "localhost:4001"
388         leecher, err := NewClient(cfg)
389         require.NoError(t, err)
390         defer leecher.Close()
391         if ps.ExportClientStatus {
392                 testutil.ExportStatusWriter(leecher, "l")
393         }
394         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
395                 ret = TorrentSpecFromMetaInfo(mi)
396                 ret.ChunkSize = 2
397                 return
398         }())
399         require.NoError(t, err)
400         assert.True(t, new)
401         // Now do some things with leecher and seeder.
402         addClientPeer(leecherGreeting, seeder)
403         r := leecherGreeting.NewReader()
404         defer r.Close()
405         if ps.Responsive {
406                 r.SetResponsive()
407         }
408         if ps.SetReadahead {
409                 r.SetReadahead(ps.Readahead)
410         }
411         assertReadAllGreeting(t, r)
412         // After one read through, we can assume certain torrent statistics.
413         // These are not a strict requirement. It is however interesting to
414         // follow.
415         // t.Logf("%#v", seederTorrent.Stats())
416         // assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
417         // assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
418         // assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
419         // assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
420         // Read through again for the cases where the torrent data size exceeds
421         // the size of the cache.
422         assertReadAllGreeting(t, r)
423 }
424
425 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
426         pos, err := r.Seek(0, io.SeekStart)
427         assert.NoError(t, err)
428         assert.EqualValues(t, 0, pos)
429         _greeting, err := ioutil.ReadAll(r)
430         assert.NoError(t, err)
431         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
432 }
433
434 // Check that after completing leeching, a leecher transitions to a seeding
435 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
436 func TestSeedAfterDownloading(t *testing.T) {
437         greetingTempDir, mi := testutil.GreetingTestTorrent()
438         defer os.RemoveAll(greetingTempDir)
439         cfg := TestingConfig()
440         cfg.Seed = true
441         cfg.DataDir = greetingTempDir
442         seeder, err := NewClient(cfg)
443         require.NoError(t, err)
444         defer seeder.Close()
445         testutil.ExportStatusWriter(seeder, "s")
446         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
447         seederTorrent.VerifyData()
448         cfg.DataDir, err = ioutil.TempDir("", "")
449         require.NoError(t, err)
450         defer os.RemoveAll(cfg.DataDir)
451         leecher, err := NewClient(cfg)
452         require.NoError(t, err)
453         defer leecher.Close()
454         testutil.ExportStatusWriter(leecher, "l")
455         cfg.Seed = false
456         // cfg.TorrentDataOpener = nil
457         cfg.DataDir, err = ioutil.TempDir("", "")
458         require.NoError(t, err)
459         defer os.RemoveAll(cfg.DataDir)
460         leecherLeecher, _ := NewClient(cfg)
461         defer leecherLeecher.Close()
462         testutil.ExportStatusWriter(leecherLeecher, "ll")
463         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
464                 ret = TorrentSpecFromMetaInfo(mi)
465                 ret.ChunkSize = 2
466                 return
467         }())
468         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
469                 ret = TorrentSpecFromMetaInfo(mi)
470                 ret.ChunkSize = 3
471                 return
472         }())
473         // Simultaneously DownloadAll in Leecher, and read the contents
474         // consecutively in LeecherLeecher. This non-deterministically triggered a
475         // case where the leecher wouldn't unchoke the LeecherLeecher.
476         var wg sync.WaitGroup
477         wg.Add(1)
478         go func() {
479                 defer wg.Done()
480                 r := llg.NewReader()
481                 defer r.Close()
482                 b, err := ioutil.ReadAll(r)
483                 require.NoError(t, err)
484                 assert.EqualValues(t, testutil.GreetingFileContents, b)
485         }()
486         addClientPeer(leecherGreeting, seeder)
487         addClientPeer(leecherGreeting, leecherLeecher)
488         wg.Add(1)
489         go func() {
490                 defer wg.Done()
491                 leecherGreeting.DownloadAll()
492                 leecher.WaitAll()
493         }()
494         wg.Wait()
495 }
496
497 func TestMergingTrackersByAddingSpecs(t *testing.T) {
498         cl, err := NewClient(TestingConfig())
499         require.NoError(t, err)
500         defer cl.Close()
501         spec := TorrentSpec{}
502         T, new, _ := cl.AddTorrentSpec(&spec)
503         if !new {
504                 t.FailNow()
505         }
506         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
507         _, new, _ = cl.AddTorrentSpec(&spec)
508         assert.False(t, new)
509         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
510         // Because trackers are disabled in TestingConfig.
511         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
512 }
513
514 type badStorage struct{}
515
516 var _ storage.ClientImpl = badStorage{}
517
518 func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
519         return bs, nil
520 }
521
522 func (bs badStorage) Close() error {
523         return nil
524 }
525
526 func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {
527         return badStoragePiece{p}
528 }
529
530 type badStoragePiece struct {
531         p metainfo.Piece
532 }
533
534 var _ storage.PieceImpl = badStoragePiece{}
535
536 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
537         return 0, nil
538 }
539
540 func (p badStoragePiece) Completion() storage.Completion {
541         return storage.Completion{Complete: true, Ok: true}
542 }
543
544 func (p badStoragePiece) MarkComplete() error {
545         return errors.New("psyyyyyyyche")
546 }
547
548 func (p badStoragePiece) MarkNotComplete() error {
549         return errors.New("psyyyyyyyche")
550 }
551
552 func (p badStoragePiece) randomlyTruncatedDataString() string {
553         return "hello, world\n"[:rand.Intn(14)]
554 }
555
556 func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
557         r := strings.NewReader(p.randomlyTruncatedDataString())
558         return r.ReadAt(b, off+p.p.Offset())
559 }
560
561 // We read from a piece which is marked completed, but is missing data.
562 func TestCompletedPieceWrongSize(t *testing.T) {
563         cfg := TestingConfig()
564         cfg.DefaultStorage = badStorage{}
565         cl, err := NewClient(cfg)
566         require.NoError(t, err)
567         defer cl.Close()
568         info := metainfo.Info{
569                 PieceLength: 15,
570                 Pieces:      make([]byte, 20),
571                 Files: []metainfo.FileInfo{
572                         {Path: []string{"greeting"}, Length: 13},
573                 },
574         }
575         b, err := bencode.Marshal(info)
576         require.NoError(t, err)
577         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
578                 InfoBytes: b,
579                 InfoHash:  metainfo.HashBytes(b),
580         })
581         require.NoError(t, err)
582         defer tt.Drop()
583         assert.True(t, new)
584         r := tt.NewReader()
585         defer r.Close()
586         b, err = ioutil.ReadAll(r)
587         assert.Len(t, b, 13)
588         assert.NoError(t, err)
589 }
590
591 func BenchmarkAddLargeTorrent(b *testing.B) {
592         cfg := TestingConfig()
593         cfg.DisableTCP = true
594         cfg.DisableUTP = true
595         cfg.ListenAddr = "redonk"
596         cl, err := NewClient(cfg)
597         require.NoError(b, err)
598         defer cl.Close()
599         for range iter.N(b.N) {
600                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
601                 if err != nil {
602                         b.Fatal(err)
603                 }
604                 t.Drop()
605         }
606 }
607
608 func TestResponsive(t *testing.T) {
609         seederDataDir, mi := testutil.GreetingTestTorrent()
610         defer os.RemoveAll(seederDataDir)
611         cfg := TestingConfig()
612         cfg.Seed = true
613         cfg.DataDir = seederDataDir
614         seeder, err := NewClient(cfg)
615         require.Nil(t, err)
616         defer seeder.Close()
617         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
618         seederTorrent.VerifyData()
619         leecherDataDir, err := ioutil.TempDir("", "")
620         require.Nil(t, err)
621         defer os.RemoveAll(leecherDataDir)
622         cfg = TestingConfig()
623         cfg.DataDir = leecherDataDir
624         leecher, err := NewClient(cfg)
625         require.Nil(t, err)
626         defer leecher.Close()
627         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
628                 ret = TorrentSpecFromMetaInfo(mi)
629                 ret.ChunkSize = 2
630                 return
631         }())
632         addClientPeer(leecherTorrent, seeder)
633         reader := leecherTorrent.NewReader()
634         defer reader.Close()
635         reader.SetReadahead(0)
636         reader.SetResponsive()
637         b := make([]byte, 2)
638         _, err = reader.Seek(3, io.SeekStart)
639         require.NoError(t, err)
640         _, err = io.ReadFull(reader, b)
641         assert.Nil(t, err)
642         assert.EqualValues(t, "lo", string(b))
643         _, err = reader.Seek(11, io.SeekStart)
644         require.NoError(t, err)
645         n, err := io.ReadFull(reader, b)
646         assert.Nil(t, err)
647         assert.EqualValues(t, 2, n)
648         assert.EqualValues(t, "d\n", string(b))
649 }
650
651 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
652         seederDataDir, mi := testutil.GreetingTestTorrent()
653         defer os.RemoveAll(seederDataDir)
654         cfg := TestingConfig()
655         cfg.Seed = true
656         cfg.DataDir = seederDataDir
657         seeder, err := NewClient(cfg)
658         require.Nil(t, err)
659         defer seeder.Close()
660         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
661         seederTorrent.VerifyData()
662         leecherDataDir, err := ioutil.TempDir("", "")
663         require.Nil(t, err)
664         defer os.RemoveAll(leecherDataDir)
665         cfg = TestingConfig()
666         cfg.DataDir = leecherDataDir
667         leecher, err := NewClient(cfg)
668         require.Nil(t, err)
669         defer leecher.Close()
670         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
671                 ret = TorrentSpecFromMetaInfo(mi)
672                 ret.ChunkSize = 2
673                 return
674         }())
675         addClientPeer(leecherTorrent, seeder)
676         reader := leecherTorrent.NewReader()
677         defer reader.Close()
678         reader.SetReadahead(0)
679         reader.SetResponsive()
680         b := make([]byte, 2)
681         _, err = reader.Seek(3, io.SeekStart)
682         require.NoError(t, err)
683         _, err = io.ReadFull(reader, b)
684         assert.Nil(t, err)
685         assert.EqualValues(t, "lo", string(b))
686         go leecherTorrent.Drop()
687         _, err = reader.Seek(11, io.SeekStart)
688         require.NoError(t, err)
689         n, err := reader.Read(b)
690         assert.EqualError(t, err, "torrent closed")
691         assert.EqualValues(t, 0, n)
692 }
693
694 func TestDHTInheritBlocklist(t *testing.T) {
695         ipl := iplist.New(nil)
696         require.NotNil(t, ipl)
697         cfg := TestingConfig()
698         cfg.IPBlocklist = ipl
699         cfg.NoDHT = false
700         cl, err := NewClient(cfg)
701         require.NoError(t, err)
702         defer cl.Close()
703         require.Equal(t, ipl, cl.DHT().IPBlocklist())
704 }
705
706 // Check that stuff is merged in subsequent AddTorrentSpec for the same
707 // infohash.
708 func TestAddTorrentSpecMerging(t *testing.T) {
709         cl, err := NewClient(TestingConfig())
710         require.NoError(t, err)
711         defer cl.Close()
712         dir, mi := testutil.GreetingTestTorrent()
713         defer os.RemoveAll(dir)
714         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
715                 InfoHash: mi.HashInfoBytes(),
716         })
717         require.NoError(t, err)
718         require.True(t, new)
719         require.Nil(t, tt.Info())
720         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
721         require.NoError(t, err)
722         require.False(t, new)
723         require.NotNil(t, tt.Info())
724 }
725
726 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
727         dir, mi := testutil.GreetingTestTorrent()
728         os.RemoveAll(dir)
729         cl, _ := NewClient(TestingConfig())
730         defer cl.Close()
731         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
732                 InfoHash: mi.HashInfoBytes(),
733         })
734         tt.Drop()
735         assert.EqualValues(t, 0, len(cl.Torrents()))
736         select {
737         case <-tt.GotInfo():
738                 t.FailNow()
739         default:
740         }
741 }
742
743 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
744         for i := range iter.N(info.NumPieces()) {
745                 p := info.Piece(i)
746                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
747         }
748 }
749
750 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
751         fileCacheDir, err := ioutil.TempDir("", "")
752         require.NoError(t, err)
753         defer os.RemoveAll(fileCacheDir)
754         fileCache, err := filecache.NewCache(fileCacheDir)
755         require.NoError(t, err)
756         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
757         defer os.RemoveAll(greetingDataTempDir)
758         filePieceStore := csf(fileCache)
759         defer filePieceStore.Close()
760         info, err := greetingMetainfo.UnmarshalInfo()
761         require.NoError(t, err)
762         ih := greetingMetainfo.HashInfoBytes()
763         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
764         require.NoError(t, err)
765         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
766         // require.Equal(t, len(testutil.GreetingFileContents), written)
767         // require.NoError(t, err)
768         for i := 0; i < info.NumPieces(); i++ {
769                 p := info.Piece(i)
770                 if alreadyCompleted {
771                         require.NoError(t, greetingData.Piece(p).MarkComplete())
772                 }
773         }
774         cfg := TestingConfig()
775         // TODO: Disable network option?
776         cfg.DisableTCP = true
777         cfg.DisableUTP = true
778         cfg.DefaultStorage = filePieceStore
779         cl, err := NewClient(cfg)
780         require.NoError(t, err)
781         defer cl.Close()
782         tt, err := cl.AddTorrent(greetingMetainfo)
783         require.NoError(t, err)
784         psrs := tt.PieceStateRuns()
785         assert.Len(t, psrs, 1)
786         assert.EqualValues(t, 3, psrs[0].Length)
787         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
788         if alreadyCompleted {
789                 r := tt.NewReader()
790                 b, err := ioutil.ReadAll(r)
791                 assert.NoError(t, err)
792                 assert.EqualValues(t, testutil.GreetingFileContents, b)
793         }
794 }
795
796 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
797         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
798 }
799
800 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
801         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
802 }
803
804 func TestAddMetainfoWithNodes(t *testing.T) {
805         cfg := TestingConfig()
806         cfg.ListenAddr = ":0"
807         cfg.NoDHT = false
808         // For now, we want to just jam the nodes into the table, without
809         // verifying them first. Also the DHT code doesn't support mixing secure
810         // and insecure nodes if security is enabled (yet).
811         cfg.DHTConfig.NoSecurity = true
812         cl, err := NewClient(cfg)
813         require.NoError(t, err)
814         defer cl.Close()
815         assert.EqualValues(t, 0, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
816         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
817         require.NoError(t, err)
818         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
819         // check if the announce-list is here instead. TODO: Add nodes.
820         assert.Len(t, tt.metainfo.AnnounceList, 5)
821         // There are 6 nodes in the torrent file.
822         assert.EqualValues(t, 6, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
823 }
824
825 type testDownloadCancelParams struct {
826         ExportClientStatus        bool
827         SetLeecherStorageCapacity bool
828         LeecherStorageCapacity    int64
829         Cancel                    bool
830 }
831
832 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
833         greetingTempDir, mi := testutil.GreetingTestTorrent()
834         defer os.RemoveAll(greetingTempDir)
835         cfg := TestingConfig()
836         cfg.Seed = true
837         cfg.DataDir = greetingTempDir
838         seeder, err := NewClient(cfg)
839         require.NoError(t, err)
840         defer seeder.Close()
841         if ps.ExportClientStatus {
842                 testutil.ExportStatusWriter(seeder, "s")
843         }
844         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
845         seederTorrent.VerifyData()
846         leecherDataDir, err := ioutil.TempDir("", "")
847         require.NoError(t, err)
848         defer os.RemoveAll(leecherDataDir)
849         fc, err := filecache.NewCache(leecherDataDir)
850         require.NoError(t, err)
851         if ps.SetLeecherStorageCapacity {
852                 fc.SetCapacity(ps.LeecherStorageCapacity)
853         }
854         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
855         cfg.DataDir = leecherDataDir
856         leecher, _ := NewClient(cfg)
857         defer leecher.Close()
858         if ps.ExportClientStatus {
859                 testutil.ExportStatusWriter(leecher, "l")
860         }
861         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
862                 ret = TorrentSpecFromMetaInfo(mi)
863                 ret.ChunkSize = 2
864                 return
865         }())
866         require.NoError(t, err)
867         assert.True(t, new)
868         psc := leecherGreeting.SubscribePieceStateChanges()
869         defer psc.Close()
870         leecherGreeting.DownloadAll()
871         if ps.Cancel {
872                 leecherGreeting.CancelPieces(0, leecherGreeting.NumPieces())
873         }
874         addClientPeer(leecherGreeting, seeder)
875         completes := make(map[int]bool, 3)
876 values:
877         for {
878                 // started := time.Now()
879                 select {
880                 case _v := <-psc.Values:
881                         // log.Print(time.Since(started))
882                         v := _v.(PieceStateChange)
883                         completes[v.Index] = v.Complete
884                 case <-time.After(100 * time.Millisecond):
885                         break values
886                 }
887         }
888         if ps.Cancel {
889                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
890         } else {
891                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
892         }
893
894 }
895
896 func TestTorrentDownloadAll(t *testing.T) {
897         testDownloadCancel(t, testDownloadCancelParams{})
898 }
899
900 func TestTorrentDownloadAllThenCancel(t *testing.T) {
901         testDownloadCancel(t, testDownloadCancelParams{
902                 Cancel: true,
903         })
904 }
905
906 // Ensure that it's an error for a peer to send an invalid have message.
907 func TestPeerInvalidHave(t *testing.T) {
908         cl, err := NewClient(TestingConfig())
909         require.NoError(t, err)
910         defer cl.Close()
911         info := metainfo.Info{
912                 PieceLength: 1,
913                 Pieces:      make([]byte, 20),
914                 Files:       []metainfo.FileInfo{{Length: 1}},
915         }
916         infoBytes, err := bencode.Marshal(info)
917         require.NoError(t, err)
918         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
919                 InfoBytes: infoBytes,
920                 InfoHash:  metainfo.HashBytes(infoBytes),
921                 Storage:   badStorage{},
922         })
923         require.NoError(t, err)
924         assert.True(t, _new)
925         defer tt.Drop()
926         cn := &connection{
927                 t: tt,
928         }
929         assert.NoError(t, cn.peerSentHave(0))
930         assert.Error(t, cn.peerSentHave(1))
931 }
932
933 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
934         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
935         defer os.RemoveAll(greetingTempDir)
936         cfg := TestingConfig()
937         cfg.DataDir = greetingTempDir
938         seeder, err := NewClient(TestingConfig())
939         require.NoError(t, err)
940         seeder.AddTorrentSpec(&TorrentSpec{
941                 InfoBytes: greetingMetainfo.InfoBytes,
942         })
943 }
944
945 func TestPrepareTrackerAnnounce(t *testing.T) {
946         cl := &Client{}
947         blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
948         require.NoError(t, err)
949         assert.False(t, blocked)
950         assert.EqualValues(t, "localhost:1234", host)
951         assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
952 }
953
954 // Check that when the listen port is 0, all the protocols listened on have
955 // the same port, and it isn't zero.
956 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
957         cl, err := NewClient(TestingConfig())
958         require.NoError(t, err)
959         defer cl.Close()
960         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
961         assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
962 }
963
964 func TestClientDynamicListenTCPOnly(t *testing.T) {
965         cfg := TestingConfig()
966         cfg.DisableUTP = true
967         cl, err := NewClient(cfg)
968         require.NoError(t, err)
969         defer cl.Close()
970         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
971         assert.Nil(t, cl.utpSock)
972 }
973
974 func TestClientDynamicListenUTPOnly(t *testing.T) {
975         cfg := TestingConfig()
976         cfg.DisableTCP = true
977         cl, err := NewClient(cfg)
978         require.NoError(t, err)
979         defer cl.Close()
980         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
981         assert.Nil(t, cl.tcpListener)
982 }
983
984 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
985         cfg := TestingConfig()
986         cfg.DisableTCP = true
987         cfg.DisableUTP = true
988         cl, err := NewClient(cfg)
989         require.NoError(t, err)
990         defer cl.Close()
991         assert.Nil(t, cl.ListenAddr())
992 }
993
994 func addClientPeer(t *Torrent, cl *Client) {
995         t.AddPeers([]Peer{
996                 {
997                         IP:   missinggo.AddrIP(cl.ListenAddr()),
998                         Port: missinggo.AddrPort(cl.ListenAddr()),
999                 },
1000         })
1001 }
1002
1003 func totalConns(tts []*Torrent) (ret int) {
1004         for _, tt := range tts {
1005                 tt.cl.mu.Lock()
1006                 ret += len(tt.conns)
1007                 tt.cl.mu.Unlock()
1008         }
1009         return
1010 }
1011
1012 func TestSetMaxEstablishedConn(t *testing.T) {
1013         var tts []*Torrent
1014         ih := testutil.GreetingMetaInfo().HashInfoBytes()
1015         for i := range iter.N(3) {
1016                 cl, err := NewClient(TestingConfig())
1017                 require.NoError(t, err)
1018                 defer cl.Close()
1019                 tt, _ := cl.AddTorrentInfoHash(ih)
1020                 tt.SetMaxEstablishedConns(2)
1021                 testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
1022                 tts = append(tts, tt)
1023         }
1024         addPeers := func() {
1025                 for i, tt := range tts {
1026                         for _, _tt := range tts[:i] {
1027                                 addClientPeer(tt, _tt.cl)
1028                         }
1029                 }
1030         }
1031         waitTotalConns := func(num int) {
1032                 for totalConns(tts) != num {
1033                         time.Sleep(time.Millisecond)
1034                 }
1035         }
1036         addPeers()
1037         waitTotalConns(6)
1038         tts[0].SetMaxEstablishedConns(1)
1039         waitTotalConns(4)
1040         tts[0].SetMaxEstablishedConns(0)
1041         waitTotalConns(2)
1042         tts[0].SetMaxEstablishedConns(1)
1043         addPeers()
1044         waitTotalConns(4)
1045         tts[0].SetMaxEstablishedConns(2)
1046         addPeers()
1047         waitTotalConns(6)
1048 }
1049
1050 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1051         os.MkdirAll(dir, 0770)
1052         file, err := os.Create(filepath.Join(dir, name))
1053         require.NoError(t, err)
1054         file.Write([]byte(name))
1055         file.Close()
1056         mi := metainfo.MetaInfo{}
1057         mi.SetDefaults()
1058         info := metainfo.Info{PieceLength: 256 * 1024}
1059         err = info.BuildFromFilePath(filepath.Join(dir, name))
1060         require.NoError(t, err)
1061         mi.InfoBytes, err = bencode.Marshal(info)
1062         require.NoError(t, err)
1063         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1064         tr, err := cl.AddTorrent(&mi)
1065         require.NoError(t, err)
1066         require.True(t, tr.Seeding())
1067         tr.VerifyData()
1068         return magnet
1069 }
1070
1071 // https://github.com/anacrolix/torrent/issues/114
1072 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1073         cfg := TestingConfig()
1074         cfg.DisableUTP = true
1075         cfg.Seed = true
1076         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1077         cfg.Debug = true
1078         cfg.ForceEncryption = true
1079         os.Mkdir(cfg.DataDir, 0755)
1080         server, err := NewClient(cfg)
1081         require.NoError(t, err)
1082         defer server.Close()
1083         testutil.ExportStatusWriter(server, "s")
1084         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1085         makeMagnet(t, server, cfg.DataDir, "test2")
1086         cfg = TestingConfig()
1087         cfg.DisableUTP = true
1088         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1089         cfg.Debug = true
1090         cfg.ForceEncryption = true
1091         client, err := NewClient(cfg)
1092         require.NoError(t, err)
1093         defer client.Close()
1094         testutil.ExportStatusWriter(client, "c")
1095         tr, err := client.AddMagnet(magnet1)
1096         require.NoError(t, err)
1097         tr.AddPeers([]Peer{{
1098                 IP:   missinggo.AddrIP(server.ListenAddr()),
1099                 Port: missinggo.AddrPort(server.ListenAddr()),
1100         }})
1101         <-tr.GotInfo()
1102         tr.DownloadAll()
1103         client.WaitAll()
1104 }
1105
1106 func TestClientAddressInUse(t *testing.T) {
1107         s, _ := NewUtpSocket("udp", ":50007")
1108         if s != nil {
1109                 defer s.Close()
1110         }
1111         cfg := TestingConfig()
1112         cfg.ListenAddr = ":50007"
1113         cl, err := NewClient(cfg)
1114         require.Error(t, err)
1115         require.Nil(t, cl)
1116 }