]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
b465279b7e7936cb3f4c94945fa9753f54869d21
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "context"
5         "encoding/binary"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "net"
10         "os"
11         "path/filepath"
12         "sync"
13         "testing"
14         "time"
15
16         "github.com/anacrolix/dht"
17         _ "github.com/anacrolix/envpprof"
18         "github.com/anacrolix/missinggo"
19         "github.com/anacrolix/missinggo/filecache"
20         "github.com/bradfitz/iter"
21         "github.com/stretchr/testify/assert"
22         "github.com/stretchr/testify/require"
23         "golang.org/x/time/rate"
24
25         "github.com/anacrolix/torrent/bencode"
26         "github.com/anacrolix/torrent/internal/testutil"
27         "github.com/anacrolix/torrent/iplist"
28         "github.com/anacrolix/torrent/metainfo"
29         "github.com/anacrolix/torrent/storage"
30 )
31
32 func TestingConfig() *Config {
33         return &Config{
34                 ListenHost:              LoopbackListenHost,
35                 NoDHT:                   true,
36                 DataDir:                 tempDir(),
37                 DisableTrackers:         true,
38                 NoDefaultPortForwarding: true,
39                 // Debug:           true,
40         }
41 }
42
43 func TestClientDefault(t *testing.T) {
44         cl, err := NewClient(TestingConfig())
45         require.NoError(t, err)
46         cl.Close()
47 }
48
49 func TestClientNilConfig(t *testing.T) {
50         cl, err := NewClient(nil)
51         require.NoError(t, err)
52         cl.Close()
53 }
54
55 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
56         cfg := TestingConfig()
57         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
58         require.NoError(t, err)
59         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
60         defer ci.Close()
61         cfg.DefaultStorage = ci
62         cl, err := NewClient(cfg)
63         require.NoError(t, err)
64         cl.Close()
65         // And again, https://github.com/anacrolix/torrent/issues/158
66         cl, err = NewClient(cfg)
67         require.NoError(t, err)
68         cl.Close()
69 }
70
71 func TestAddDropTorrent(t *testing.T) {
72         cl, err := NewClient(TestingConfig())
73         require.NoError(t, err)
74         defer cl.Close()
75         dir, mi := testutil.GreetingTestTorrent()
76         defer os.RemoveAll(dir)
77         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
78         require.NoError(t, err)
79         assert.True(t, new)
80         tt.SetMaxEstablishedConns(0)
81         tt.SetMaxEstablishedConns(1)
82         tt.Drop()
83 }
84
85 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
86         // TODO?
87         t.SkipNow()
88 }
89
90 func TestAddTorrentNoUsableURLs(t *testing.T) {
91         // TODO?
92         t.SkipNow()
93 }
94
95 func TestAddPeersToUnknownTorrent(t *testing.T) {
96         // TODO?
97         t.SkipNow()
98 }
99
100 func TestPieceHashSize(t *testing.T) {
101         assert.Equal(t, 20, pieceHash.Size())
102 }
103
104 func TestTorrentInitialState(t *testing.T) {
105         dir, mi := testutil.GreetingTestTorrent()
106         defer os.RemoveAll(dir)
107         cl := &Client{}
108         cl.initLogger()
109         tor := cl.newTorrent(
110                 mi.HashInfoBytes(),
111                 storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
112         )
113         tor.setChunkSize(2)
114         tor.cl.mu.Lock()
115         err := tor.setInfoBytes(mi.InfoBytes)
116         tor.cl.mu.Unlock()
117         require.NoError(t, err)
118         require.Len(t, tor.pieces, 3)
119         tor.pendAllChunkSpecs(0)
120         tor.cl.mu.Lock()
121         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
122         tor.cl.mu.Unlock()
123         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
124 }
125
126 func TestUnmarshalPEXMsg(t *testing.T) {
127         var m peerExchangeMessage
128         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
129                 t.Fatal(err)
130         }
131         if len(m.Added) != 2 {
132                 t.FailNow()
133         }
134         if m.Added[0].Port != 0x506 {
135                 t.FailNow()
136         }
137 }
138
139 func TestReducedDialTimeout(t *testing.T) {
140         cfg := &Config{}
141         cfg.setDefaults()
142         for _, _case := range []struct {
143                 Max             time.Duration
144                 HalfOpenLimit   int
145                 PendingPeers    int
146                 ExpectedReduced time.Duration
147         }{
148                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
149                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
150                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
151                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
152                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
153                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
154         } {
155                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
156                 expected := _case.ExpectedReduced
157                 if expected < cfg.MinDialTimeout {
158                         expected = cfg.MinDialTimeout
159                 }
160                 if reduced != expected {
161                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
162                 }
163         }
164 }
165
166 func TestUTPRawConn(t *testing.T) {
167         l, err := NewUtpSocket("udp", "")
168         require.NoError(t, err)
169         defer l.Close()
170         go func() {
171                 for {
172                         _, err := l.Accept()
173                         if err != nil {
174                                 break
175                         }
176                 }
177         }()
178         // Connect a UTP peer to see if the RawConn will still work.
179         s, err := NewUtpSocket("udp", "")
180         require.NoError(t, err)
181         defer s.Close()
182         utpPeer, err := s.DialContext(context.Background(), "", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
183         require.NoError(t, err)
184         defer utpPeer.Close()
185         peer, err := net.ListenPacket("udp", ":0")
186         require.NoError(t, err)
187         defer peer.Close()
188
189         msgsReceived := 0
190         // How many messages to send. I've set this to double the channel buffer
191         // size in the raw packetConn.
192         const N = 200
193         readerStopped := make(chan struct{})
194         // The reader goroutine.
195         go func() {
196                 defer close(readerStopped)
197                 b := make([]byte, 500)
198                 for i := 0; i < N; i++ {
199                         n, _, err := l.ReadFrom(b)
200                         require.NoError(t, err)
201                         msgsReceived++
202                         var d int
203                         fmt.Sscan(string(b[:n]), &d)
204                         assert.Equal(t, i, d)
205                 }
206         }()
207         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
208         require.NoError(t, err)
209         for i := 0; i < N; i++ {
210                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
211                 require.NoError(t, err)
212                 time.Sleep(time.Millisecond)
213         }
214         select {
215         case <-readerStopped:
216         case <-time.After(time.Second):
217                 t.Fatal("reader timed out")
218         }
219         if msgsReceived != N {
220                 t.Fatalf("messages received: %d", msgsReceived)
221         }
222 }
223
224 func TestAddDropManyTorrents(t *testing.T) {
225         cl, err := NewClient(TestingConfig())
226         require.NoError(t, err)
227         defer cl.Close()
228         for i := range iter.N(1000) {
229                 var spec TorrentSpec
230                 binary.PutVarint(spec.InfoHash[:], int64(i))
231                 tt, new, err := cl.AddTorrentSpec(&spec)
232                 assert.NoError(t, err)
233                 assert.True(t, new)
234                 defer tt.Drop()
235         }
236 }
237
238 type FileCacheClientStorageFactoryParams struct {
239         Capacity    int64
240         SetCapacity bool
241         Wrapper     func(*filecache.Cache) storage.ClientImpl
242 }
243
244 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
245         return func(dataDir string) storage.ClientImpl {
246                 fc, err := filecache.NewCache(dataDir)
247                 if err != nil {
248                         panic(err)
249                 }
250                 if ps.SetCapacity {
251                         fc.SetCapacity(ps.Capacity)
252                 }
253                 return ps.Wrapper(fc)
254         }
255 }
256
257 type storageFactory func(string) storage.ClientImpl
258
259 func TestClientTransferDefault(t *testing.T) {
260         testClientTransfer(t, testClientTransferParams{
261                 ExportClientStatus: true,
262                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
263                         Wrapper: fileCachePieceResourceStorage,
264                 }),
265         })
266 }
267
268 func TestClientTransferRateLimitedUpload(t *testing.T) {
269         started := time.Now()
270         testClientTransfer(t, testClientTransferParams{
271                 // We are uploading 13 bytes (the length of the greeting torrent). The
272                 // chunks are 2 bytes in length. Then the smallest burst we can run
273                 // with is 2. Time taken is (13-burst)/rate.
274                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
275                 ExportClientStatus:      true,
276         })
277         require.True(t, time.Since(started) > time.Second)
278 }
279
280 func TestClientTransferRateLimitedDownload(t *testing.T) {
281         testClientTransfer(t, testClientTransferParams{
282                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
283         })
284 }
285
286 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
287         return storage.NewResourcePieces(fc.AsResourceProvider())
288 }
289
290 func TestClientTransferSmallCache(t *testing.T) {
291         testClientTransfer(t, testClientTransferParams{
292                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
293                         SetCapacity: true,
294                         // Going below the piece length means it can't complete a piece so
295                         // that it can be hashed.
296                         Capacity: 5,
297                         Wrapper:  fileCachePieceResourceStorage,
298                 }),
299                 SetReadahead: true,
300                 // Can't readahead too far or the cache will thrash and drop data we
301                 // thought we had.
302                 Readahead:          0,
303                 ExportClientStatus: true,
304         })
305 }
306
307 func TestClientTransferVarious(t *testing.T) {
308         // Leecher storage
309         for _, ls := range []storageFactory{
310                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
311                         Wrapper: fileCachePieceResourceStorage,
312                 }),
313                 storage.NewBoltDB,
314         } {
315                 // Seeder storage
316                 for _, ss := range []func(string) storage.ClientImpl{
317                         storage.NewFile,
318                         storage.NewMMap,
319                 } {
320                         for _, responsive := range []bool{false, true} {
321                                 testClientTransfer(t, testClientTransferParams{
322                                         Responsive:     responsive,
323                                         SeederStorage:  ss,
324                                         LeecherStorage: ls,
325                                 })
326                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
327                                         testClientTransfer(t, testClientTransferParams{
328                                                 SeederStorage:  ss,
329                                                 Responsive:     responsive,
330                                                 SetReadahead:   true,
331                                                 Readahead:      readahead,
332                                                 LeecherStorage: ls,
333                                         })
334                                 }
335                         }
336                 }
337         }
338 }
339
340 type testClientTransferParams struct {
341         Responsive                 bool
342         Readahead                  int64
343         SetReadahead               bool
344         ExportClientStatus         bool
345         LeecherStorage             func(string) storage.ClientImpl
346         SeederStorage              func(string) storage.ClientImpl
347         SeederUploadRateLimiter    *rate.Limiter
348         LeecherDownloadRateLimiter *rate.Limiter
349 }
350
351 // Creates a seeder and a leecher, and ensures the data transfers when a read
352 // is attempted on the leecher.
353 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
354         greetingTempDir, mi := testutil.GreetingTestTorrent()
355         defer os.RemoveAll(greetingTempDir)
356         // Create seeder and a Torrent.
357         cfg := TestingConfig()
358         cfg.Seed = true
359         cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
360         // cfg.ListenAddr = "localhost:4000"
361         if ps.SeederStorage != nil {
362                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
363                 defer cfg.DefaultStorage.Close()
364         } else {
365                 cfg.DataDir = greetingTempDir
366         }
367         seeder, err := NewClient(cfg)
368         require.NoError(t, err)
369         if ps.ExportClientStatus {
370                 testutil.ExportStatusWriter(seeder, "s")
371         }
372         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
373         // Run a Stats right after Closing the Client. This will trigger the Stats
374         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
375         defer seederTorrent.Stats()
376         defer seeder.Close()
377         seederTorrent.VerifyData()
378         // Create leecher and a Torrent.
379         leecherDataDir, err := ioutil.TempDir("", "")
380         require.NoError(t, err)
381         defer os.RemoveAll(leecherDataDir)
382         if ps.LeecherStorage == nil {
383                 cfg.DataDir = leecherDataDir
384         } else {
385                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
386         }
387         cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
388         cfg.Seed = false
389         leecher, err := NewClient(cfg)
390         require.NoError(t, err)
391         defer leecher.Close()
392         if ps.ExportClientStatus {
393                 testutil.ExportStatusWriter(leecher, "l")
394         }
395         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
396                 ret = TorrentSpecFromMetaInfo(mi)
397                 ret.ChunkSize = 2
398                 return
399         }())
400         require.NoError(t, err)
401         assert.True(t, new)
402         // Now do some things with leecher and seeder.
403         leecherTorrent.AddClientPeer(seeder)
404         // The Torrent should not be interested in obtaining peers, so the one we
405         // just added should be the only one.
406         assert.False(t, leecherTorrent.Seeding())
407         assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
408         r := leecherTorrent.NewReader()
409         defer r.Close()
410         if ps.Responsive {
411                 r.SetResponsive()
412         }
413         if ps.SetReadahead {
414                 r.SetReadahead(ps.Readahead)
415         }
416         assertReadAllGreeting(t, r)
417         assert.True(t, 13 <= seederTorrent.Stats().BytesWrittenData)
418         assert.True(t, 8 <= seederTorrent.Stats().ChunksWritten)
419         assert.True(t, 13 <= leecherTorrent.Stats().BytesReadData)
420         assert.True(t, 8 <= leecherTorrent.Stats().ChunksRead)
421         // Try reading through again for the cases where the torrent data size
422         // exceeds the size of the cache.
423         assertReadAllGreeting(t, r)
424 }
425
426 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
427         pos, err := r.Seek(0, io.SeekStart)
428         assert.NoError(t, err)
429         assert.EqualValues(t, 0, pos)
430         _greeting, err := ioutil.ReadAll(r)
431         assert.NoError(t, err)
432         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
433 }
434
435 // Check that after completing leeching, a leecher transitions to a seeding
436 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
437 func TestSeedAfterDownloading(t *testing.T) {
438         greetingTempDir, mi := testutil.GreetingTestTorrent()
439         defer os.RemoveAll(greetingTempDir)
440         cfg := TestingConfig()
441         cfg.Seed = true
442         cfg.DataDir = greetingTempDir
443         seeder, err := NewClient(cfg)
444         require.NoError(t, err)
445         defer seeder.Close()
446         testutil.ExportStatusWriter(seeder, "s")
447         seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
448         require.NoError(t, err)
449         assert.True(t, ok)
450         seederTorrent.VerifyData()
451         cfg.DataDir, err = ioutil.TempDir("", "")
452         require.NoError(t, err)
453         defer os.RemoveAll(cfg.DataDir)
454         leecher, err := NewClient(cfg)
455         require.NoError(t, err)
456         defer leecher.Close()
457         testutil.ExportStatusWriter(leecher, "l")
458         cfg.Seed = false
459         cfg.DataDir, err = ioutil.TempDir("", "")
460         require.NoError(t, err)
461         defer os.RemoveAll(cfg.DataDir)
462         leecherLeecher, _ := NewClient(cfg)
463         require.NoError(t, err)
464         defer leecherLeecher.Close()
465         testutil.ExportStatusWriter(leecherLeecher, "ll")
466         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
467                 ret = TorrentSpecFromMetaInfo(mi)
468                 ret.ChunkSize = 2
469                 return
470         }())
471         require.NoError(t, err)
472         assert.True(t, ok)
473         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
474                 ret = TorrentSpecFromMetaInfo(mi)
475                 ret.ChunkSize = 3
476                 return
477         }())
478         require.NoError(t, err)
479         assert.True(t, ok)
480         // Simultaneously DownloadAll in Leecher, and read the contents
481         // consecutively in LeecherLeecher. This non-deterministically triggered a
482         // case where the leecher wouldn't unchoke the LeecherLeecher.
483         var wg sync.WaitGroup
484         wg.Add(1)
485         go func() {
486                 defer wg.Done()
487                 r := llg.NewReader()
488                 defer r.Close()
489                 b, err := ioutil.ReadAll(r)
490                 require.NoError(t, err)
491                 assert.EqualValues(t, testutil.GreetingFileContents, b)
492         }()
493         done := make(chan struct{})
494         defer close(done)
495         go func() {
496                 for {
497                         go leecherGreeting.AddClientPeer(seeder)
498                         go leecherGreeting.AddClientPeer(leecherLeecher)
499                         select {
500                         case <-done:
501                                 return
502                         case <-time.After(time.Second):
503                         }
504                 }
505         }()
506         wg.Add(1)
507         go func() {
508                 defer wg.Done()
509                 leecherGreeting.DownloadAll()
510                 leecher.WaitAll()
511         }()
512         wg.Wait()
513 }
514
515 func TestMergingTrackersByAddingSpecs(t *testing.T) {
516         cl, err := NewClient(TestingConfig())
517         require.NoError(t, err)
518         defer cl.Close()
519         spec := TorrentSpec{}
520         T, new, _ := cl.AddTorrentSpec(&spec)
521         if !new {
522                 t.FailNow()
523         }
524         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
525         _, new, _ = cl.AddTorrentSpec(&spec)
526         assert.False(t, new)
527         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
528         // Because trackers are disabled in TestingConfig.
529         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
530 }
531
532 // We read from a piece which is marked completed, but is missing data.
533 func TestCompletedPieceWrongSize(t *testing.T) {
534         cfg := TestingConfig()
535         cfg.DefaultStorage = badStorage{}
536         cl, err := NewClient(cfg)
537         require.NoError(t, err)
538         defer cl.Close()
539         info := metainfo.Info{
540                 PieceLength: 15,
541                 Pieces:      make([]byte, 20),
542                 Files: []metainfo.FileInfo{
543                         {Path: []string{"greeting"}, Length: 13},
544                 },
545         }
546         b, err := bencode.Marshal(info)
547         require.NoError(t, err)
548         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
549                 InfoBytes: b,
550                 InfoHash:  metainfo.HashBytes(b),
551         })
552         require.NoError(t, err)
553         defer tt.Drop()
554         assert.True(t, new)
555         r := tt.NewReader()
556         defer r.Close()
557         b, err = ioutil.ReadAll(r)
558         assert.Len(t, b, 13)
559         assert.NoError(t, err)
560 }
561
562 func BenchmarkAddLargeTorrent(b *testing.B) {
563         cfg := TestingConfig()
564         cfg.DisableTCP = true
565         cfg.DisableUTP = true
566         cfg.ListenHost = func(string) string { return "redonk" }
567         cl, err := NewClient(cfg)
568         require.NoError(b, err)
569         defer cl.Close()
570         for range iter.N(b.N) {
571                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
572                 if err != nil {
573                         b.Fatal(err)
574                 }
575                 t.Drop()
576         }
577 }
578
579 func TestResponsive(t *testing.T) {
580         seederDataDir, mi := testutil.GreetingTestTorrent()
581         defer os.RemoveAll(seederDataDir)
582         cfg := TestingConfig()
583         cfg.Seed = true
584         cfg.DataDir = seederDataDir
585         seeder, err := NewClient(cfg)
586         require.Nil(t, err)
587         defer seeder.Close()
588         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
589         seederTorrent.VerifyData()
590         leecherDataDir, err := ioutil.TempDir("", "")
591         require.Nil(t, err)
592         defer os.RemoveAll(leecherDataDir)
593         cfg = TestingConfig()
594         cfg.DataDir = leecherDataDir
595         leecher, err := NewClient(cfg)
596         require.Nil(t, err)
597         defer leecher.Close()
598         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
599                 ret = TorrentSpecFromMetaInfo(mi)
600                 ret.ChunkSize = 2
601                 return
602         }())
603         leecherTorrent.AddClientPeer(seeder)
604         reader := leecherTorrent.NewReader()
605         defer reader.Close()
606         reader.SetReadahead(0)
607         reader.SetResponsive()
608         b := make([]byte, 2)
609         _, err = reader.Seek(3, io.SeekStart)
610         require.NoError(t, err)
611         _, err = io.ReadFull(reader, b)
612         assert.Nil(t, err)
613         assert.EqualValues(t, "lo", string(b))
614         _, err = reader.Seek(11, io.SeekStart)
615         require.NoError(t, err)
616         n, err := io.ReadFull(reader, b)
617         assert.Nil(t, err)
618         assert.EqualValues(t, 2, n)
619         assert.EqualValues(t, "d\n", string(b))
620 }
621
622 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
623         seederDataDir, mi := testutil.GreetingTestTorrent()
624         defer os.RemoveAll(seederDataDir)
625         cfg := TestingConfig()
626         cfg.Seed = true
627         cfg.DataDir = seederDataDir
628         seeder, err := NewClient(cfg)
629         require.Nil(t, err)
630         defer seeder.Close()
631         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
632         seederTorrent.VerifyData()
633         leecherDataDir, err := ioutil.TempDir("", "")
634         require.Nil(t, err)
635         defer os.RemoveAll(leecherDataDir)
636         cfg = TestingConfig()
637         cfg.DataDir = leecherDataDir
638         leecher, err := NewClient(cfg)
639         require.Nil(t, err)
640         defer leecher.Close()
641         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
642                 ret = TorrentSpecFromMetaInfo(mi)
643                 ret.ChunkSize = 2
644                 return
645         }())
646         leecherTorrent.AddClientPeer(seeder)
647         reader := leecherTorrent.NewReader()
648         defer reader.Close()
649         reader.SetReadahead(0)
650         reader.SetResponsive()
651         b := make([]byte, 2)
652         _, err = reader.Seek(3, io.SeekStart)
653         require.NoError(t, err)
654         _, err = io.ReadFull(reader, b)
655         assert.Nil(t, err)
656         assert.EqualValues(t, "lo", string(b))
657         go leecherTorrent.Drop()
658         _, err = reader.Seek(11, io.SeekStart)
659         require.NoError(t, err)
660         n, err := reader.Read(b)
661         assert.EqualError(t, err, "torrent closed")
662         assert.EqualValues(t, 0, n)
663 }
664
665 func TestDHTInheritBlocklist(t *testing.T) {
666         ipl := iplist.New(nil)
667         require.NotNil(t, ipl)
668         cfg := TestingConfig()
669         cfg.IPBlocklist = ipl
670         cfg.NoDHT = false
671         cl, err := NewClient(cfg)
672         require.NoError(t, err)
673         defer cl.Close()
674         numServers := 0
675         cl.eachDhtServer(func(s *dht.Server) {
676                 assert.Equal(t, ipl, s.IPBlocklist())
677                 numServers++
678         })
679         assert.EqualValues(t, 2, numServers)
680 }
681
682 // Check that stuff is merged in subsequent AddTorrentSpec for the same
683 // infohash.
684 func TestAddTorrentSpecMerging(t *testing.T) {
685         cl, err := NewClient(TestingConfig())
686         require.NoError(t, err)
687         defer cl.Close()
688         dir, mi := testutil.GreetingTestTorrent()
689         defer os.RemoveAll(dir)
690         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
691                 InfoHash: mi.HashInfoBytes(),
692         })
693         require.NoError(t, err)
694         require.True(t, new)
695         require.Nil(t, tt.Info())
696         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
697         require.NoError(t, err)
698         require.False(t, new)
699         require.NotNil(t, tt.Info())
700 }
701
702 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
703         dir, mi := testutil.GreetingTestTorrent()
704         os.RemoveAll(dir)
705         cl, _ := NewClient(TestingConfig())
706         defer cl.Close()
707         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
708                 InfoHash: mi.HashInfoBytes(),
709         })
710         tt.Drop()
711         assert.EqualValues(t, 0, len(cl.Torrents()))
712         select {
713         case <-tt.GotInfo():
714                 t.FailNow()
715         default:
716         }
717 }
718
719 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
720         for i := range iter.N(info.NumPieces()) {
721                 p := info.Piece(i)
722                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
723         }
724 }
725
726 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
727         fileCacheDir, err := ioutil.TempDir("", "")
728         require.NoError(t, err)
729         defer os.RemoveAll(fileCacheDir)
730         fileCache, err := filecache.NewCache(fileCacheDir)
731         require.NoError(t, err)
732         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
733         defer os.RemoveAll(greetingDataTempDir)
734         filePieceStore := csf(fileCache)
735         defer filePieceStore.Close()
736         info, err := greetingMetainfo.UnmarshalInfo()
737         require.NoError(t, err)
738         ih := greetingMetainfo.HashInfoBytes()
739         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
740         require.NoError(t, err)
741         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
742         // require.Equal(t, len(testutil.GreetingFileContents), written)
743         // require.NoError(t, err)
744         for i := 0; i < info.NumPieces(); i++ {
745                 p := info.Piece(i)
746                 if alreadyCompleted {
747                         require.NoError(t, greetingData.Piece(p).MarkComplete())
748                 }
749         }
750         cfg := TestingConfig()
751         // TODO: Disable network option?
752         cfg.DisableTCP = true
753         cfg.DisableUTP = true
754         cfg.DefaultStorage = filePieceStore
755         cl, err := NewClient(cfg)
756         require.NoError(t, err)
757         defer cl.Close()
758         tt, err := cl.AddTorrent(greetingMetainfo)
759         require.NoError(t, err)
760         psrs := tt.PieceStateRuns()
761         assert.Len(t, psrs, 1)
762         assert.EqualValues(t, 3, psrs[0].Length)
763         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
764         if alreadyCompleted {
765                 r := tt.NewReader()
766                 b, err := ioutil.ReadAll(r)
767                 assert.NoError(t, err)
768                 assert.EqualValues(t, testutil.GreetingFileContents, b)
769         }
770 }
771
772 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
773         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
774 }
775
776 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
777         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
778 }
779
780 func TestAddMetainfoWithNodes(t *testing.T) {
781         cfg := TestingConfig()
782         cfg.ListenHost = func(string) string { return "" }
783         cfg.NoDHT = false
784         cfg.DhtStartingNodes = func() ([]dht.Addr, error) { return nil, nil }
785         // For now, we want to just jam the nodes into the table, without
786         // verifying them first. Also the DHT code doesn't support mixing secure
787         // and insecure nodes if security is enabled (yet).
788         // cfg.DHTConfig.NoSecurity = true
789         cl, err := NewClient(cfg)
790         require.NoError(t, err)
791         defer cl.Close()
792         sum := func() (ret int64) {
793                 cl.eachDhtServer(func(s *dht.Server) {
794                         ret += s.Stats().OutboundQueriesAttempted
795                 })
796                 return
797         }
798         assert.EqualValues(t, 0, sum())
799         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
800         require.NoError(t, err)
801         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
802         // check if the announce-list is here instead. TODO: Add nodes.
803         assert.Len(t, tt.metainfo.AnnounceList, 5)
804         // There are 6 nodes in the torrent file.
805         assert.EqualValues(t, 6*len(cl.dhtServers), sum())
806 }
807
808 type testDownloadCancelParams struct {
809         ExportClientStatus        bool
810         SetLeecherStorageCapacity bool
811         LeecherStorageCapacity    int64
812         Cancel                    bool
813 }
814
815 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
816         greetingTempDir, mi := testutil.GreetingTestTorrent()
817         defer os.RemoveAll(greetingTempDir)
818         cfg := TestingConfig()
819         cfg.Seed = true
820         cfg.DataDir = greetingTempDir
821         seeder, err := NewClient(cfg)
822         require.NoError(t, err)
823         defer seeder.Close()
824         if ps.ExportClientStatus {
825                 testutil.ExportStatusWriter(seeder, "s")
826         }
827         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
828         seederTorrent.VerifyData()
829         leecherDataDir, err := ioutil.TempDir("", "")
830         require.NoError(t, err)
831         defer os.RemoveAll(leecherDataDir)
832         fc, err := filecache.NewCache(leecherDataDir)
833         require.NoError(t, err)
834         if ps.SetLeecherStorageCapacity {
835                 fc.SetCapacity(ps.LeecherStorageCapacity)
836         }
837         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
838         cfg.DataDir = leecherDataDir
839         leecher, _ := NewClient(cfg)
840         defer leecher.Close()
841         if ps.ExportClientStatus {
842                 testutil.ExportStatusWriter(leecher, "l")
843         }
844         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
845                 ret = TorrentSpecFromMetaInfo(mi)
846                 ret.ChunkSize = 2
847                 return
848         }())
849         require.NoError(t, err)
850         assert.True(t, new)
851         psc := leecherGreeting.SubscribePieceStateChanges()
852         defer psc.Close()
853
854         leecherGreeting.cl.mu.Lock()
855         leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
856         if ps.Cancel {
857                 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
858         }
859         leecherGreeting.cl.mu.Unlock()
860
861         leecherGreeting.AddClientPeer(seeder)
862         completes := make(map[int]bool, 3)
863 values:
864         for {
865                 // started := time.Now()
866                 select {
867                 case _v := <-psc.Values:
868                         // log.Print(time.Since(started))
869                         v := _v.(PieceStateChange)
870                         completes[v.Index] = v.Complete
871                 case <-time.After(100 * time.Millisecond):
872                         break values
873                 }
874         }
875         if ps.Cancel {
876                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
877         } else {
878                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
879         }
880
881 }
882
883 func TestTorrentDownloadAll(t *testing.T) {
884         testDownloadCancel(t, testDownloadCancelParams{})
885 }
886
887 func TestTorrentDownloadAllThenCancel(t *testing.T) {
888         testDownloadCancel(t, testDownloadCancelParams{
889                 Cancel: true,
890         })
891 }
892
893 // Ensure that it's an error for a peer to send an invalid have message.
894 func TestPeerInvalidHave(t *testing.T) {
895         cl, err := NewClient(TestingConfig())
896         require.NoError(t, err)
897         defer cl.Close()
898         info := metainfo.Info{
899                 PieceLength: 1,
900                 Pieces:      make([]byte, 20),
901                 Files:       []metainfo.FileInfo{{Length: 1}},
902         }
903         infoBytes, err := bencode.Marshal(info)
904         require.NoError(t, err)
905         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
906                 InfoBytes: infoBytes,
907                 InfoHash:  metainfo.HashBytes(infoBytes),
908                 Storage:   badStorage{},
909         })
910         require.NoError(t, err)
911         assert.True(t, _new)
912         defer tt.Drop()
913         cn := &connection{
914                 t: tt,
915         }
916         assert.NoError(t, cn.peerSentHave(0))
917         assert.Error(t, cn.peerSentHave(1))
918 }
919
920 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
921         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
922         defer os.RemoveAll(greetingTempDir)
923         cfg := TestingConfig()
924         cfg.DataDir = greetingTempDir
925         seeder, err := NewClient(TestingConfig())
926         require.NoError(t, err)
927         seeder.AddTorrentSpec(&TorrentSpec{
928                 InfoBytes: greetingMetainfo.InfoBytes,
929         })
930 }
931
932 // Check that when the listen port is 0, all the protocols listened on have
933 // the same port, and it isn't zero.
934 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
935         cl, err := NewClient(TestingConfig())
936         require.NoError(t, err)
937         defer cl.Close()
938         port := cl.LocalPort()
939         assert.NotEqual(t, 0, port)
940         cl.eachListener(func(s socket) bool {
941                 assert.Equal(t, port, missinggo.AddrPort(s.Addr()))
942                 return true
943         })
944 }
945
946 func TestClientDynamicListenTCPOnly(t *testing.T) {
947         cfg := TestingConfig()
948         cfg.DisableUTP = true
949         cl, err := NewClient(cfg)
950         require.NoError(t, err)
951         defer cl.Close()
952         assert.NotEqual(t, 0, cl.LocalPort())
953         cl.eachListener(func(s socket) bool {
954                 assert.True(t, isTcpNetwork(s.Addr().Network()))
955                 return true
956         })
957 }
958
959 func TestClientDynamicListenUTPOnly(t *testing.T) {
960         cfg := TestingConfig()
961         cfg.DisableTCP = true
962         cl, err := NewClient(cfg)
963         require.NoError(t, err)
964         defer cl.Close()
965         assert.NotEqual(t, 0, cl.LocalPort())
966         cl.eachListener(func(s socket) bool {
967                 assert.True(t, isUtpNetwork(s.Addr().Network()))
968                 return true
969         })
970 }
971
972 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
973         cfg := TestingConfig()
974         cfg.DisableTCP = true
975         cfg.DisableUTP = true
976         cl, err := NewClient(cfg)
977         require.NoError(t, err)
978         defer cl.Close()
979         assert.Equal(t, 0, cl.LocalPort())
980 }
981
982 func totalConns(tts []*Torrent) (ret int) {
983         for _, tt := range tts {
984                 tt.cl.mu.Lock()
985                 ret += len(tt.conns)
986                 tt.cl.mu.Unlock()
987         }
988         return
989 }
990
991 func TestSetMaxEstablishedConn(t *testing.T) {
992         ss := testutil.NewStatusServer(t)
993         defer ss.Close()
994         var tts []*Torrent
995         ih := testutil.GreetingMetaInfo().HashInfoBytes()
996         for i := range iter.N(3) {
997                 cl, err := NewClient(TestingConfig())
998                 require.NoError(t, err)
999                 defer cl.Close()
1000                 tt, _ := cl.AddTorrentInfoHash(ih)
1001                 tt.SetMaxEstablishedConns(2)
1002                 ss.HandleStatusWriter(cl, fmt.Sprintf("/%d", i))
1003                 tts = append(tts, tt)
1004         }
1005         addPeers := func() {
1006                 for _, tt := range tts {
1007                         for _, _tt := range tts {
1008                                 // if tt != _tt {
1009                                 tt.AddClientPeer(_tt.cl)
1010                                 // }
1011                         }
1012                 }
1013         }
1014         waitTotalConns := func(num int) {
1015                 for totalConns(tts) != num {
1016                         addPeers()
1017                         time.Sleep(time.Millisecond)
1018                 }
1019         }
1020         addPeers()
1021         waitTotalConns(6)
1022         tts[0].SetMaxEstablishedConns(1)
1023         waitTotalConns(4)
1024         tts[0].SetMaxEstablishedConns(0)
1025         waitTotalConns(2)
1026         tts[0].SetMaxEstablishedConns(1)
1027         addPeers()
1028         waitTotalConns(4)
1029         tts[0].SetMaxEstablishedConns(2)
1030         addPeers()
1031         waitTotalConns(6)
1032 }
1033
1034 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1035         os.MkdirAll(dir, 0770)
1036         file, err := os.Create(filepath.Join(dir, name))
1037         require.NoError(t, err)
1038         file.Write([]byte(name))
1039         file.Close()
1040         mi := metainfo.MetaInfo{}
1041         mi.SetDefaults()
1042         info := metainfo.Info{PieceLength: 256 * 1024}
1043         err = info.BuildFromFilePath(filepath.Join(dir, name))
1044         require.NoError(t, err)
1045         mi.InfoBytes, err = bencode.Marshal(info)
1046         require.NoError(t, err)
1047         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1048         tr, err := cl.AddTorrent(&mi)
1049         require.NoError(t, err)
1050         require.True(t, tr.Seeding())
1051         tr.VerifyData()
1052         return magnet
1053 }
1054
1055 // https://github.com/anacrolix/torrent/issues/114
1056 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1057         cfg := TestingConfig()
1058         cfg.DisableUTP = true
1059         cfg.Seed = true
1060         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1061         cfg.ForceEncryption = true
1062         os.Mkdir(cfg.DataDir, 0755)
1063         server, err := NewClient(cfg)
1064         require.NoError(t, err)
1065         defer server.Close()
1066         testutil.ExportStatusWriter(server, "s")
1067         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1068         makeMagnet(t, server, cfg.DataDir, "test2")
1069         cfg = TestingConfig()
1070         cfg.DisableUTP = true
1071         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1072         cfg.ForceEncryption = true
1073         client, err := NewClient(cfg)
1074         require.NoError(t, err)
1075         defer client.Close()
1076         testutil.ExportStatusWriter(client, "c")
1077         tr, err := client.AddMagnet(magnet1)
1078         require.NoError(t, err)
1079         tr.AddClientPeer(server)
1080         <-tr.GotInfo()
1081         tr.DownloadAll()
1082         client.WaitAll()
1083 }
1084
1085 func TestClientAddressInUse(t *testing.T) {
1086         s, _ := NewUtpSocket("udp", ":50007")
1087         if s != nil {
1088                 defer s.Close()
1089         }
1090         cfg := TestingConfig().SetListenAddr(":50007")
1091         cl, err := NewClient(cfg)
1092         require.Error(t, err)
1093         require.Nil(t, cl)
1094 }