]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Rename Config->ClientConfig and change how defaults work
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "context"
5         "encoding/binary"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "net"
10         "os"
11         "path/filepath"
12         "reflect"
13         "sync"
14         "testing"
15         "time"
16
17         "github.com/anacrolix/dht"
18         _ "github.com/anacrolix/envpprof"
19         "github.com/anacrolix/missinggo"
20         "github.com/anacrolix/missinggo/filecache"
21         "github.com/bradfitz/iter"
22         "github.com/stretchr/testify/assert"
23         "github.com/stretchr/testify/require"
24         "golang.org/x/time/rate"
25
26         "github.com/anacrolix/torrent/bencode"
27         "github.com/anacrolix/torrent/internal/testutil"
28         "github.com/anacrolix/torrent/iplist"
29         "github.com/anacrolix/torrent/metainfo"
30         "github.com/anacrolix/torrent/storage"
31 )
32
33 func TestingConfig() *ClientConfig {
34         cfg := NewDefaultClientConfig()
35         cfg.ListenHost = LoopbackListenHost
36         cfg.NoDHT = true
37         cfg.DataDir = tempDir()
38         cfg.DisableTrackers = true
39         cfg.NoDefaultPortForwarding = true
40         return cfg
41 }
42
43 func TestClientDefault(t *testing.T) {
44         cl, err := NewClient(TestingConfig())
45         require.NoError(t, err)
46         cl.Close()
47 }
48
49 func TestClientNilConfig(t *testing.T) {
50         cl, err := NewClient(nil)
51         require.NoError(t, err)
52         cl.Close()
53 }
54
55 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
56         cfg := TestingConfig()
57         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
58         require.NoError(t, err)
59         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
60         defer ci.Close()
61         cfg.DefaultStorage = ci
62         cl, err := NewClient(cfg)
63         require.NoError(t, err)
64         cl.Close()
65         // And again, https://github.com/anacrolix/torrent/issues/158
66         cl, err = NewClient(cfg)
67         require.NoError(t, err)
68         cl.Close()
69 }
70
71 func TestAddDropTorrent(t *testing.T) {
72         cl, err := NewClient(TestingConfig())
73         require.NoError(t, err)
74         defer cl.Close()
75         dir, mi := testutil.GreetingTestTorrent()
76         defer os.RemoveAll(dir)
77         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
78         require.NoError(t, err)
79         assert.True(t, new)
80         tt.SetMaxEstablishedConns(0)
81         tt.SetMaxEstablishedConns(1)
82         tt.Drop()
83 }
84
85 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
86         // TODO?
87         t.SkipNow()
88 }
89
90 func TestAddTorrentNoUsableURLs(t *testing.T) {
91         // TODO?
92         t.SkipNow()
93 }
94
95 func TestAddPeersToUnknownTorrent(t *testing.T) {
96         // TODO?
97         t.SkipNow()
98 }
99
100 func TestPieceHashSize(t *testing.T) {
101         assert.Equal(t, 20, pieceHash.Size())
102 }
103
104 func TestTorrentInitialState(t *testing.T) {
105         dir, mi := testutil.GreetingTestTorrent()
106         defer os.RemoveAll(dir)
107         cl := &Client{
108                 config: &ClientConfig{},
109         }
110         cl.initLogger()
111         tor := cl.newTorrent(
112                 mi.HashInfoBytes(),
113                 storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
114         )
115         tor.setChunkSize(2)
116         tor.cl.mu.Lock()
117         err := tor.setInfoBytes(mi.InfoBytes)
118         tor.cl.mu.Unlock()
119         require.NoError(t, err)
120         require.Len(t, tor.pieces, 3)
121         tor.pendAllChunkSpecs(0)
122         tor.cl.mu.Lock()
123         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
124         tor.cl.mu.Unlock()
125         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
126 }
127
128 func TestUnmarshalPEXMsg(t *testing.T) {
129         var m peerExchangeMessage
130         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
131                 t.Fatal(err)
132         }
133         if len(m.Added) != 2 {
134                 t.FailNow()
135         }
136         if m.Added[0].Port != 0x506 {
137                 t.FailNow()
138         }
139 }
140
141 func TestReducedDialTimeout(t *testing.T) {
142         cfg := NewDefaultClientConfig()
143         for _, _case := range []struct {
144                 Max             time.Duration
145                 HalfOpenLimit   int
146                 PendingPeers    int
147                 ExpectedReduced time.Duration
148         }{
149                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
150                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
151                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
152                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
153                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
154                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
155         } {
156                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
157                 expected := _case.ExpectedReduced
158                 if expected < cfg.MinDialTimeout {
159                         expected = cfg.MinDialTimeout
160                 }
161                 if reduced != expected {
162                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
163                 }
164         }
165 }
166
167 func TestUTPRawConn(t *testing.T) {
168         l, err := NewUtpSocket("udp", "")
169         require.NoError(t, err)
170         defer l.Close()
171         go func() {
172                 for {
173                         _, err := l.Accept()
174                         if err != nil {
175                                 break
176                         }
177                 }
178         }()
179         // Connect a UTP peer to see if the RawConn will still work.
180         s, err := NewUtpSocket("udp", "")
181         require.NoError(t, err)
182         defer s.Close()
183         utpPeer, err := s.DialContext(context.Background(), "", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
184         require.NoError(t, err)
185         defer utpPeer.Close()
186         peer, err := net.ListenPacket("udp", ":0")
187         require.NoError(t, err)
188         defer peer.Close()
189
190         msgsReceived := 0
191         // How many messages to send. I've set this to double the channel buffer
192         // size in the raw packetConn.
193         const N = 200
194         readerStopped := make(chan struct{})
195         // The reader goroutine.
196         go func() {
197                 defer close(readerStopped)
198                 b := make([]byte, 500)
199                 for i := 0; i < N; i++ {
200                         n, _, err := l.ReadFrom(b)
201                         require.NoError(t, err)
202                         msgsReceived++
203                         var d int
204                         fmt.Sscan(string(b[:n]), &d)
205                         assert.Equal(t, i, d)
206                 }
207         }()
208         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
209         require.NoError(t, err)
210         for i := 0; i < N; i++ {
211                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
212                 require.NoError(t, err)
213                 time.Sleep(time.Millisecond)
214         }
215         select {
216         case <-readerStopped:
217         case <-time.After(time.Second):
218                 t.Fatal("reader timed out")
219         }
220         if msgsReceived != N {
221                 t.Fatalf("messages received: %d", msgsReceived)
222         }
223 }
224
225 func TestAddDropManyTorrents(t *testing.T) {
226         cl, err := NewClient(TestingConfig())
227         require.NoError(t, err)
228         defer cl.Close()
229         for i := range iter.N(1000) {
230                 var spec TorrentSpec
231                 binary.PutVarint(spec.InfoHash[:], int64(i))
232                 tt, new, err := cl.AddTorrentSpec(&spec)
233                 assert.NoError(t, err)
234                 assert.True(t, new)
235                 defer tt.Drop()
236         }
237 }
238
239 type FileCacheClientStorageFactoryParams struct {
240         Capacity    int64
241         SetCapacity bool
242         Wrapper     func(*filecache.Cache) storage.ClientImpl
243 }
244
245 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
246         return func(dataDir string) storage.ClientImpl {
247                 fc, err := filecache.NewCache(dataDir)
248                 if err != nil {
249                         panic(err)
250                 }
251                 if ps.SetCapacity {
252                         fc.SetCapacity(ps.Capacity)
253                 }
254                 return ps.Wrapper(fc)
255         }
256 }
257
258 type storageFactory func(string) storage.ClientImpl
259
260 func TestClientTransferDefault(t *testing.T) {
261         testClientTransfer(t, testClientTransferParams{
262                 ExportClientStatus: true,
263                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
264                         Wrapper: fileCachePieceResourceStorage,
265                 }),
266         })
267 }
268
269 func TestClientTransferRateLimitedUpload(t *testing.T) {
270         started := time.Now()
271         testClientTransfer(t, testClientTransferParams{
272                 // We are uploading 13 bytes (the length of the greeting torrent). The
273                 // chunks are 2 bytes in length. Then the smallest burst we can run
274                 // with is 2. Time taken is (13-burst)/rate.
275                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
276                 ExportClientStatus:      true,
277         })
278         require.True(t, time.Since(started) > time.Second)
279 }
280
281 func TestClientTransferRateLimitedDownload(t *testing.T) {
282         testClientTransfer(t, testClientTransferParams{
283                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
284         })
285 }
286
287 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
288         return storage.NewResourcePieces(fc.AsResourceProvider())
289 }
290
291 func TestClientTransferSmallCache(t *testing.T) {
292         testClientTransfer(t, testClientTransferParams{
293                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
294                         SetCapacity: true,
295                         // Going below the piece length means it can't complete a piece so
296                         // that it can be hashed.
297                         Capacity: 5,
298                         Wrapper:  fileCachePieceResourceStorage,
299                 }),
300                 SetReadahead: true,
301                 // Can't readahead too far or the cache will thrash and drop data we
302                 // thought we had.
303                 Readahead:          0,
304                 ExportClientStatus: true,
305         })
306 }
307
308 func TestClientTransferVarious(t *testing.T) {
309         // Leecher storage
310         for _, ls := range []storageFactory{
311                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
312                         Wrapper: fileCachePieceResourceStorage,
313                 }),
314                 storage.NewBoltDB,
315         } {
316                 // Seeder storage
317                 for _, ss := range []func(string) storage.ClientImpl{
318                         storage.NewFile,
319                         storage.NewMMap,
320                 } {
321                         for _, responsive := range []bool{false, true} {
322                                 testClientTransfer(t, testClientTransferParams{
323                                         Responsive:     responsive,
324                                         SeederStorage:  ss,
325                                         LeecherStorage: ls,
326                                 })
327                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
328                                         testClientTransfer(t, testClientTransferParams{
329                                                 SeederStorage:  ss,
330                                                 Responsive:     responsive,
331                                                 SetReadahead:   true,
332                                                 Readahead:      readahead,
333                                                 LeecherStorage: ls,
334                                         })
335                                 }
336                         }
337                 }
338         }
339 }
340
341 type testClientTransferParams struct {
342         Responsive                 bool
343         Readahead                  int64
344         SetReadahead               bool
345         ExportClientStatus         bool
346         LeecherStorage             func(string) storage.ClientImpl
347         SeederStorage              func(string) storage.ClientImpl
348         SeederUploadRateLimiter    *rate.Limiter
349         LeecherDownloadRateLimiter *rate.Limiter
350 }
351
352 // Creates a seeder and a leecher, and ensures the data transfers when a read
353 // is attempted on the leecher.
354 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
355         greetingTempDir, mi := testutil.GreetingTestTorrent()
356         defer os.RemoveAll(greetingTempDir)
357         // Create seeder and a Torrent.
358         cfg := TestingConfig()
359         cfg.Seed = true
360         if ps.SeederUploadRateLimiter != nil {
361                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
362         }
363         // cfg.ListenAddr = "localhost:4000"
364         if ps.SeederStorage != nil {
365                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
366                 defer cfg.DefaultStorage.Close()
367         } else {
368                 cfg.DataDir = greetingTempDir
369         }
370         seeder, err := NewClient(cfg)
371         require.NoError(t, err)
372         if ps.ExportClientStatus {
373                 testutil.ExportStatusWriter(seeder, "s")
374         }
375         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
376         // Run a Stats right after Closing the Client. This will trigger the Stats
377         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
378         defer seederTorrent.Stats()
379         defer seeder.Close()
380         seederTorrent.VerifyData()
381         // Create leecher and a Torrent.
382         leecherDataDir, err := ioutil.TempDir("", "")
383         require.NoError(t, err)
384         defer os.RemoveAll(leecherDataDir)
385         cfg = TestingConfig()
386         if ps.LeecherStorage == nil {
387                 cfg.DataDir = leecherDataDir
388         } else {
389                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
390         }
391         if ps.LeecherDownloadRateLimiter != nil {
392                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
393         }
394         cfg.Seed = false
395         leecher, err := NewClient(cfg)
396         require.NoError(t, err)
397         defer leecher.Close()
398         if ps.ExportClientStatus {
399                 testutil.ExportStatusWriter(leecher, "l")
400         }
401         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
402                 ret = TorrentSpecFromMetaInfo(mi)
403                 ret.ChunkSize = 2
404                 return
405         }())
406         require.NoError(t, err)
407         assert.True(t, new)
408         // Now do some things with leecher and seeder.
409         leecherTorrent.AddClientPeer(seeder)
410         // The Torrent should not be interested in obtaining peers, so the one we
411         // just added should be the only one.
412         assert.False(t, leecherTorrent.Seeding())
413         assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
414         r := leecherTorrent.NewReader()
415         defer r.Close()
416         if ps.Responsive {
417                 r.SetResponsive()
418         }
419         if ps.SetReadahead {
420                 r.SetReadahead(ps.Readahead)
421         }
422         assertReadAllGreeting(t, r)
423
424         seederStats := seederTorrent.Stats()
425         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
426         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
427
428         leecherStats := leecherTorrent.Stats()
429         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
430         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
431
432         // Try reading through again for the cases where the torrent data size
433         // exceeds the size of the cache.
434         assertReadAllGreeting(t, r)
435 }
436
437 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
438         pos, err := r.Seek(0, io.SeekStart)
439         assert.NoError(t, err)
440         assert.EqualValues(t, 0, pos)
441         _greeting, err := ioutil.ReadAll(r)
442         assert.NoError(t, err)
443         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
444 }
445
446 // Check that after completing leeching, a leecher transitions to a seeding
447 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
448 func TestSeedAfterDownloading(t *testing.T) {
449         greetingTempDir, mi := testutil.GreetingTestTorrent()
450         defer os.RemoveAll(greetingTempDir)
451
452         cfg := TestingConfig()
453         cfg.Seed = true
454         cfg.DataDir = greetingTempDir
455         seeder, err := NewClient(cfg)
456         require.NoError(t, err)
457         defer seeder.Close()
458         testutil.ExportStatusWriter(seeder, "s")
459         seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
460         require.NoError(t, err)
461         assert.True(t, ok)
462         seederTorrent.VerifyData()
463
464         cfg = TestingConfig()
465         cfg.Seed = true
466         cfg.DataDir, err = ioutil.TempDir("", "")
467         require.NoError(t, err)
468         defer os.RemoveAll(cfg.DataDir)
469         leecher, err := NewClient(cfg)
470         require.NoError(t, err)
471         defer leecher.Close()
472         testutil.ExportStatusWriter(leecher, "l")
473
474         cfg = TestingConfig()
475         cfg.Seed = false
476         cfg.DataDir, err = ioutil.TempDir("", "")
477         require.NoError(t, err)
478         defer os.RemoveAll(cfg.DataDir)
479         leecherLeecher, _ := NewClient(cfg)
480         require.NoError(t, err)
481         defer leecherLeecher.Close()
482         testutil.ExportStatusWriter(leecherLeecher, "ll")
483         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
484                 ret = TorrentSpecFromMetaInfo(mi)
485                 ret.ChunkSize = 2
486                 return
487         }())
488         require.NoError(t, err)
489         assert.True(t, ok)
490         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
491                 ret = TorrentSpecFromMetaInfo(mi)
492                 ret.ChunkSize = 3
493                 return
494         }())
495         require.NoError(t, err)
496         assert.True(t, ok)
497         // Simultaneously DownloadAll in Leecher, and read the contents
498         // consecutively in LeecherLeecher. This non-deterministically triggered a
499         // case where the leecher wouldn't unchoke the LeecherLeecher.
500         var wg sync.WaitGroup
501         wg.Add(1)
502         go func() {
503                 defer wg.Done()
504                 r := llg.NewReader()
505                 defer r.Close()
506                 b, err := ioutil.ReadAll(r)
507                 require.NoError(t, err)
508                 assert.EqualValues(t, testutil.GreetingFileContents, b)
509         }()
510         done := make(chan struct{})
511         defer close(done)
512         go leecherGreeting.AddClientPeer(seeder)
513         go leecherGreeting.AddClientPeer(leecherLeecher)
514         wg.Add(1)
515         go func() {
516                 defer wg.Done()
517                 leecherGreeting.DownloadAll()
518                 leecher.WaitAll()
519         }()
520         wg.Wait()
521 }
522
523 func TestMergingTrackersByAddingSpecs(t *testing.T) {
524         cl, err := NewClient(TestingConfig())
525         require.NoError(t, err)
526         defer cl.Close()
527         spec := TorrentSpec{}
528         T, new, _ := cl.AddTorrentSpec(&spec)
529         if !new {
530                 t.FailNow()
531         }
532         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
533         _, new, _ = cl.AddTorrentSpec(&spec)
534         assert.False(t, new)
535         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
536         // Because trackers are disabled in TestingConfig.
537         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
538 }
539
540 // We read from a piece which is marked completed, but is missing data.
541 func TestCompletedPieceWrongSize(t *testing.T) {
542         cfg := TestingConfig()
543         cfg.DefaultStorage = badStorage{}
544         cl, err := NewClient(cfg)
545         require.NoError(t, err)
546         defer cl.Close()
547         info := metainfo.Info{
548                 PieceLength: 15,
549                 Pieces:      make([]byte, 20),
550                 Files: []metainfo.FileInfo{
551                         {Path: []string{"greeting"}, Length: 13},
552                 },
553         }
554         b, err := bencode.Marshal(info)
555         require.NoError(t, err)
556         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
557                 InfoBytes: b,
558                 InfoHash:  metainfo.HashBytes(b),
559         })
560         require.NoError(t, err)
561         defer tt.Drop()
562         assert.True(t, new)
563         r := tt.NewReader()
564         defer r.Close()
565         b, err = ioutil.ReadAll(r)
566         assert.Len(t, b, 13)
567         assert.NoError(t, err)
568 }
569
570 func BenchmarkAddLargeTorrent(b *testing.B) {
571         cfg := TestingConfig()
572         cfg.DisableTCP = true
573         cfg.DisableUTP = true
574         cfg.ListenHost = func(string) string { return "redonk" }
575         cl, err := NewClient(cfg)
576         require.NoError(b, err)
577         defer cl.Close()
578         for range iter.N(b.N) {
579                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
580                 if err != nil {
581                         b.Fatal(err)
582                 }
583                 t.Drop()
584         }
585 }
586
587 func TestResponsive(t *testing.T) {
588         seederDataDir, mi := testutil.GreetingTestTorrent()
589         defer os.RemoveAll(seederDataDir)
590         cfg := TestingConfig()
591         cfg.Seed = true
592         cfg.DataDir = seederDataDir
593         seeder, err := NewClient(cfg)
594         require.Nil(t, err)
595         defer seeder.Close()
596         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
597         seederTorrent.VerifyData()
598         leecherDataDir, err := ioutil.TempDir("", "")
599         require.Nil(t, err)
600         defer os.RemoveAll(leecherDataDir)
601         cfg = TestingConfig()
602         cfg.DataDir = leecherDataDir
603         leecher, err := NewClient(cfg)
604         require.Nil(t, err)
605         defer leecher.Close()
606         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
607                 ret = TorrentSpecFromMetaInfo(mi)
608                 ret.ChunkSize = 2
609                 return
610         }())
611         leecherTorrent.AddClientPeer(seeder)
612         reader := leecherTorrent.NewReader()
613         defer reader.Close()
614         reader.SetReadahead(0)
615         reader.SetResponsive()
616         b := make([]byte, 2)
617         _, err = reader.Seek(3, io.SeekStart)
618         require.NoError(t, err)
619         _, err = io.ReadFull(reader, b)
620         assert.Nil(t, err)
621         assert.EqualValues(t, "lo", string(b))
622         _, err = reader.Seek(11, io.SeekStart)
623         require.NoError(t, err)
624         n, err := io.ReadFull(reader, b)
625         assert.Nil(t, err)
626         assert.EqualValues(t, 2, n)
627         assert.EqualValues(t, "d\n", string(b))
628 }
629
630 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
631         seederDataDir, mi := testutil.GreetingTestTorrent()
632         defer os.RemoveAll(seederDataDir)
633         cfg := TestingConfig()
634         cfg.Seed = true
635         cfg.DataDir = seederDataDir
636         seeder, err := NewClient(cfg)
637         require.Nil(t, err)
638         defer seeder.Close()
639         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
640         seederTorrent.VerifyData()
641         leecherDataDir, err := ioutil.TempDir("", "")
642         require.Nil(t, err)
643         defer os.RemoveAll(leecherDataDir)
644         cfg = TestingConfig()
645         cfg.DataDir = leecherDataDir
646         leecher, err := NewClient(cfg)
647         require.Nil(t, err)
648         defer leecher.Close()
649         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
650                 ret = TorrentSpecFromMetaInfo(mi)
651                 ret.ChunkSize = 2
652                 return
653         }())
654         leecherTorrent.AddClientPeer(seeder)
655         reader := leecherTorrent.NewReader()
656         defer reader.Close()
657         reader.SetReadahead(0)
658         reader.SetResponsive()
659         b := make([]byte, 2)
660         _, err = reader.Seek(3, io.SeekStart)
661         require.NoError(t, err)
662         _, err = io.ReadFull(reader, b)
663         assert.Nil(t, err)
664         assert.EqualValues(t, "lo", string(b))
665         go leecherTorrent.Drop()
666         _, err = reader.Seek(11, io.SeekStart)
667         require.NoError(t, err)
668         n, err := reader.Read(b)
669         assert.EqualError(t, err, "torrent closed")
670         assert.EqualValues(t, 0, n)
671 }
672
673 func TestDHTInheritBlocklist(t *testing.T) {
674         ipl := iplist.New(nil)
675         require.NotNil(t, ipl)
676         cfg := TestingConfig()
677         cfg.IPBlocklist = ipl
678         cfg.NoDHT = false
679         cl, err := NewClient(cfg)
680         require.NoError(t, err)
681         defer cl.Close()
682         numServers := 0
683         cl.eachDhtServer(func(s *dht.Server) {
684                 assert.Equal(t, ipl, s.IPBlocklist())
685                 numServers++
686         })
687         assert.EqualValues(t, 2, numServers)
688 }
689
690 // Check that stuff is merged in subsequent AddTorrentSpec for the same
691 // infohash.
692 func TestAddTorrentSpecMerging(t *testing.T) {
693         cl, err := NewClient(TestingConfig())
694         require.NoError(t, err)
695         defer cl.Close()
696         dir, mi := testutil.GreetingTestTorrent()
697         defer os.RemoveAll(dir)
698         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
699                 InfoHash: mi.HashInfoBytes(),
700         })
701         require.NoError(t, err)
702         require.True(t, new)
703         require.Nil(t, tt.Info())
704         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
705         require.NoError(t, err)
706         require.False(t, new)
707         require.NotNil(t, tt.Info())
708 }
709
710 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
711         dir, mi := testutil.GreetingTestTorrent()
712         os.RemoveAll(dir)
713         cl, _ := NewClient(TestingConfig())
714         defer cl.Close()
715         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
716                 InfoHash: mi.HashInfoBytes(),
717         })
718         tt.Drop()
719         assert.EqualValues(t, 0, len(cl.Torrents()))
720         select {
721         case <-tt.GotInfo():
722                 t.FailNow()
723         default:
724         }
725 }
726
727 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
728         for i := range iter.N(info.NumPieces()) {
729                 p := info.Piece(i)
730                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
731         }
732 }
733
734 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
735         fileCacheDir, err := ioutil.TempDir("", "")
736         require.NoError(t, err)
737         defer os.RemoveAll(fileCacheDir)
738         fileCache, err := filecache.NewCache(fileCacheDir)
739         require.NoError(t, err)
740         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
741         defer os.RemoveAll(greetingDataTempDir)
742         filePieceStore := csf(fileCache)
743         defer filePieceStore.Close()
744         info, err := greetingMetainfo.UnmarshalInfo()
745         require.NoError(t, err)
746         ih := greetingMetainfo.HashInfoBytes()
747         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
748         require.NoError(t, err)
749         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
750         // require.Equal(t, len(testutil.GreetingFileContents), written)
751         // require.NoError(t, err)
752         for i := 0; i < info.NumPieces(); i++ {
753                 p := info.Piece(i)
754                 if alreadyCompleted {
755                         require.NoError(t, greetingData.Piece(p).MarkComplete())
756                 }
757         }
758         cfg := TestingConfig()
759         // TODO: Disable network option?
760         cfg.DisableTCP = true
761         cfg.DisableUTP = true
762         cfg.DefaultStorage = filePieceStore
763         cl, err := NewClient(cfg)
764         require.NoError(t, err)
765         defer cl.Close()
766         tt, err := cl.AddTorrent(greetingMetainfo)
767         require.NoError(t, err)
768         psrs := tt.PieceStateRuns()
769         assert.Len(t, psrs, 1)
770         assert.EqualValues(t, 3, psrs[0].Length)
771         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
772         if alreadyCompleted {
773                 r := tt.NewReader()
774                 b, err := ioutil.ReadAll(r)
775                 assert.NoError(t, err)
776                 assert.EqualValues(t, testutil.GreetingFileContents, b)
777         }
778 }
779
780 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
781         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
782 }
783
784 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
785         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
786 }
787
788 func TestAddMetainfoWithNodes(t *testing.T) {
789         cfg := TestingConfig()
790         cfg.ListenHost = func(string) string { return "" }
791         cfg.NoDHT = false
792         cfg.DhtStartingNodes = func() ([]dht.Addr, error) { return nil, nil }
793         // For now, we want to just jam the nodes into the table, without
794         // verifying them first. Also the DHT code doesn't support mixing secure
795         // and insecure nodes if security is enabled (yet).
796         // cfg.DHTConfig.NoSecurity = true
797         cl, err := NewClient(cfg)
798         require.NoError(t, err)
799         defer cl.Close()
800         sum := func() (ret int64) {
801                 cl.eachDhtServer(func(s *dht.Server) {
802                         ret += s.Stats().OutboundQueriesAttempted
803                 })
804                 return
805         }
806         assert.EqualValues(t, 0, sum())
807         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
808         require.NoError(t, err)
809         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
810         // check if the announce-list is here instead. TODO: Add nodes.
811         assert.Len(t, tt.metainfo.AnnounceList, 5)
812         // There are 6 nodes in the torrent file.
813         assert.EqualValues(t, 6*len(cl.dhtServers), sum())
814 }
815
816 type testDownloadCancelParams struct {
817         SetLeecherStorageCapacity bool
818         LeecherStorageCapacity    int64
819         Cancel                    bool
820 }
821
822 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
823         greetingTempDir, mi := testutil.GreetingTestTorrent()
824         defer os.RemoveAll(greetingTempDir)
825         cfg := TestingConfig()
826         cfg.Seed = true
827         cfg.DataDir = greetingTempDir
828         seeder, err := NewClient(cfg)
829         require.NoError(t, err)
830         defer seeder.Close()
831         testutil.ExportStatusWriter(seeder, "s")
832         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
833         seederTorrent.VerifyData()
834         leecherDataDir, err := ioutil.TempDir("", "")
835         require.NoError(t, err)
836         defer os.RemoveAll(leecherDataDir)
837         fc, err := filecache.NewCache(leecherDataDir)
838         require.NoError(t, err)
839         if ps.SetLeecherStorageCapacity {
840                 fc.SetCapacity(ps.LeecherStorageCapacity)
841         }
842         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
843         cfg.DataDir = leecherDataDir
844         leecher, _ := NewClient(cfg)
845         defer leecher.Close()
846         testutil.ExportStatusWriter(leecher, "l")
847         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
848                 ret = TorrentSpecFromMetaInfo(mi)
849                 ret.ChunkSize = 2
850                 return
851         }())
852         require.NoError(t, err)
853         assert.True(t, new)
854         psc := leecherGreeting.SubscribePieceStateChanges()
855         defer psc.Close()
856
857         leecherGreeting.cl.mu.Lock()
858         leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
859         if ps.Cancel {
860                 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
861         }
862         leecherGreeting.cl.mu.Unlock()
863         done := make(chan struct{})
864         defer close(done)
865         go leecherGreeting.AddClientPeer(seeder)
866         completes := make(map[int]bool, 3)
867         expected := func() map[int]bool {
868                 if ps.Cancel {
869                         return map[int]bool{0: false, 1: false, 2: false}
870                 } else {
871                         return map[int]bool{0: true, 1: true, 2: true}
872                 }
873         }()
874         for !reflect.DeepEqual(completes, expected) {
875                 select {
876                 case _v := <-psc.Values:
877                         v := _v.(PieceStateChange)
878                         completes[v.Index] = v.Complete
879                 }
880         }
881 }
882
883 func TestTorrentDownloadAll(t *testing.T) {
884         testDownloadCancel(t, testDownloadCancelParams{})
885 }
886
887 func TestTorrentDownloadAllThenCancel(t *testing.T) {
888         testDownloadCancel(t, testDownloadCancelParams{
889                 Cancel: true,
890         })
891 }
892
893 // Ensure that it's an error for a peer to send an invalid have message.
894 func TestPeerInvalidHave(t *testing.T) {
895         cl, err := NewClient(TestingConfig())
896         require.NoError(t, err)
897         defer cl.Close()
898         info := metainfo.Info{
899                 PieceLength: 1,
900                 Pieces:      make([]byte, 20),
901                 Files:       []metainfo.FileInfo{{Length: 1}},
902         }
903         infoBytes, err := bencode.Marshal(info)
904         require.NoError(t, err)
905         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
906                 InfoBytes: infoBytes,
907                 InfoHash:  metainfo.HashBytes(infoBytes),
908                 Storage:   badStorage{},
909         })
910         require.NoError(t, err)
911         assert.True(t, _new)
912         defer tt.Drop()
913         cn := &connection{
914                 t: tt,
915         }
916         assert.NoError(t, cn.peerSentHave(0))
917         assert.Error(t, cn.peerSentHave(1))
918 }
919
920 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
921         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
922         defer os.RemoveAll(greetingTempDir)
923         cfg := TestingConfig()
924         cfg.DataDir = greetingTempDir
925         seeder, err := NewClient(TestingConfig())
926         require.NoError(t, err)
927         seeder.AddTorrentSpec(&TorrentSpec{
928                 InfoBytes: greetingMetainfo.InfoBytes,
929         })
930 }
931
932 // Check that when the listen port is 0, all the protocols listened on have
933 // the same port, and it isn't zero.
934 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
935         cl, err := NewClient(TestingConfig())
936         require.NoError(t, err)
937         defer cl.Close()
938         port := cl.LocalPort()
939         assert.NotEqual(t, 0, port)
940         cl.eachListener(func(s socket) bool {
941                 assert.Equal(t, port, missinggo.AddrPort(s.Addr()))
942                 return true
943         })
944 }
945
946 func TestClientDynamicListenTCPOnly(t *testing.T) {
947         cfg := TestingConfig()
948         cfg.DisableUTP = true
949         cl, err := NewClient(cfg)
950         require.NoError(t, err)
951         defer cl.Close()
952         assert.NotEqual(t, 0, cl.LocalPort())
953         cl.eachListener(func(s socket) bool {
954                 assert.True(t, isTcpNetwork(s.Addr().Network()))
955                 return true
956         })
957 }
958
959 func TestClientDynamicListenUTPOnly(t *testing.T) {
960         cfg := TestingConfig()
961         cfg.DisableTCP = true
962         cl, err := NewClient(cfg)
963         require.NoError(t, err)
964         defer cl.Close()
965         assert.NotEqual(t, 0, cl.LocalPort())
966         cl.eachListener(func(s socket) bool {
967                 assert.True(t, isUtpNetwork(s.Addr().Network()))
968                 return true
969         })
970 }
971
972 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
973         cfg := TestingConfig()
974         cfg.DisableTCP = true
975         cfg.DisableUTP = true
976         cl, err := NewClient(cfg)
977         require.NoError(t, err)
978         defer cl.Close()
979         assert.Equal(t, 0, cl.LocalPort())
980 }
981
982 func totalConns(tts []*Torrent) (ret int) {
983         for _, tt := range tts {
984                 tt.cl.mu.Lock()
985                 ret += len(tt.conns)
986                 tt.cl.mu.Unlock()
987         }
988         return
989 }
990
991 func TestSetMaxEstablishedConn(t *testing.T) {
992         ss := testutil.NewStatusServer(t)
993         defer ss.Close()
994         var tts []*Torrent
995         ih := testutil.GreetingMetaInfo().HashInfoBytes()
996         cfg := TestingConfig()
997         cfg.DisableAcceptRateLimiting = true
998         cfg.dropDuplicatePeerIds = true
999         for i := range iter.N(3) {
1000                 cl, err := NewClient(cfg)
1001                 require.NoError(t, err)
1002                 defer cl.Close()
1003                 tt, _ := cl.AddTorrentInfoHash(ih)
1004                 tt.SetMaxEstablishedConns(2)
1005                 ss.HandleStatusWriter(cl, fmt.Sprintf("/%d", i))
1006                 tts = append(tts, tt)
1007         }
1008         addPeers := func() {
1009                 for _, tt := range tts {
1010                         for _, _tt := range tts {
1011                                 // if tt != _tt {
1012                                 tt.AddClientPeer(_tt.cl)
1013                                 // }
1014                         }
1015                 }
1016         }
1017         waitTotalConns := func(num int) {
1018                 for totalConns(tts) != num {
1019                         addPeers()
1020                         time.Sleep(time.Millisecond)
1021                 }
1022         }
1023         addPeers()
1024         waitTotalConns(6)
1025         tts[0].SetMaxEstablishedConns(1)
1026         waitTotalConns(4)
1027         tts[0].SetMaxEstablishedConns(0)
1028         waitTotalConns(2)
1029         tts[0].SetMaxEstablishedConns(1)
1030         addPeers()
1031         waitTotalConns(4)
1032         tts[0].SetMaxEstablishedConns(2)
1033         addPeers()
1034         waitTotalConns(6)
1035 }
1036
1037 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1038         os.MkdirAll(dir, 0770)
1039         file, err := os.Create(filepath.Join(dir, name))
1040         require.NoError(t, err)
1041         file.Write([]byte(name))
1042         file.Close()
1043         mi := metainfo.MetaInfo{}
1044         mi.SetDefaults()
1045         info := metainfo.Info{PieceLength: 256 * 1024}
1046         err = info.BuildFromFilePath(filepath.Join(dir, name))
1047         require.NoError(t, err)
1048         mi.InfoBytes, err = bencode.Marshal(info)
1049         require.NoError(t, err)
1050         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1051         tr, err := cl.AddTorrent(&mi)
1052         require.NoError(t, err)
1053         require.True(t, tr.Seeding())
1054         tr.VerifyData()
1055         return magnet
1056 }
1057
1058 // https://github.com/anacrolix/torrent/issues/114
1059 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1060         cfg := TestingConfig()
1061         cfg.DisableUTP = true
1062         cfg.Seed = true
1063         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1064         cfg.ForceEncryption = true
1065         os.Mkdir(cfg.DataDir, 0755)
1066         server, err := NewClient(cfg)
1067         require.NoError(t, err)
1068         defer server.Close()
1069         testutil.ExportStatusWriter(server, "s")
1070         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1071         makeMagnet(t, server, cfg.DataDir, "test2")
1072         cfg = TestingConfig()
1073         cfg.DisableUTP = true
1074         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1075         cfg.ForceEncryption = true
1076         client, err := NewClient(cfg)
1077         require.NoError(t, err)
1078         defer client.Close()
1079         testutil.ExportStatusWriter(client, "c")
1080         tr, err := client.AddMagnet(magnet1)
1081         require.NoError(t, err)
1082         tr.AddClientPeer(server)
1083         <-tr.GotInfo()
1084         tr.DownloadAll()
1085         client.WaitAll()
1086 }
1087
1088 func TestClientAddressInUse(t *testing.T) {
1089         s, _ := NewUtpSocket("udp", ":50007")
1090         if s != nil {
1091                 defer s.Close()
1092         }
1093         cfg := TestingConfig().SetListenAddr(":50007")
1094         cl, err := NewClient(cfg)
1095         require.Error(t, err)
1096         require.Nil(t, cl)
1097 }