]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Remove test temporary directories when finished
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "math/rand"
10         "net"
11         "os"
12         "path/filepath"
13         "strings"
14         "sync"
15         "testing"
16         "time"
17
18         _ "github.com/anacrolix/envpprof"
19         "github.com/anacrolix/missinggo"
20         "github.com/anacrolix/missinggo/filecache"
21         "github.com/anacrolix/missinggo/pubsub"
22         "github.com/bradfitz/iter"
23         "github.com/stretchr/testify/assert"
24         "github.com/stretchr/testify/require"
25         "golang.org/x/time/rate"
26
27         "github.com/anacrolix/torrent/bencode"
28         "github.com/anacrolix/torrent/internal/testutil"
29         "github.com/anacrolix/torrent/iplist"
30         "github.com/anacrolix/torrent/metainfo"
31         "github.com/anacrolix/torrent/storage"
32 )
33
34 func TestingConfig() *Config {
35         return &Config{
36                 ListenAddr: "localhost:0",
37                 NoDHT:      true,
38                 DataDir: func() string {
39                         ret, err := ioutil.TempDir(tempDir, "")
40                         if err != nil {
41                                 panic(err)
42                         }
43                         return ret
44                 }(),
45                 DisableTrackers: true,
46                 Debug:           true,
47         }
48 }
49
50 func TestClientDefault(t *testing.T) {
51         cl, err := NewClient(TestingConfig())
52         require.NoError(t, err)
53         cl.Close()
54 }
55
56 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
57         cfg := TestingConfig()
58         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
59         require.NoError(t, err)
60         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
61         defer ci.Close()
62         cfg.DefaultStorage = ci
63         cl, err := NewClient(cfg)
64         require.NoError(t, err)
65         cl.Close()
66         // And again, https://github.com/anacrolix/torrent/issues/158
67         cl, err = NewClient(cfg)
68         require.NoError(t, err)
69         cl.Close()
70 }
71
72 func TestAddDropTorrent(t *testing.T) {
73         cl, err := NewClient(TestingConfig())
74         require.NoError(t, err)
75         defer cl.Close()
76         dir, mi := testutil.GreetingTestTorrent()
77         defer os.RemoveAll(dir)
78         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
79         require.NoError(t, err)
80         assert.True(t, new)
81         tt.SetMaxEstablishedConns(0)
82         tt.SetMaxEstablishedConns(1)
83         tt.Drop()
84 }
85
86 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
87         t.SkipNow()
88 }
89
90 func TestAddTorrentNoUsableURLs(t *testing.T) {
91         t.SkipNow()
92 }
93
94 func TestAddPeersToUnknownTorrent(t *testing.T) {
95         t.SkipNow()
96 }
97
98 func TestPieceHashSize(t *testing.T) {
99         if pieceHash.Size() != 20 {
100                 t.FailNow()
101         }
102 }
103
104 func TestTorrentInitialState(t *testing.T) {
105         dir, mi := testutil.GreetingTestTorrent()
106         defer os.RemoveAll(dir)
107         tor := &Torrent{
108                 infoHash:          mi.HashInfoBytes(),
109                 pieceStateChanges: pubsub.NewPubSub(),
110         }
111         tor.chunkSize = 2
112         tor.storageOpener = storage.NewClient(storage.NewFileWithCompletion("/dev/null", storage.NewMapPieceCompletion()))
113         // Needed to lock for asynchronous piece verification.
114         tor.cl = new(Client)
115         err := tor.setInfoBytes(mi.InfoBytes)
116         require.NoError(t, err)
117         require.Len(t, tor.pieces, 3)
118         tor.pendAllChunkSpecs(0)
119         tor.cl.mu.Lock()
120         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
121         tor.cl.mu.Unlock()
122         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
123 }
124
125 func TestUnmarshalPEXMsg(t *testing.T) {
126         var m peerExchangeMessage
127         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
128                 t.Fatal(err)
129         }
130         if len(m.Added) != 2 {
131                 t.FailNow()
132         }
133         if m.Added[0].Port != 0x506 {
134                 t.FailNow()
135         }
136 }
137
138 func TestReducedDialTimeout(t *testing.T) {
139         for _, _case := range []struct {
140                 Max             time.Duration
141                 HalfOpenLimit   int
142                 PendingPeers    int
143                 ExpectedReduced time.Duration
144         }{
145                 {nominalDialTimeout, 40, 0, nominalDialTimeout},
146                 {nominalDialTimeout, 40, 1, nominalDialTimeout},
147                 {nominalDialTimeout, 40, 39, nominalDialTimeout},
148                 {nominalDialTimeout, 40, 40, nominalDialTimeout / 2},
149                 {nominalDialTimeout, 40, 80, nominalDialTimeout / 3},
150                 {nominalDialTimeout, 40, 4000, nominalDialTimeout / 101},
151         } {
152                 reduced := reducedDialTimeout(_case.Max, _case.HalfOpenLimit, _case.PendingPeers)
153                 expected := _case.ExpectedReduced
154                 if expected < minDialTimeout {
155                         expected = minDialTimeout
156                 }
157                 if reduced != expected {
158                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
159                 }
160         }
161 }
162
163 func TestUTPRawConn(t *testing.T) {
164         l, err := NewUtpSocket("udp", "")
165         require.NoError(t, err)
166         defer l.Close()
167         go func() {
168                 for {
169                         _, err := l.Accept()
170                         if err != nil {
171                                 break
172                         }
173                 }
174         }()
175         // Connect a UTP peer to see if the RawConn will still work.
176         s, err := NewUtpSocket("udp", "")
177         require.NoError(t, err)
178         defer s.Close()
179         utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
180         require.NoError(t, err)
181         defer utpPeer.Close()
182         peer, err := net.ListenPacket("udp", ":0")
183         require.NoError(t, err)
184         defer peer.Close()
185
186         msgsReceived := 0
187         // How many messages to send. I've set this to double the channel buffer
188         // size in the raw packetConn.
189         const N = 200
190         readerStopped := make(chan struct{})
191         // The reader goroutine.
192         go func() {
193                 defer close(readerStopped)
194                 b := make([]byte, 500)
195                 for i := 0; i < N; i++ {
196                         n, _, err := l.ReadFrom(b)
197                         require.NoError(t, err)
198                         msgsReceived++
199                         var d int
200                         fmt.Sscan(string(b[:n]), &d)
201                         assert.Equal(t, i, d)
202                 }
203         }()
204         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
205         require.NoError(t, err)
206         for i := 0; i < N; i++ {
207                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
208                 require.NoError(t, err)
209                 time.Sleep(time.Millisecond)
210         }
211         select {
212         case <-readerStopped:
213         case <-time.After(time.Second):
214                 t.Fatal("reader timed out")
215         }
216         if msgsReceived != N {
217                 t.Fatalf("messages received: %d", msgsReceived)
218         }
219 }
220
221 func TestTwoClientsArbitraryPorts(t *testing.T) {
222         for i := 0; i < 2; i++ {
223                 cl, err := NewClient(TestingConfig())
224                 if err != nil {
225                         t.Fatal(err)
226                 }
227                 defer cl.Close()
228         }
229 }
230
231 func TestAddDropManyTorrents(t *testing.T) {
232         cl, err := NewClient(TestingConfig())
233         require.NoError(t, err)
234         defer cl.Close()
235         for i := range iter.N(1000) {
236                 var spec TorrentSpec
237                 binary.PutVarint(spec.InfoHash[:], int64(i))
238                 tt, new, err := cl.AddTorrentSpec(&spec)
239                 assert.NoError(t, err)
240                 assert.True(t, new)
241                 defer tt.Drop()
242         }
243 }
244
245 type FileCacheClientStorageFactoryParams struct {
246         Capacity    int64
247         SetCapacity bool
248         Wrapper     func(*filecache.Cache) storage.ClientImpl
249 }
250
251 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
252         return func(dataDir string) storage.ClientImpl {
253                 fc, err := filecache.NewCache(dataDir)
254                 if err != nil {
255                         panic(err)
256                 }
257                 if ps.SetCapacity {
258                         fc.SetCapacity(ps.Capacity)
259                 }
260                 return ps.Wrapper(fc)
261         }
262 }
263
264 type storageFactory func(string) storage.ClientImpl
265
266 func TestClientTransferDefault(t *testing.T) {
267         testClientTransfer(t, testClientTransferParams{
268                 ExportClientStatus: true,
269                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
270                         Wrapper: fileCachePieceResourceStorage,
271                 }),
272         })
273 }
274
275 func TestClientTransferRateLimitedUpload(t *testing.T) {
276         started := time.Now()
277         testClientTransfer(t, testClientTransferParams{
278                 // We are uploading 13 bytes (the length of the greeting torrent). The
279                 // chunks are 2 bytes in length. Then the smallest burst we can run
280                 // with is 2. Time taken is (13-burst)/rate.
281                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
282         })
283         require.True(t, time.Since(started) > time.Second)
284 }
285
286 func TestClientTransferRateLimitedDownload(t *testing.T) {
287         testClientTransfer(t, testClientTransferParams{
288                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
289         })
290 }
291
292 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
293         return storage.NewResourcePieces(fc.AsResourceProvider())
294 }
295
296 func TestClientTransferSmallCache(t *testing.T) {
297         testClientTransfer(t, testClientTransferParams{
298                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
299                         SetCapacity: true,
300                         // Going below the piece length means it can't complete a piece so
301                         // that it can be hashed.
302                         Capacity: 5,
303                         Wrapper:  fileCachePieceResourceStorage,
304                 }),
305                 SetReadahead: true,
306                 // Can't readahead too far or the cache will thrash and drop data we
307                 // thought we had.
308                 Readahead:          0,
309                 ExportClientStatus: true,
310         })
311 }
312
313 func TestClientTransferVarious(t *testing.T) {
314         // Leecher storage
315         for _, ls := range []storageFactory{
316                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
317                         Wrapper: fileCachePieceResourceStorage,
318                 }),
319                 storage.NewBoltDB,
320         } {
321                 // Seeder storage
322                 for _, ss := range []func(string) storage.ClientImpl{
323                         storage.NewFile,
324                         storage.NewMMap,
325                 } {
326                         for _, responsive := range []bool{false, true} {
327                                 testClientTransfer(t, testClientTransferParams{
328                                         Responsive:     responsive,
329                                         SeederStorage:  ss,
330                                         LeecherStorage: ls,
331                                 })
332                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
333                                         testClientTransfer(t, testClientTransferParams{
334                                                 SeederStorage:  ss,
335                                                 Responsive:     responsive,
336                                                 SetReadahead:   true,
337                                                 Readahead:      readahead,
338                                                 LeecherStorage: ls,
339                                         })
340                                 }
341                         }
342                 }
343         }
344 }
345
346 type testClientTransferParams struct {
347         Responsive                 bool
348         Readahead                  int64
349         SetReadahead               bool
350         ExportClientStatus         bool
351         LeecherStorage             func(string) storage.ClientImpl
352         SeederStorage              func(string) storage.ClientImpl
353         SeederUploadRateLimiter    *rate.Limiter
354         LeecherDownloadRateLimiter *rate.Limiter
355 }
356
357 // Creates a seeder and a leecher, and ensures the data transfers when a read
358 // is attempted on the leecher.
359 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
360         greetingTempDir, mi := testutil.GreetingTestTorrent()
361         defer os.RemoveAll(greetingTempDir)
362         // Create seeder and a Torrent.
363         cfg := TestingConfig()
364         cfg.Seed = true
365         cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
366         // cfg.ListenAddr = "localhost:4000"
367         if ps.SeederStorage != nil {
368                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
369                 defer cfg.DefaultStorage.Close()
370         } else {
371                 cfg.DataDir = greetingTempDir
372         }
373         seeder, err := NewClient(cfg)
374         require.NoError(t, err)
375         defer seeder.Close()
376         if ps.ExportClientStatus {
377                 testutil.ExportStatusWriter(seeder, "s")
378         }
379         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
380         seederTorrent.VerifyData()
381         // Create leecher and a Torrent.
382         leecherDataDir, err := ioutil.TempDir("", "")
383         require.NoError(t, err)
384         defer os.RemoveAll(leecherDataDir)
385         if ps.LeecherStorage == nil {
386                 cfg.DataDir = leecherDataDir
387         } else {
388                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
389         }
390         cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
391         // cfg.ListenAddr = "localhost:4001"
392         leecher, err := NewClient(cfg)
393         require.NoError(t, err)
394         defer leecher.Close()
395         if ps.ExportClientStatus {
396                 testutil.ExportStatusWriter(leecher, "l")
397         }
398         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
399                 ret = TorrentSpecFromMetaInfo(mi)
400                 ret.ChunkSize = 2
401                 return
402         }())
403         require.NoError(t, err)
404         assert.True(t, new)
405         // Now do some things with leecher and seeder.
406         addClientPeer(leecherGreeting, seeder)
407         r := leecherGreeting.NewReader()
408         defer r.Close()
409         if ps.Responsive {
410                 r.SetResponsive()
411         }
412         if ps.SetReadahead {
413                 r.SetReadahead(ps.Readahead)
414         }
415         assertReadAllGreeting(t, r)
416         // After one read through, we can assume certain torrent statistics.
417         // These are not a strict requirement. It is however interesting to
418         // follow.
419         // t.Logf("%#v", seederTorrent.Stats())
420         // assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
421         // assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
422         // assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
423         // assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
424         // Read through again for the cases where the torrent data size exceeds
425         // the size of the cache.
426         assertReadAllGreeting(t, r)
427 }
428
429 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
430         pos, err := r.Seek(0, os.SEEK_SET)
431         assert.NoError(t, err)
432         assert.EqualValues(t, 0, pos)
433         _greeting, err := ioutil.ReadAll(r)
434         assert.NoError(t, err)
435         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
436 }
437
438 // Check that after completing leeching, a leecher transitions to a seeding
439 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
440 func TestSeedAfterDownloading(t *testing.T) {
441         greetingTempDir, mi := testutil.GreetingTestTorrent()
442         defer os.RemoveAll(greetingTempDir)
443         cfg := TestingConfig()
444         cfg.Seed = true
445         cfg.DataDir = greetingTempDir
446         seeder, err := NewClient(cfg)
447         require.NoError(t, err)
448         defer seeder.Close()
449         testutil.ExportStatusWriter(seeder, "s")
450         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
451         seederTorrent.VerifyData()
452         cfg.DataDir, err = ioutil.TempDir("", "")
453         require.NoError(t, err)
454         defer os.RemoveAll(cfg.DataDir)
455         leecher, err := NewClient(cfg)
456         require.NoError(t, err)
457         defer leecher.Close()
458         testutil.ExportStatusWriter(leecher, "l")
459         cfg.Seed = false
460         // cfg.TorrentDataOpener = nil
461         cfg.DataDir, err = ioutil.TempDir("", "")
462         require.NoError(t, err)
463         defer os.RemoveAll(cfg.DataDir)
464         leecherLeecher, _ := NewClient(cfg)
465         defer leecherLeecher.Close()
466         testutil.ExportStatusWriter(leecherLeecher, "ll")
467         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
468                 ret = TorrentSpecFromMetaInfo(mi)
469                 ret.ChunkSize = 2
470                 return
471         }())
472         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
473                 ret = TorrentSpecFromMetaInfo(mi)
474                 ret.ChunkSize = 3
475                 return
476         }())
477         // Simultaneously DownloadAll in Leecher, and read the contents
478         // consecutively in LeecherLeecher. This non-deterministically triggered a
479         // case where the leecher wouldn't unchoke the LeecherLeecher.
480         var wg sync.WaitGroup
481         wg.Add(1)
482         go func() {
483                 defer wg.Done()
484                 r := llg.NewReader()
485                 defer r.Close()
486                 b, err := ioutil.ReadAll(r)
487                 require.NoError(t, err)
488                 assert.EqualValues(t, testutil.GreetingFileContents, b)
489         }()
490         addClientPeer(leecherGreeting, seeder)
491         addClientPeer(leecherGreeting, leecherLeecher)
492         wg.Add(1)
493         go func() {
494                 defer wg.Done()
495                 leecherGreeting.DownloadAll()
496                 leecher.WaitAll()
497         }()
498         wg.Wait()
499 }
500
501 func TestMergingTrackersByAddingSpecs(t *testing.T) {
502         cl, err := NewClient(TestingConfig())
503         require.NoError(t, err)
504         defer cl.Close()
505         spec := TorrentSpec{}
506         T, new, _ := cl.AddTorrentSpec(&spec)
507         if !new {
508                 t.FailNow()
509         }
510         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
511         _, new, _ = cl.AddTorrentSpec(&spec)
512         assert.False(t, new)
513         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
514         // Because trackers are disabled in TestingConfig.
515         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
516 }
517
518 type badStorage struct{}
519
520 func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
521         return bs, nil
522 }
523
524 func (bs badStorage) Close() error {
525         return nil
526 }
527
528 func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {
529         return badStoragePiece{p}
530 }
531
532 type badStoragePiece struct {
533         p metainfo.Piece
534 }
535
536 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
537         return 0, nil
538 }
539
540 func (p badStoragePiece) GetIsComplete() bool {
541         return true
542 }
543
544 func (p badStoragePiece) MarkComplete() error {
545         return errors.New("psyyyyyyyche")
546 }
547
548 func (p badStoragePiece) MarkNotComplete() error {
549         return errors.New("psyyyyyyyche")
550 }
551
552 func (p badStoragePiece) randomlyTruncatedDataString() string {
553         return "hello, world\n"[:rand.Intn(14)]
554 }
555
556 func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
557         r := strings.NewReader(p.randomlyTruncatedDataString())
558         return r.ReadAt(b, off+p.p.Offset())
559 }
560
561 // We read from a piece which is marked completed, but is missing data.
562 func TestCompletedPieceWrongSize(t *testing.T) {
563         cfg := TestingConfig()
564         cfg.DefaultStorage = badStorage{}
565         cl, err := NewClient(cfg)
566         require.NoError(t, err)
567         defer cl.Close()
568         info := metainfo.Info{
569                 PieceLength: 15,
570                 Pieces:      make([]byte, 20),
571                 Files: []metainfo.FileInfo{
572                         {Path: []string{"greeting"}, Length: 13},
573                 },
574         }
575         b, err := bencode.Marshal(info)
576         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
577                 InfoBytes: b,
578                 InfoHash:  metainfo.HashBytes(b),
579         })
580         require.NoError(t, err)
581         defer tt.Drop()
582         assert.True(t, new)
583         r := tt.NewReader()
584         defer r.Close()
585         b, err = ioutil.ReadAll(r)
586         assert.Len(t, b, 13)
587         assert.NoError(t, err)
588 }
589
590 func BenchmarkAddLargeTorrent(b *testing.B) {
591         cfg := TestingConfig()
592         cfg.DisableTCP = true
593         cfg.DisableUTP = true
594         cfg.ListenAddr = "redonk"
595         cl, err := NewClient(cfg)
596         require.NoError(b, err)
597         defer cl.Close()
598         for range iter.N(b.N) {
599                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
600                 if err != nil {
601                         b.Fatal(err)
602                 }
603                 t.Drop()
604         }
605 }
606
607 func TestResponsive(t *testing.T) {
608         seederDataDir, mi := testutil.GreetingTestTorrent()
609         defer os.RemoveAll(seederDataDir)
610         cfg := TestingConfig()
611         cfg.Seed = true
612         cfg.DataDir = seederDataDir
613         seeder, err := NewClient(cfg)
614         require.Nil(t, err)
615         defer seeder.Close()
616         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
617         seederTorrent.VerifyData()
618         leecherDataDir, err := ioutil.TempDir("", "")
619         require.Nil(t, err)
620         defer os.RemoveAll(leecherDataDir)
621         cfg = TestingConfig()
622         cfg.DataDir = leecherDataDir
623         leecher, err := NewClient(cfg)
624         require.Nil(t, err)
625         defer leecher.Close()
626         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
627                 ret = TorrentSpecFromMetaInfo(mi)
628                 ret.ChunkSize = 2
629                 return
630         }())
631         addClientPeer(leecherTorrent, seeder)
632         reader := leecherTorrent.NewReader()
633         defer reader.Close()
634         reader.SetReadahead(0)
635         reader.SetResponsive()
636         b := make([]byte, 2)
637         _, err = reader.Seek(3, os.SEEK_SET)
638         require.NoError(t, err)
639         _, err = io.ReadFull(reader, b)
640         assert.Nil(t, err)
641         assert.EqualValues(t, "lo", string(b))
642         _, err = reader.Seek(11, os.SEEK_SET)
643         require.NoError(t, err)
644         n, err := io.ReadFull(reader, b)
645         assert.Nil(t, err)
646         assert.EqualValues(t, 2, n)
647         assert.EqualValues(t, "d\n", string(b))
648 }
649
650 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
651         seederDataDir, mi := testutil.GreetingTestTorrent()
652         defer os.RemoveAll(seederDataDir)
653         cfg := TestingConfig()
654         cfg.Seed = true
655         cfg.DataDir = seederDataDir
656         seeder, err := NewClient(cfg)
657         require.Nil(t, err)
658         defer seeder.Close()
659         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
660         seederTorrent.VerifyData()
661         leecherDataDir, err := ioutil.TempDir("", "")
662         require.Nil(t, err)
663         defer os.RemoveAll(leecherDataDir)
664         cfg = TestingConfig()
665         cfg.DataDir = leecherDataDir
666         leecher, err := NewClient(cfg)
667         require.Nil(t, err)
668         defer leecher.Close()
669         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
670                 ret = TorrentSpecFromMetaInfo(mi)
671                 ret.ChunkSize = 2
672                 return
673         }())
674         addClientPeer(leecherTorrent, seeder)
675         reader := leecherTorrent.NewReader()
676         defer reader.Close()
677         reader.SetReadahead(0)
678         reader.SetResponsive()
679         b := make([]byte, 2)
680         _, err = reader.Seek(3, os.SEEK_SET)
681         require.NoError(t, err)
682         _, err = io.ReadFull(reader, b)
683         assert.Nil(t, err)
684         assert.EqualValues(t, "lo", string(b))
685         go leecherTorrent.Drop()
686         _, err = reader.Seek(11, os.SEEK_SET)
687         require.NoError(t, err)
688         n, err := reader.Read(b)
689         assert.EqualError(t, err, "torrent closed")
690         assert.EqualValues(t, 0, n)
691 }
692
693 func TestDHTInheritBlocklist(t *testing.T) {
694         ipl := iplist.New(nil)
695         require.NotNil(t, ipl)
696         cfg := TestingConfig()
697         cfg.IPBlocklist = ipl
698         cfg.NoDHT = false
699         cl, err := NewClient(cfg)
700         require.NoError(t, err)
701         defer cl.Close()
702         require.Equal(t, ipl, cl.DHT().IPBlocklist())
703 }
704
705 // Check that stuff is merged in subsequent AddTorrentSpec for the same
706 // infohash.
707 func TestAddTorrentSpecMerging(t *testing.T) {
708         cl, err := NewClient(TestingConfig())
709         require.NoError(t, err)
710         defer cl.Close()
711         dir, mi := testutil.GreetingTestTorrent()
712         defer os.RemoveAll(dir)
713         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
714                 InfoHash: mi.HashInfoBytes(),
715         })
716         require.NoError(t, err)
717         require.True(t, new)
718         require.Nil(t, tt.Info())
719         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
720         require.NoError(t, err)
721         require.False(t, new)
722         require.NotNil(t, tt.Info())
723 }
724
725 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
726         dir, mi := testutil.GreetingTestTorrent()
727         os.RemoveAll(dir)
728         cl, _ := NewClient(TestingConfig())
729         defer cl.Close()
730         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
731                 InfoHash: mi.HashInfoBytes(),
732         })
733         tt.Drop()
734         assert.EqualValues(t, 0, len(cl.Torrents()))
735         select {
736         case <-tt.GotInfo():
737                 t.FailNow()
738         default:
739         }
740 }
741
742 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
743         for i := range iter.N(info.NumPieces()) {
744                 p := info.Piece(i)
745                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
746         }
747 }
748
749 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
750         fileCacheDir, err := ioutil.TempDir("", "")
751         require.NoError(t, err)
752         defer os.RemoveAll(fileCacheDir)
753         fileCache, err := filecache.NewCache(fileCacheDir)
754         require.NoError(t, err)
755         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
756         defer os.RemoveAll(greetingDataTempDir)
757         filePieceStore := csf(fileCache)
758         defer filePieceStore.Close()
759         info, err := greetingMetainfo.UnmarshalInfo()
760         require.NoError(t, err)
761         ih := greetingMetainfo.HashInfoBytes()
762         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
763         require.NoError(t, err)
764         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
765         // require.Equal(t, len(testutil.GreetingFileContents), written)
766         // require.NoError(t, err)
767         for i := 0; i < info.NumPieces(); i++ {
768                 p := info.Piece(i)
769                 if alreadyCompleted {
770                         require.NoError(t, greetingData.Piece(p).MarkComplete())
771                 }
772         }
773         cfg := TestingConfig()
774         // TODO: Disable network option?
775         cfg.DisableTCP = true
776         cfg.DisableUTP = true
777         cfg.DefaultStorage = filePieceStore
778         cl, err := NewClient(cfg)
779         require.NoError(t, err)
780         defer cl.Close()
781         tt, err := cl.AddTorrent(greetingMetainfo)
782         require.NoError(t, err)
783         psrs := tt.PieceStateRuns()
784         assert.Len(t, psrs, 1)
785         assert.EqualValues(t, 3, psrs[0].Length)
786         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
787         if alreadyCompleted {
788                 r := tt.NewReader()
789                 b, err := ioutil.ReadAll(r)
790                 assert.NoError(t, err)
791                 assert.EqualValues(t, testutil.GreetingFileContents, b)
792         }
793 }
794
795 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
796         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
797 }
798
799 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
800         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
801 }
802
803 func TestAddMetainfoWithNodes(t *testing.T) {
804         cfg := TestingConfig()
805         cfg.ListenAddr = ":0"
806         cfg.NoDHT = false
807         // For now, we want to just jam the nodes into the table, without
808         // verifying them first. Also the DHT code doesn't support mixing secure
809         // and insecure nodes if security is enabled (yet).
810         cfg.DHTConfig.NoSecurity = true
811         cl, err := NewClient(cfg)
812         require.NoError(t, err)
813         defer cl.Close()
814         assert.EqualValues(t, 0, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
815         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
816         require.NoError(t, err)
817         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
818         // check if the announce-list is here instead. TODO: Add nodes.
819         assert.Len(t, tt.metainfo.AnnounceList, 5)
820         // There are 6 nodes in the torrent file.
821         assert.EqualValues(t, 6, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
822 }
823
824 type testDownloadCancelParams struct {
825         ExportClientStatus        bool
826         SetLeecherStorageCapacity bool
827         LeecherStorageCapacity    int64
828         Cancel                    bool
829 }
830
831 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
832         greetingTempDir, mi := testutil.GreetingTestTorrent()
833         defer os.RemoveAll(greetingTempDir)
834         cfg := TestingConfig()
835         cfg.Seed = true
836         cfg.DataDir = greetingTempDir
837         seeder, err := NewClient(cfg)
838         require.NoError(t, err)
839         defer seeder.Close()
840         if ps.ExportClientStatus {
841                 testutil.ExportStatusWriter(seeder, "s")
842         }
843         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
844         seederTorrent.VerifyData()
845         leecherDataDir, err := ioutil.TempDir("", "")
846         require.NoError(t, err)
847         defer os.RemoveAll(leecherDataDir)
848         fc, err := filecache.NewCache(leecherDataDir)
849         require.NoError(t, err)
850         if ps.SetLeecherStorageCapacity {
851                 fc.SetCapacity(ps.LeecherStorageCapacity)
852         }
853         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
854         cfg.DataDir = leecherDataDir
855         leecher, _ := NewClient(cfg)
856         defer leecher.Close()
857         if ps.ExportClientStatus {
858                 testutil.ExportStatusWriter(leecher, "l")
859         }
860         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
861                 ret = TorrentSpecFromMetaInfo(mi)
862                 ret.ChunkSize = 2
863                 return
864         }())
865         require.NoError(t, err)
866         assert.True(t, new)
867         psc := leecherGreeting.SubscribePieceStateChanges()
868         defer psc.Close()
869         leecherGreeting.DownloadAll()
870         if ps.Cancel {
871                 leecherGreeting.CancelPieces(0, leecherGreeting.NumPieces())
872         }
873         addClientPeer(leecherGreeting, seeder)
874         completes := make(map[int]bool, 3)
875 values:
876         for {
877                 // started := time.Now()
878                 select {
879                 case _v := <-psc.Values:
880                         // log.Print(time.Since(started))
881                         v := _v.(PieceStateChange)
882                         completes[v.Index] = v.Complete
883                 case <-time.After(100 * time.Millisecond):
884                         break values
885                 }
886         }
887         if ps.Cancel {
888                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
889         } else {
890                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
891         }
892
893 }
894
895 func TestTorrentDownloadAll(t *testing.T) {
896         testDownloadCancel(t, testDownloadCancelParams{})
897 }
898
899 func TestTorrentDownloadAllThenCancel(t *testing.T) {
900         testDownloadCancel(t, testDownloadCancelParams{
901                 Cancel: true,
902         })
903 }
904
905 // Ensure that it's an error for a peer to send an invalid have message.
906 func TestPeerInvalidHave(t *testing.T) {
907         cl, err := NewClient(TestingConfig())
908         require.NoError(t, err)
909         defer cl.Close()
910         info := metainfo.Info{
911                 PieceLength: 1,
912                 Pieces:      make([]byte, 20),
913                 Files:       []metainfo.FileInfo{{Length: 1}},
914         }
915         infoBytes, err := bencode.Marshal(info)
916         require.NoError(t, err)
917         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
918                 InfoBytes: infoBytes,
919                 InfoHash:  metainfo.HashBytes(infoBytes),
920         })
921         require.NoError(t, err)
922         assert.True(t, _new)
923         defer tt.Drop()
924         cn := &connection{
925                 t: tt,
926         }
927         assert.NoError(t, cn.peerSentHave(0))
928         assert.Error(t, cn.peerSentHave(1))
929 }
930
931 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
932         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
933         defer os.RemoveAll(greetingTempDir)
934         cfg := TestingConfig()
935         cfg.DataDir = greetingTempDir
936         seeder, err := NewClient(TestingConfig())
937         require.NoError(t, err)
938         seeder.AddTorrentSpec(&TorrentSpec{
939                 InfoBytes: greetingMetainfo.InfoBytes,
940         })
941 }
942
943 func TestPrepareTrackerAnnounce(t *testing.T) {
944         cl := &Client{}
945         blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
946         require.NoError(t, err)
947         assert.False(t, blocked)
948         assert.EqualValues(t, "localhost:1234", host)
949         assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
950 }
951
952 // Check that when the listen port is 0, all the protocols listened on have
953 // the same port, and it isn't zero.
954 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
955         cl, err := NewClient(TestingConfig())
956         require.NoError(t, err)
957         defer cl.Close()
958         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
959         assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
960 }
961
962 func TestClientDynamicListenTCPOnly(t *testing.T) {
963         cfg := TestingConfig()
964         cfg.DisableUTP = true
965         cl, err := NewClient(cfg)
966         require.NoError(t, err)
967         defer cl.Close()
968         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
969         assert.Nil(t, cl.utpSock)
970 }
971
972 func TestClientDynamicListenUTPOnly(t *testing.T) {
973         cfg := TestingConfig()
974         cfg.DisableTCP = true
975         cl, err := NewClient(cfg)
976         require.NoError(t, err)
977         defer cl.Close()
978         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
979         assert.Nil(t, cl.tcpListener)
980 }
981
982 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
983         cfg := TestingConfig()
984         cfg.DisableTCP = true
985         cfg.DisableUTP = true
986         cl, err := NewClient(cfg)
987         require.NoError(t, err)
988         defer cl.Close()
989         assert.Nil(t, cl.ListenAddr())
990 }
991
992 func addClientPeer(t *Torrent, cl *Client) {
993         t.AddPeers([]Peer{
994                 {
995                         IP:   missinggo.AddrIP(cl.ListenAddr()),
996                         Port: missinggo.AddrPort(cl.ListenAddr()),
997                 },
998         })
999 }
1000
1001 func totalConns(tts []*Torrent) (ret int) {
1002         for _, tt := range tts {
1003                 tt.cl.mu.Lock()
1004                 ret += len(tt.conns)
1005                 tt.cl.mu.Unlock()
1006         }
1007         return
1008 }
1009
1010 func TestSetMaxEstablishedConn(t *testing.T) {
1011         var tts []*Torrent
1012         ih := testutil.GreetingMetaInfo().HashInfoBytes()
1013         for i := range iter.N(3) {
1014                 cl, err := NewClient(TestingConfig())
1015                 require.NoError(t, err)
1016                 defer cl.Close()
1017                 tt, _ := cl.AddTorrentInfoHash(ih)
1018                 tt.SetMaxEstablishedConns(2)
1019                 testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
1020                 tts = append(tts, tt)
1021         }
1022         addPeers := func() {
1023                 for i, tt := range tts {
1024                         for _, _tt := range tts[:i] {
1025                                 addClientPeer(tt, _tt.cl)
1026                         }
1027                 }
1028         }
1029         waitTotalConns := func(num int) {
1030                 for totalConns(tts) != num {
1031                         time.Sleep(time.Millisecond)
1032                 }
1033         }
1034         addPeers()
1035         waitTotalConns(6)
1036         tts[0].SetMaxEstablishedConns(1)
1037         waitTotalConns(4)
1038         tts[0].SetMaxEstablishedConns(0)
1039         waitTotalConns(2)
1040         tts[0].SetMaxEstablishedConns(1)
1041         addPeers()
1042         waitTotalConns(4)
1043         tts[0].SetMaxEstablishedConns(2)
1044         addPeers()
1045         waitTotalConns(6)
1046 }
1047
1048 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1049         os.MkdirAll(dir, 0770)
1050         file, err := os.Create(filepath.Join(dir, name))
1051         require.NoError(t, err)
1052         file.Write([]byte(name))
1053         file.Close()
1054         mi := metainfo.MetaInfo{}
1055         mi.SetDefaults()
1056         info := metainfo.Info{PieceLength: 256 * 1024}
1057         err = info.BuildFromFilePath(filepath.Join(dir, name))
1058         require.NoError(t, err)
1059         mi.InfoBytes, err = bencode.Marshal(info)
1060         require.NoError(t, err)
1061         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1062         tr, err := cl.AddTorrent(&mi)
1063         require.NoError(t, err)
1064         require.True(t, tr.Seeding())
1065         tr.VerifyData()
1066         return magnet
1067 }
1068
1069 // https://github.com/anacrolix/torrent/issues/114
1070 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1071         cfg := TestingConfig()
1072         cfg.DisableUTP = true
1073         cfg.Seed = true
1074         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1075         cfg.Debug = true
1076         cfg.ForceEncryption = true
1077         os.Mkdir(cfg.DataDir, 0755)
1078         server, err := NewClient(cfg)
1079         require.NoError(t, err)
1080         defer server.Close()
1081         testutil.ExportStatusWriter(server, "s")
1082         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1083         makeMagnet(t, server, cfg.DataDir, "test2")
1084         cfg = TestingConfig()
1085         cfg.DisableUTP = true
1086         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1087         cfg.Debug = true
1088         cfg.ForceEncryption = true
1089         client, err := NewClient(cfg)
1090         require.NoError(t, err)
1091         defer client.Close()
1092         testutil.ExportStatusWriter(client, "c")
1093         tr, err := client.AddMagnet(magnet1)
1094         require.NoError(t, err)
1095         tr.AddPeers([]Peer{{
1096                 IP:   missinggo.AddrIP(server.ListenAddr()),
1097                 Port: missinggo.AddrPort(server.ListenAddr()),
1098         }})
1099         <-tr.GotInfo()
1100         tr.DownloadAll()
1101         client.WaitAll()
1102 }
1103
1104 func TestClientAddressInUse(t *testing.T) {
1105         s, _ := NewUtpSocket("udp", ":50007")
1106         if s != nil {
1107                 defer s.Close()
1108         }
1109         cfg := TestingConfig()
1110         cfg.ListenAddr = ":50007"
1111         cl, err := NewClient(cfg)
1112         require.Error(t, err)
1113         require.Nil(t, cl)
1114 }