]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Some test tidying and improvements
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "net"
9         "os"
10         "path/filepath"
11         "sync"
12         "testing"
13         "time"
14
15         _ "github.com/anacrolix/envpprof"
16         "github.com/anacrolix/missinggo"
17         "github.com/anacrolix/missinggo/filecache"
18         "github.com/bradfitz/iter"
19         "github.com/stretchr/testify/assert"
20         "github.com/stretchr/testify/require"
21         "golang.org/x/time/rate"
22
23         "github.com/anacrolix/torrent/bencode"
24         "github.com/anacrolix/torrent/internal/testutil"
25         "github.com/anacrolix/torrent/iplist"
26         "github.com/anacrolix/torrent/metainfo"
27         "github.com/anacrolix/torrent/storage"
28 )
29
30 func TestingConfig() *Config {
31         return &Config{
32                 ListenAddr:              "localhost:0",
33                 NoDHT:                   true,
34                 DataDir:                 tempDir(),
35                 DisableTrackers:         true,
36                 NoDefaultPortForwarding: true,
37                 // Debug:           true,
38         }
39 }
40
41 func TestClientDefault(t *testing.T) {
42         cl, err := NewClient(TestingConfig())
43         require.NoError(t, err)
44         cl.Close()
45 }
46
47 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
48         cfg := TestingConfig()
49         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
50         require.NoError(t, err)
51         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
52         defer ci.Close()
53         cfg.DefaultStorage = ci
54         cl, err := NewClient(cfg)
55         require.NoError(t, err)
56         cl.Close()
57         // And again, https://github.com/anacrolix/torrent/issues/158
58         cl, err = NewClient(cfg)
59         require.NoError(t, err)
60         cl.Close()
61 }
62
63 func TestAddDropTorrent(t *testing.T) {
64         cl, err := NewClient(TestingConfig())
65         require.NoError(t, err)
66         defer cl.Close()
67         dir, mi := testutil.GreetingTestTorrent()
68         defer os.RemoveAll(dir)
69         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
70         require.NoError(t, err)
71         assert.True(t, new)
72         tt.SetMaxEstablishedConns(0)
73         tt.SetMaxEstablishedConns(1)
74         tt.Drop()
75 }
76
77 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
78         // TODO?
79         t.SkipNow()
80 }
81
82 func TestAddTorrentNoUsableURLs(t *testing.T) {
83         // TODO?
84         t.SkipNow()
85 }
86
87 func TestAddPeersToUnknownTorrent(t *testing.T) {
88         // TODO?
89         t.SkipNow()
90 }
91
92 func TestPieceHashSize(t *testing.T) {
93         assert.Equal(t, 20, pieceHash.Size())
94 }
95
96 func TestTorrentInitialState(t *testing.T) {
97         dir, mi := testutil.GreetingTestTorrent()
98         defer os.RemoveAll(dir)
99         cl := &Client{}
100         cl.initLogger()
101         tor := cl.newTorrent(
102                 mi.HashInfoBytes(),
103                 storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
104         )
105         tor.setChunkSize(2)
106         tor.cl.mu.Lock()
107         err := tor.setInfoBytes(mi.InfoBytes)
108         tor.cl.mu.Unlock()
109         require.NoError(t, err)
110         require.Len(t, tor.pieces, 3)
111         tor.pendAllChunkSpecs(0)
112         tor.cl.mu.Lock()
113         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
114         tor.cl.mu.Unlock()
115         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
116 }
117
118 func TestUnmarshalPEXMsg(t *testing.T) {
119         var m peerExchangeMessage
120         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
121                 t.Fatal(err)
122         }
123         if len(m.Added) != 2 {
124                 t.FailNow()
125         }
126         if m.Added[0].Port != 0x506 {
127                 t.FailNow()
128         }
129 }
130
131 func TestReducedDialTimeout(t *testing.T) {
132         cfg := &Config{}
133         cfg.setDefaults()
134         for _, _case := range []struct {
135                 Max             time.Duration
136                 HalfOpenLimit   int
137                 PendingPeers    int
138                 ExpectedReduced time.Duration
139         }{
140                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
141                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
142                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
143                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
144                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
145                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
146         } {
147                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
148                 expected := _case.ExpectedReduced
149                 if expected < cfg.MinDialTimeout {
150                         expected = cfg.MinDialTimeout
151                 }
152                 if reduced != expected {
153                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
154                 }
155         }
156 }
157
158 func TestUTPRawConn(t *testing.T) {
159         l, err := NewUtpSocket("udp", "")
160         require.NoError(t, err)
161         defer l.Close()
162         go func() {
163                 for {
164                         _, err := l.Accept()
165                         if err != nil {
166                                 break
167                         }
168                 }
169         }()
170         // Connect a UTP peer to see if the RawConn will still work.
171         s, err := NewUtpSocket("udp", "")
172         require.NoError(t, err)
173         defer s.Close()
174         utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
175         require.NoError(t, err)
176         defer utpPeer.Close()
177         peer, err := net.ListenPacket("udp", ":0")
178         require.NoError(t, err)
179         defer peer.Close()
180
181         msgsReceived := 0
182         // How many messages to send. I've set this to double the channel buffer
183         // size in the raw packetConn.
184         const N = 200
185         readerStopped := make(chan struct{})
186         // The reader goroutine.
187         go func() {
188                 defer close(readerStopped)
189                 b := make([]byte, 500)
190                 for i := 0; i < N; i++ {
191                         n, _, err := l.ReadFrom(b)
192                         require.NoError(t, err)
193                         msgsReceived++
194                         var d int
195                         fmt.Sscan(string(b[:n]), &d)
196                         assert.Equal(t, i, d)
197                 }
198         }()
199         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
200         require.NoError(t, err)
201         for i := 0; i < N; i++ {
202                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
203                 require.NoError(t, err)
204                 time.Sleep(time.Millisecond)
205         }
206         select {
207         case <-readerStopped:
208         case <-time.After(time.Second):
209                 t.Fatal("reader timed out")
210         }
211         if msgsReceived != N {
212                 t.Fatalf("messages received: %d", msgsReceived)
213         }
214 }
215
216 func TestTwoClientsArbitraryPorts(t *testing.T) {
217         for i := 0; i < 2; i++ {
218                 cl, err := NewClient(TestingConfig())
219                 if err != nil {
220                         t.Fatal(err)
221                 }
222                 defer cl.Close()
223         }
224 }
225
226 func TestAddDropManyTorrents(t *testing.T) {
227         cl, err := NewClient(TestingConfig())
228         require.NoError(t, err)
229         defer cl.Close()
230         for i := range iter.N(1000) {
231                 var spec TorrentSpec
232                 binary.PutVarint(spec.InfoHash[:], int64(i))
233                 tt, new, err := cl.AddTorrentSpec(&spec)
234                 assert.NoError(t, err)
235                 assert.True(t, new)
236                 defer tt.Drop()
237         }
238 }
239
240 type FileCacheClientStorageFactoryParams struct {
241         Capacity    int64
242         SetCapacity bool
243         Wrapper     func(*filecache.Cache) storage.ClientImpl
244 }
245
246 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
247         return func(dataDir string) storage.ClientImpl {
248                 fc, err := filecache.NewCache(dataDir)
249                 if err != nil {
250                         panic(err)
251                 }
252                 if ps.SetCapacity {
253                         fc.SetCapacity(ps.Capacity)
254                 }
255                 return ps.Wrapper(fc)
256         }
257 }
258
259 type storageFactory func(string) storage.ClientImpl
260
261 func TestClientTransferDefault(t *testing.T) {
262         testClientTransfer(t, testClientTransferParams{
263                 ExportClientStatus: true,
264                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
265                         Wrapper: fileCachePieceResourceStorage,
266                 }),
267         })
268 }
269
270 func TestClientTransferRateLimitedUpload(t *testing.T) {
271         started := time.Now()
272         testClientTransfer(t, testClientTransferParams{
273                 // We are uploading 13 bytes (the length of the greeting torrent). The
274                 // chunks are 2 bytes in length. Then the smallest burst we can run
275                 // with is 2. Time taken is (13-burst)/rate.
276                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
277         })
278         require.True(t, time.Since(started) > time.Second)
279 }
280
281 func TestClientTransferRateLimitedDownload(t *testing.T) {
282         testClientTransfer(t, testClientTransferParams{
283                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
284         })
285 }
286
287 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
288         return storage.NewResourcePieces(fc.AsResourceProvider())
289 }
290
291 func TestClientTransferSmallCache(t *testing.T) {
292         testClientTransfer(t, testClientTransferParams{
293                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
294                         SetCapacity: true,
295                         // Going below the piece length means it can't complete a piece so
296                         // that it can be hashed.
297                         Capacity: 5,
298                         Wrapper:  fileCachePieceResourceStorage,
299                 }),
300                 SetReadahead: true,
301                 // Can't readahead too far or the cache will thrash and drop data we
302                 // thought we had.
303                 Readahead:          0,
304                 ExportClientStatus: true,
305         })
306 }
307
308 func TestClientTransferVarious(t *testing.T) {
309         // Leecher storage
310         for _, ls := range []storageFactory{
311                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
312                         Wrapper: fileCachePieceResourceStorage,
313                 }),
314                 storage.NewBoltDB,
315         } {
316                 // Seeder storage
317                 for _, ss := range []func(string) storage.ClientImpl{
318                         storage.NewFile,
319                         storage.NewMMap,
320                 } {
321                         for _, responsive := range []bool{false, true} {
322                                 testClientTransfer(t, testClientTransferParams{
323                                         Responsive:     responsive,
324                                         SeederStorage:  ss,
325                                         LeecherStorage: ls,
326                                 })
327                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
328                                         testClientTransfer(t, testClientTransferParams{
329                                                 SeederStorage:  ss,
330                                                 Responsive:     responsive,
331                                                 SetReadahead:   true,
332                                                 Readahead:      readahead,
333                                                 LeecherStorage: ls,
334                                         })
335                                 }
336                         }
337                 }
338         }
339 }
340
341 type testClientTransferParams struct {
342         Responsive                 bool
343         Readahead                  int64
344         SetReadahead               bool
345         ExportClientStatus         bool
346         LeecherStorage             func(string) storage.ClientImpl
347         SeederStorage              func(string) storage.ClientImpl
348         SeederUploadRateLimiter    *rate.Limiter
349         LeecherDownloadRateLimiter *rate.Limiter
350 }
351
352 // Creates a seeder and a leecher, and ensures the data transfers when a read
353 // is attempted on the leecher.
354 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
355         greetingTempDir, mi := testutil.GreetingTestTorrent()
356         defer os.RemoveAll(greetingTempDir)
357         // Create seeder and a Torrent.
358         cfg := TestingConfig()
359         cfg.Seed = true
360         cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
361         // cfg.ListenAddr = "localhost:4000"
362         if ps.SeederStorage != nil {
363                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
364                 defer cfg.DefaultStorage.Close()
365         } else {
366                 cfg.DataDir = greetingTempDir
367         }
368         seeder, err := NewClient(cfg)
369         require.NoError(t, err)
370         if ps.ExportClientStatus {
371                 testutil.ExportStatusWriter(seeder, "s")
372         }
373         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
374         // Run a Stats right after Closing the Client. This will trigger the Stats
375         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
376         defer seederTorrent.Stats()
377         defer seeder.Close()
378         seederTorrent.VerifyData()
379         // Create leecher and a Torrent.
380         leecherDataDir, err := ioutil.TempDir("", "")
381         require.NoError(t, err)
382         defer os.RemoveAll(leecherDataDir)
383         if ps.LeecherStorage == nil {
384                 cfg.DataDir = leecherDataDir
385         } else {
386                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
387         }
388         cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
389         cfg.Seed = false
390         leecher, err := NewClient(cfg)
391         require.NoError(t, err)
392         defer leecher.Close()
393         if ps.ExportClientStatus {
394                 testutil.ExportStatusWriter(leecher, "l")
395         }
396         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
397                 ret = TorrentSpecFromMetaInfo(mi)
398                 ret.ChunkSize = 2
399                 return
400         }())
401         require.NoError(t, err)
402         assert.True(t, new)
403         // Now do some things with leecher and seeder.
404         addClientPeer(leecherTorrent, seeder)
405         // The Torrent should not be interested in obtaining peers, so the one we
406         // just added should be the only one.
407         assert.False(t, leecherTorrent.Seeding())
408         assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
409         r := leecherTorrent.NewReader()
410         defer r.Close()
411         if ps.Responsive {
412                 r.SetResponsive()
413         }
414         if ps.SetReadahead {
415                 r.SetReadahead(ps.Readahead)
416         }
417         assertReadAllGreeting(t, r)
418         assert.True(t, 13 <= seederTorrent.Stats().DataBytesWritten)
419         assert.True(t, 8 <= seederTorrent.Stats().ChunksWritten)
420         assert.True(t, 13 <= leecherTorrent.Stats().DataBytesRead)
421         assert.True(t, 8 <= leecherTorrent.Stats().ChunksRead)
422         // Try reading through again for the cases where the torrent data size
423         // exceeds the size of the cache.
424         assertReadAllGreeting(t, r)
425 }
426
427 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
428         pos, err := r.Seek(0, io.SeekStart)
429         assert.NoError(t, err)
430         assert.EqualValues(t, 0, pos)
431         _greeting, err := ioutil.ReadAll(r)
432         assert.NoError(t, err)
433         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
434 }
435
436 // Check that after completing leeching, a leecher transitions to a seeding
437 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
438 func TestSeedAfterDownloading(t *testing.T) {
439         greetingTempDir, mi := testutil.GreetingTestTorrent()
440         defer os.RemoveAll(greetingTempDir)
441         cfg := TestingConfig()
442         cfg.Seed = true
443         cfg.DataDir = greetingTempDir
444         seeder, err := NewClient(cfg)
445         require.NoError(t, err)
446         defer seeder.Close()
447         testutil.ExportStatusWriter(seeder, "s")
448         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
449         seederTorrent.VerifyData()
450         cfg.DataDir, err = ioutil.TempDir("", "")
451         require.NoError(t, err)
452         defer os.RemoveAll(cfg.DataDir)
453         leecher, err := NewClient(cfg)
454         require.NoError(t, err)
455         defer leecher.Close()
456         testutil.ExportStatusWriter(leecher, "l")
457         cfg.Seed = false
458         cfg.DataDir, err = ioutil.TempDir("", "")
459         require.NoError(t, err)
460         defer os.RemoveAll(cfg.DataDir)
461         leecherLeecher, _ := NewClient(cfg)
462         defer leecherLeecher.Close()
463         testutil.ExportStatusWriter(leecherLeecher, "ll")
464         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
465                 ret = TorrentSpecFromMetaInfo(mi)
466                 ret.ChunkSize = 2
467                 return
468         }())
469         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
470                 ret = TorrentSpecFromMetaInfo(mi)
471                 ret.ChunkSize = 3
472                 return
473         }())
474         // Simultaneously DownloadAll in Leecher, and read the contents
475         // consecutively in LeecherLeecher. This non-deterministically triggered a
476         // case where the leecher wouldn't unchoke the LeecherLeecher.
477         var wg sync.WaitGroup
478         wg.Add(1)
479         go func() {
480                 defer wg.Done()
481                 r := llg.NewReader()
482                 defer r.Close()
483                 b, err := ioutil.ReadAll(r)
484                 require.NoError(t, err)
485                 assert.EqualValues(t, testutil.GreetingFileContents, b)
486         }()
487         addClientPeer(leecherGreeting, seeder)
488         addClientPeer(leecherGreeting, leecherLeecher)
489         wg.Add(1)
490         go func() {
491                 defer wg.Done()
492                 leecherGreeting.DownloadAll()
493                 leecher.WaitAll()
494         }()
495         wg.Wait()
496 }
497
498 func TestMergingTrackersByAddingSpecs(t *testing.T) {
499         cl, err := NewClient(TestingConfig())
500         require.NoError(t, err)
501         defer cl.Close()
502         spec := TorrentSpec{}
503         T, new, _ := cl.AddTorrentSpec(&spec)
504         if !new {
505                 t.FailNow()
506         }
507         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
508         _, new, _ = cl.AddTorrentSpec(&spec)
509         assert.False(t, new)
510         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
511         // Because trackers are disabled in TestingConfig.
512         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
513 }
514
515 // We read from a piece which is marked completed, but is missing data.
516 func TestCompletedPieceWrongSize(t *testing.T) {
517         cfg := TestingConfig()
518         cfg.DefaultStorage = badStorage{}
519         cl, err := NewClient(cfg)
520         require.NoError(t, err)
521         defer cl.Close()
522         info := metainfo.Info{
523                 PieceLength: 15,
524                 Pieces:      make([]byte, 20),
525                 Files: []metainfo.FileInfo{
526                         {Path: []string{"greeting"}, Length: 13},
527                 },
528         }
529         b, err := bencode.Marshal(info)
530         require.NoError(t, err)
531         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
532                 InfoBytes: b,
533                 InfoHash:  metainfo.HashBytes(b),
534         })
535         require.NoError(t, err)
536         defer tt.Drop()
537         assert.True(t, new)
538         r := tt.NewReader()
539         defer r.Close()
540         b, err = ioutil.ReadAll(r)
541         assert.Len(t, b, 13)
542         assert.NoError(t, err)
543 }
544
545 func BenchmarkAddLargeTorrent(b *testing.B) {
546         cfg := TestingConfig()
547         cfg.DisableTCP = true
548         cfg.DisableUTP = true
549         cfg.ListenAddr = "redonk"
550         cl, err := NewClient(cfg)
551         require.NoError(b, err)
552         defer cl.Close()
553         for range iter.N(b.N) {
554                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
555                 if err != nil {
556                         b.Fatal(err)
557                 }
558                 t.Drop()
559         }
560 }
561
562 func TestResponsive(t *testing.T) {
563         seederDataDir, mi := testutil.GreetingTestTorrent()
564         defer os.RemoveAll(seederDataDir)
565         cfg := TestingConfig()
566         cfg.Seed = true
567         cfg.DataDir = seederDataDir
568         seeder, err := NewClient(cfg)
569         require.Nil(t, err)
570         defer seeder.Close()
571         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
572         seederTorrent.VerifyData()
573         leecherDataDir, err := ioutil.TempDir("", "")
574         require.Nil(t, err)
575         defer os.RemoveAll(leecherDataDir)
576         cfg = TestingConfig()
577         cfg.DataDir = leecherDataDir
578         leecher, err := NewClient(cfg)
579         require.Nil(t, err)
580         defer leecher.Close()
581         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
582                 ret = TorrentSpecFromMetaInfo(mi)
583                 ret.ChunkSize = 2
584                 return
585         }())
586         addClientPeer(leecherTorrent, seeder)
587         reader := leecherTorrent.NewReader()
588         defer reader.Close()
589         reader.SetReadahead(0)
590         reader.SetResponsive()
591         b := make([]byte, 2)
592         _, err = reader.Seek(3, io.SeekStart)
593         require.NoError(t, err)
594         _, err = io.ReadFull(reader, b)
595         assert.Nil(t, err)
596         assert.EqualValues(t, "lo", string(b))
597         _, err = reader.Seek(11, io.SeekStart)
598         require.NoError(t, err)
599         n, err := io.ReadFull(reader, b)
600         assert.Nil(t, err)
601         assert.EqualValues(t, 2, n)
602         assert.EqualValues(t, "d\n", string(b))
603 }
604
605 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
606         seederDataDir, mi := testutil.GreetingTestTorrent()
607         defer os.RemoveAll(seederDataDir)
608         cfg := TestingConfig()
609         cfg.Seed = true
610         cfg.DataDir = seederDataDir
611         seeder, err := NewClient(cfg)
612         require.Nil(t, err)
613         defer seeder.Close()
614         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
615         seederTorrent.VerifyData()
616         leecherDataDir, err := ioutil.TempDir("", "")
617         require.Nil(t, err)
618         defer os.RemoveAll(leecherDataDir)
619         cfg = TestingConfig()
620         cfg.DataDir = leecherDataDir
621         leecher, err := NewClient(cfg)
622         require.Nil(t, err)
623         defer leecher.Close()
624         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
625                 ret = TorrentSpecFromMetaInfo(mi)
626                 ret.ChunkSize = 2
627                 return
628         }())
629         addClientPeer(leecherTorrent, seeder)
630         reader := leecherTorrent.NewReader()
631         defer reader.Close()
632         reader.SetReadahead(0)
633         reader.SetResponsive()
634         b := make([]byte, 2)
635         _, err = reader.Seek(3, io.SeekStart)
636         require.NoError(t, err)
637         _, err = io.ReadFull(reader, b)
638         assert.Nil(t, err)
639         assert.EqualValues(t, "lo", string(b))
640         go leecherTorrent.Drop()
641         _, err = reader.Seek(11, io.SeekStart)
642         require.NoError(t, err)
643         n, err := reader.Read(b)
644         assert.EqualError(t, err, "torrent closed")
645         assert.EqualValues(t, 0, n)
646 }
647
648 func TestDHTInheritBlocklist(t *testing.T) {
649         ipl := iplist.New(nil)
650         require.NotNil(t, ipl)
651         cfg := TestingConfig()
652         cfg.IPBlocklist = ipl
653         cfg.NoDHT = false
654         cl, err := NewClient(cfg)
655         require.NoError(t, err)
656         defer cl.Close()
657         require.Equal(t, ipl, cl.DHT().IPBlocklist())
658 }
659
660 // Check that stuff is merged in subsequent AddTorrentSpec for the same
661 // infohash.
662 func TestAddTorrentSpecMerging(t *testing.T) {
663         cl, err := NewClient(TestingConfig())
664         require.NoError(t, err)
665         defer cl.Close()
666         dir, mi := testutil.GreetingTestTorrent()
667         defer os.RemoveAll(dir)
668         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
669                 InfoHash: mi.HashInfoBytes(),
670         })
671         require.NoError(t, err)
672         require.True(t, new)
673         require.Nil(t, tt.Info())
674         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
675         require.NoError(t, err)
676         require.False(t, new)
677         require.NotNil(t, tt.Info())
678 }
679
680 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
681         dir, mi := testutil.GreetingTestTorrent()
682         os.RemoveAll(dir)
683         cl, _ := NewClient(TestingConfig())
684         defer cl.Close()
685         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
686                 InfoHash: mi.HashInfoBytes(),
687         })
688         tt.Drop()
689         assert.EqualValues(t, 0, len(cl.Torrents()))
690         select {
691         case <-tt.GotInfo():
692                 t.FailNow()
693         default:
694         }
695 }
696
697 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
698         for i := range iter.N(info.NumPieces()) {
699                 p := info.Piece(i)
700                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
701         }
702 }
703
704 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
705         fileCacheDir, err := ioutil.TempDir("", "")
706         require.NoError(t, err)
707         defer os.RemoveAll(fileCacheDir)
708         fileCache, err := filecache.NewCache(fileCacheDir)
709         require.NoError(t, err)
710         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
711         defer os.RemoveAll(greetingDataTempDir)
712         filePieceStore := csf(fileCache)
713         defer filePieceStore.Close()
714         info, err := greetingMetainfo.UnmarshalInfo()
715         require.NoError(t, err)
716         ih := greetingMetainfo.HashInfoBytes()
717         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
718         require.NoError(t, err)
719         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
720         // require.Equal(t, len(testutil.GreetingFileContents), written)
721         // require.NoError(t, err)
722         for i := 0; i < info.NumPieces(); i++ {
723                 p := info.Piece(i)
724                 if alreadyCompleted {
725                         require.NoError(t, greetingData.Piece(p).MarkComplete())
726                 }
727         }
728         cfg := TestingConfig()
729         // TODO: Disable network option?
730         cfg.DisableTCP = true
731         cfg.DisableUTP = true
732         cfg.DefaultStorage = filePieceStore
733         cl, err := NewClient(cfg)
734         require.NoError(t, err)
735         defer cl.Close()
736         tt, err := cl.AddTorrent(greetingMetainfo)
737         require.NoError(t, err)
738         psrs := tt.PieceStateRuns()
739         assert.Len(t, psrs, 1)
740         assert.EqualValues(t, 3, psrs[0].Length)
741         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
742         if alreadyCompleted {
743                 r := tt.NewReader()
744                 b, err := ioutil.ReadAll(r)
745                 assert.NoError(t, err)
746                 assert.EqualValues(t, testutil.GreetingFileContents, b)
747         }
748 }
749
750 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
751         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
752 }
753
754 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
755         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
756 }
757
758 func TestAddMetainfoWithNodes(t *testing.T) {
759         cfg := TestingConfig()
760         cfg.ListenAddr = ":0"
761         cfg.NoDHT = false
762         // For now, we want to just jam the nodes into the table, without
763         // verifying them first. Also the DHT code doesn't support mixing secure
764         // and insecure nodes if security is enabled (yet).
765         cfg.DHTConfig.NoSecurity = true
766         cl, err := NewClient(cfg)
767         require.NoError(t, err)
768         defer cl.Close()
769         assert.EqualValues(t, 0, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
770         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
771         require.NoError(t, err)
772         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
773         // check if the announce-list is here instead. TODO: Add nodes.
774         assert.Len(t, tt.metainfo.AnnounceList, 5)
775         // There are 6 nodes in the torrent file.
776         assert.EqualValues(t, 6, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
777 }
778
779 type testDownloadCancelParams struct {
780         ExportClientStatus        bool
781         SetLeecherStorageCapacity bool
782         LeecherStorageCapacity    int64
783         Cancel                    bool
784 }
785
786 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
787         greetingTempDir, mi := testutil.GreetingTestTorrent()
788         defer os.RemoveAll(greetingTempDir)
789         cfg := TestingConfig()
790         cfg.Seed = true
791         cfg.DataDir = greetingTempDir
792         seeder, err := NewClient(cfg)
793         require.NoError(t, err)
794         defer seeder.Close()
795         if ps.ExportClientStatus {
796                 testutil.ExportStatusWriter(seeder, "s")
797         }
798         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
799         seederTorrent.VerifyData()
800         leecherDataDir, err := ioutil.TempDir("", "")
801         require.NoError(t, err)
802         defer os.RemoveAll(leecherDataDir)
803         fc, err := filecache.NewCache(leecherDataDir)
804         require.NoError(t, err)
805         if ps.SetLeecherStorageCapacity {
806                 fc.SetCapacity(ps.LeecherStorageCapacity)
807         }
808         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
809         cfg.DataDir = leecherDataDir
810         leecher, _ := NewClient(cfg)
811         defer leecher.Close()
812         if ps.ExportClientStatus {
813                 testutil.ExportStatusWriter(leecher, "l")
814         }
815         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
816                 ret = TorrentSpecFromMetaInfo(mi)
817                 ret.ChunkSize = 2
818                 return
819         }())
820         require.NoError(t, err)
821         assert.True(t, new)
822         psc := leecherGreeting.SubscribePieceStateChanges()
823         defer psc.Close()
824
825         leecherGreeting.cl.mu.Lock()
826         leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
827         if ps.Cancel {
828                 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
829         }
830         leecherGreeting.cl.mu.Unlock()
831
832         addClientPeer(leecherGreeting, seeder)
833         completes := make(map[int]bool, 3)
834 values:
835         for {
836                 // started := time.Now()
837                 select {
838                 case _v := <-psc.Values:
839                         // log.Print(time.Since(started))
840                         v := _v.(PieceStateChange)
841                         completes[v.Index] = v.Complete
842                 case <-time.After(100 * time.Millisecond):
843                         break values
844                 }
845         }
846         if ps.Cancel {
847                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
848         } else {
849                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
850         }
851
852 }
853
854 func TestTorrentDownloadAll(t *testing.T) {
855         testDownloadCancel(t, testDownloadCancelParams{})
856 }
857
858 func TestTorrentDownloadAllThenCancel(t *testing.T) {
859         testDownloadCancel(t, testDownloadCancelParams{
860                 Cancel: true,
861         })
862 }
863
864 // Ensure that it's an error for a peer to send an invalid have message.
865 func TestPeerInvalidHave(t *testing.T) {
866         cl, err := NewClient(TestingConfig())
867         require.NoError(t, err)
868         defer cl.Close()
869         info := metainfo.Info{
870                 PieceLength: 1,
871                 Pieces:      make([]byte, 20),
872                 Files:       []metainfo.FileInfo{{Length: 1}},
873         }
874         infoBytes, err := bencode.Marshal(info)
875         require.NoError(t, err)
876         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
877                 InfoBytes: infoBytes,
878                 InfoHash:  metainfo.HashBytes(infoBytes),
879                 Storage:   badStorage{},
880         })
881         require.NoError(t, err)
882         assert.True(t, _new)
883         defer tt.Drop()
884         cn := &connection{
885                 t: tt,
886         }
887         assert.NoError(t, cn.peerSentHave(0))
888         assert.Error(t, cn.peerSentHave(1))
889 }
890
891 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
892         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
893         defer os.RemoveAll(greetingTempDir)
894         cfg := TestingConfig()
895         cfg.DataDir = greetingTempDir
896         seeder, err := NewClient(TestingConfig())
897         require.NoError(t, err)
898         seeder.AddTorrentSpec(&TorrentSpec{
899                 InfoBytes: greetingMetainfo.InfoBytes,
900         })
901 }
902
903 func TestPrepareTrackerAnnounce(t *testing.T) {
904         cl := &Client{}
905         blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
906         require.NoError(t, err)
907         assert.False(t, blocked)
908         assert.EqualValues(t, "localhost:1234", host)
909         assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
910 }
911
912 // Check that when the listen port is 0, all the protocols listened on have
913 // the same port, and it isn't zero.
914 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
915         cl, err := NewClient(TestingConfig())
916         require.NoError(t, err)
917         defer cl.Close()
918         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
919         assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
920 }
921
922 func TestClientDynamicListenTCPOnly(t *testing.T) {
923         cfg := TestingConfig()
924         cfg.DisableUTP = true
925         cl, err := NewClient(cfg)
926         require.NoError(t, err)
927         defer cl.Close()
928         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
929         assert.Nil(t, cl.utpSock)
930 }
931
932 func TestClientDynamicListenUTPOnly(t *testing.T) {
933         cfg := TestingConfig()
934         cfg.DisableTCP = true
935         cl, err := NewClient(cfg)
936         require.NoError(t, err)
937         defer cl.Close()
938         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
939         assert.Nil(t, cl.tcpListener)
940 }
941
942 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
943         cfg := TestingConfig()
944         cfg.DisableTCP = true
945         cfg.DisableUTP = true
946         cl, err := NewClient(cfg)
947         require.NoError(t, err)
948         defer cl.Close()
949         assert.Nil(t, cl.ListenAddr())
950 }
951
952 func addClientPeer(t *Torrent, cl *Client) {
953         t.AddPeers([]Peer{
954                 {
955                         IP:   missinggo.AddrIP(cl.ListenAddr()),
956                         Port: missinggo.AddrPort(cl.ListenAddr()),
957                 },
958         })
959 }
960
961 func totalConns(tts []*Torrent) (ret int) {
962         for _, tt := range tts {
963                 tt.cl.mu.Lock()
964                 ret += len(tt.conns)
965                 tt.cl.mu.Unlock()
966         }
967         return
968 }
969
970 func TestSetMaxEstablishedConn(t *testing.T) {
971         var tts []*Torrent
972         ih := testutil.GreetingMetaInfo().HashInfoBytes()
973         for i := range iter.N(3) {
974                 cl, err := NewClient(TestingConfig())
975                 require.NoError(t, err)
976                 defer cl.Close()
977                 tt, _ := cl.AddTorrentInfoHash(ih)
978                 tt.SetMaxEstablishedConns(2)
979                 testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
980                 tts = append(tts, tt)
981         }
982         addPeers := func() {
983                 for i, tt := range tts {
984                         for _, _tt := range tts[:i] {
985                                 addClientPeer(tt, _tt.cl)
986                         }
987                 }
988         }
989         waitTotalConns := func(num int) {
990                 for totalConns(tts) != num {
991                         time.Sleep(time.Millisecond)
992                 }
993         }
994         addPeers()
995         waitTotalConns(6)
996         tts[0].SetMaxEstablishedConns(1)
997         waitTotalConns(4)
998         tts[0].SetMaxEstablishedConns(0)
999         waitTotalConns(2)
1000         tts[0].SetMaxEstablishedConns(1)
1001         addPeers()
1002         waitTotalConns(4)
1003         tts[0].SetMaxEstablishedConns(2)
1004         addPeers()
1005         waitTotalConns(6)
1006 }
1007
1008 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1009         os.MkdirAll(dir, 0770)
1010         file, err := os.Create(filepath.Join(dir, name))
1011         require.NoError(t, err)
1012         file.Write([]byte(name))
1013         file.Close()
1014         mi := metainfo.MetaInfo{}
1015         mi.SetDefaults()
1016         info := metainfo.Info{PieceLength: 256 * 1024}
1017         err = info.BuildFromFilePath(filepath.Join(dir, name))
1018         require.NoError(t, err)
1019         mi.InfoBytes, err = bencode.Marshal(info)
1020         require.NoError(t, err)
1021         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1022         tr, err := cl.AddTorrent(&mi)
1023         require.NoError(t, err)
1024         require.True(t, tr.Seeding())
1025         tr.VerifyData()
1026         return magnet
1027 }
1028
1029 // https://github.com/anacrolix/torrent/issues/114
1030 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1031         cfg := TestingConfig()
1032         cfg.DisableUTP = true
1033         cfg.Seed = true
1034         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1035         cfg.ForceEncryption = true
1036         os.Mkdir(cfg.DataDir, 0755)
1037         server, err := NewClient(cfg)
1038         require.NoError(t, err)
1039         defer server.Close()
1040         testutil.ExportStatusWriter(server, "s")
1041         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1042         makeMagnet(t, server, cfg.DataDir, "test2")
1043         cfg = TestingConfig()
1044         cfg.DisableUTP = true
1045         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1046         cfg.ForceEncryption = true
1047         client, err := NewClient(cfg)
1048         require.NoError(t, err)
1049         defer client.Close()
1050         testutil.ExportStatusWriter(client, "c")
1051         tr, err := client.AddMagnet(magnet1)
1052         require.NoError(t, err)
1053         tr.AddPeers([]Peer{{
1054                 IP:   missinggo.AddrIP(server.ListenAddr()),
1055                 Port: missinggo.AddrPort(server.ListenAddr()),
1056         }})
1057         <-tr.GotInfo()
1058         tr.DownloadAll()
1059         client.WaitAll()
1060 }
1061
1062 func TestClientAddressInUse(t *testing.T) {
1063         s, _ := NewUtpSocket("udp", ":50007")
1064         if s != nil {
1065                 defer s.Close()
1066         }
1067         cfg := TestingConfig()
1068         cfg.ListenAddr = ":50007"
1069         cl, err := NewClient(cfg)
1070         require.Error(t, err)
1071         require.Nil(t, cl)
1072 }