]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Fix TestSetMaxEstablishedConn and allow it to be run with -count > 1
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "net"
9         "os"
10         "path/filepath"
11         "sync"
12         "testing"
13         "time"
14
15         _ "github.com/anacrolix/envpprof"
16         "github.com/anacrolix/missinggo"
17         "github.com/anacrolix/missinggo/filecache"
18         "github.com/bradfitz/iter"
19         "github.com/stretchr/testify/assert"
20         "github.com/stretchr/testify/require"
21         "golang.org/x/time/rate"
22
23         "github.com/anacrolix/torrent/bencode"
24         "github.com/anacrolix/torrent/internal/testutil"
25         "github.com/anacrolix/torrent/iplist"
26         "github.com/anacrolix/torrent/metainfo"
27         "github.com/anacrolix/torrent/storage"
28 )
29
30 func TestingConfig() *Config {
31         return &Config{
32                 ListenAddr:              "localhost:0",
33                 NoDHT:                   true,
34                 DataDir:                 tempDir(),
35                 DisableTrackers:         true,
36                 NoDefaultPortForwarding: true,
37                 // Debug:           true,
38         }
39 }
40
41 func TestClientDefault(t *testing.T) {
42         cl, err := NewClient(TestingConfig())
43         require.NoError(t, err)
44         cl.Close()
45 }
46
47 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
48         cfg := TestingConfig()
49         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
50         require.NoError(t, err)
51         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
52         defer ci.Close()
53         cfg.DefaultStorage = ci
54         cl, err := NewClient(cfg)
55         require.NoError(t, err)
56         cl.Close()
57         // And again, https://github.com/anacrolix/torrent/issues/158
58         cl, err = NewClient(cfg)
59         require.NoError(t, err)
60         cl.Close()
61 }
62
63 func TestAddDropTorrent(t *testing.T) {
64         cl, err := NewClient(TestingConfig())
65         require.NoError(t, err)
66         defer cl.Close()
67         dir, mi := testutil.GreetingTestTorrent()
68         defer os.RemoveAll(dir)
69         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
70         require.NoError(t, err)
71         assert.True(t, new)
72         tt.SetMaxEstablishedConns(0)
73         tt.SetMaxEstablishedConns(1)
74         tt.Drop()
75 }
76
77 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
78         // TODO?
79         t.SkipNow()
80 }
81
82 func TestAddTorrentNoUsableURLs(t *testing.T) {
83         // TODO?
84         t.SkipNow()
85 }
86
87 func TestAddPeersToUnknownTorrent(t *testing.T) {
88         // TODO?
89         t.SkipNow()
90 }
91
92 func TestPieceHashSize(t *testing.T) {
93         assert.Equal(t, 20, pieceHash.Size())
94 }
95
96 func TestTorrentInitialState(t *testing.T) {
97         dir, mi := testutil.GreetingTestTorrent()
98         defer os.RemoveAll(dir)
99         cl := &Client{}
100         cl.initLogger()
101         tor := cl.newTorrent(
102                 mi.HashInfoBytes(),
103                 storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
104         )
105         tor.setChunkSize(2)
106         tor.cl.mu.Lock()
107         err := tor.setInfoBytes(mi.InfoBytes)
108         tor.cl.mu.Unlock()
109         require.NoError(t, err)
110         require.Len(t, tor.pieces, 3)
111         tor.pendAllChunkSpecs(0)
112         tor.cl.mu.Lock()
113         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
114         tor.cl.mu.Unlock()
115         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
116 }
117
118 func TestUnmarshalPEXMsg(t *testing.T) {
119         var m peerExchangeMessage
120         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
121                 t.Fatal(err)
122         }
123         if len(m.Added) != 2 {
124                 t.FailNow()
125         }
126         if m.Added[0].Port != 0x506 {
127                 t.FailNow()
128         }
129 }
130
131 func TestReducedDialTimeout(t *testing.T) {
132         cfg := &Config{}
133         cfg.setDefaults()
134         for _, _case := range []struct {
135                 Max             time.Duration
136                 HalfOpenLimit   int
137                 PendingPeers    int
138                 ExpectedReduced time.Duration
139         }{
140                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
141                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
142                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
143                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
144                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
145                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
146         } {
147                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
148                 expected := _case.ExpectedReduced
149                 if expected < cfg.MinDialTimeout {
150                         expected = cfg.MinDialTimeout
151                 }
152                 if reduced != expected {
153                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
154                 }
155         }
156 }
157
158 func TestUTPRawConn(t *testing.T) {
159         l, err := NewUtpSocket("udp", "")
160         require.NoError(t, err)
161         defer l.Close()
162         go func() {
163                 for {
164                         _, err := l.Accept()
165                         if err != nil {
166                                 break
167                         }
168                 }
169         }()
170         // Connect a UTP peer to see if the RawConn will still work.
171         s, err := NewUtpSocket("udp", "")
172         require.NoError(t, err)
173         defer s.Close()
174         utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
175         require.NoError(t, err)
176         defer utpPeer.Close()
177         peer, err := net.ListenPacket("udp", ":0")
178         require.NoError(t, err)
179         defer peer.Close()
180
181         msgsReceived := 0
182         // How many messages to send. I've set this to double the channel buffer
183         // size in the raw packetConn.
184         const N = 200
185         readerStopped := make(chan struct{})
186         // The reader goroutine.
187         go func() {
188                 defer close(readerStopped)
189                 b := make([]byte, 500)
190                 for i := 0; i < N; i++ {
191                         n, _, err := l.ReadFrom(b)
192                         require.NoError(t, err)
193                         msgsReceived++
194                         var d int
195                         fmt.Sscan(string(b[:n]), &d)
196                         assert.Equal(t, i, d)
197                 }
198         }()
199         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
200         require.NoError(t, err)
201         for i := 0; i < N; i++ {
202                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
203                 require.NoError(t, err)
204                 time.Sleep(time.Millisecond)
205         }
206         select {
207         case <-readerStopped:
208         case <-time.After(time.Second):
209                 t.Fatal("reader timed out")
210         }
211         if msgsReceived != N {
212                 t.Fatalf("messages received: %d", msgsReceived)
213         }
214 }
215
216 func TestTwoClientsArbitraryPorts(t *testing.T) {
217         for i := 0; i < 2; i++ {
218                 cl, err := NewClient(TestingConfig())
219                 if err != nil {
220                         t.Fatal(err)
221                 }
222                 defer cl.Close()
223         }
224 }
225
226 func TestAddDropManyTorrents(t *testing.T) {
227         cl, err := NewClient(TestingConfig())
228         require.NoError(t, err)
229         defer cl.Close()
230         for i := range iter.N(1000) {
231                 var spec TorrentSpec
232                 binary.PutVarint(spec.InfoHash[:], int64(i))
233                 tt, new, err := cl.AddTorrentSpec(&spec)
234                 assert.NoError(t, err)
235                 assert.True(t, new)
236                 defer tt.Drop()
237         }
238 }
239
240 type FileCacheClientStorageFactoryParams struct {
241         Capacity    int64
242         SetCapacity bool
243         Wrapper     func(*filecache.Cache) storage.ClientImpl
244 }
245
246 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
247         return func(dataDir string) storage.ClientImpl {
248                 fc, err := filecache.NewCache(dataDir)
249                 if err != nil {
250                         panic(err)
251                 }
252                 if ps.SetCapacity {
253                         fc.SetCapacity(ps.Capacity)
254                 }
255                 return ps.Wrapper(fc)
256         }
257 }
258
259 type storageFactory func(string) storage.ClientImpl
260
261 func TestClientTransferDefault(t *testing.T) {
262         testClientTransfer(t, testClientTransferParams{
263                 ExportClientStatus: true,
264                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
265                         Wrapper: fileCachePieceResourceStorage,
266                 }),
267         })
268 }
269
270 func TestClientTransferRateLimitedUpload(t *testing.T) {
271         started := time.Now()
272         testClientTransfer(t, testClientTransferParams{
273                 // We are uploading 13 bytes (the length of the greeting torrent). The
274                 // chunks are 2 bytes in length. Then the smallest burst we can run
275                 // with is 2. Time taken is (13-burst)/rate.
276                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
277                 ExportClientStatus:      true,
278         })
279         require.True(t, time.Since(started) > time.Second)
280 }
281
282 func TestClientTransferRateLimitedDownload(t *testing.T) {
283         testClientTransfer(t, testClientTransferParams{
284                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
285         })
286 }
287
288 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
289         return storage.NewResourcePieces(fc.AsResourceProvider())
290 }
291
292 func TestClientTransferSmallCache(t *testing.T) {
293         testClientTransfer(t, testClientTransferParams{
294                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
295                         SetCapacity: true,
296                         // Going below the piece length means it can't complete a piece so
297                         // that it can be hashed.
298                         Capacity: 5,
299                         Wrapper:  fileCachePieceResourceStorage,
300                 }),
301                 SetReadahead: true,
302                 // Can't readahead too far or the cache will thrash and drop data we
303                 // thought we had.
304                 Readahead:          0,
305                 ExportClientStatus: true,
306         })
307 }
308
309 func TestClientTransferVarious(t *testing.T) {
310         // Leecher storage
311         for _, ls := range []storageFactory{
312                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
313                         Wrapper: fileCachePieceResourceStorage,
314                 }),
315                 storage.NewBoltDB,
316         } {
317                 // Seeder storage
318                 for _, ss := range []func(string) storage.ClientImpl{
319                         storage.NewFile,
320                         storage.NewMMap,
321                 } {
322                         for _, responsive := range []bool{false, true} {
323                                 testClientTransfer(t, testClientTransferParams{
324                                         Responsive:     responsive,
325                                         SeederStorage:  ss,
326                                         LeecherStorage: ls,
327                                 })
328                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
329                                         testClientTransfer(t, testClientTransferParams{
330                                                 SeederStorage:  ss,
331                                                 Responsive:     responsive,
332                                                 SetReadahead:   true,
333                                                 Readahead:      readahead,
334                                                 LeecherStorage: ls,
335                                         })
336                                 }
337                         }
338                 }
339         }
340 }
341
342 type testClientTransferParams struct {
343         Responsive                 bool
344         Readahead                  int64
345         SetReadahead               bool
346         ExportClientStatus         bool
347         LeecherStorage             func(string) storage.ClientImpl
348         SeederStorage              func(string) storage.ClientImpl
349         SeederUploadRateLimiter    *rate.Limiter
350         LeecherDownloadRateLimiter *rate.Limiter
351 }
352
353 // Creates a seeder and a leecher, and ensures the data transfers when a read
354 // is attempted on the leecher.
355 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
356         greetingTempDir, mi := testutil.GreetingTestTorrent()
357         defer os.RemoveAll(greetingTempDir)
358         // Create seeder and a Torrent.
359         cfg := TestingConfig()
360         cfg.Seed = true
361         cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
362         // cfg.ListenAddr = "localhost:4000"
363         if ps.SeederStorage != nil {
364                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
365                 defer cfg.DefaultStorage.Close()
366         } else {
367                 cfg.DataDir = greetingTempDir
368         }
369         seeder, err := NewClient(cfg)
370         require.NoError(t, err)
371         if ps.ExportClientStatus {
372                 testutil.ExportStatusWriter(seeder, "s")
373         }
374         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
375         // Run a Stats right after Closing the Client. This will trigger the Stats
376         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
377         defer seederTorrent.Stats()
378         defer seeder.Close()
379         seederTorrent.VerifyData()
380         // Create leecher and a Torrent.
381         leecherDataDir, err := ioutil.TempDir("", "")
382         require.NoError(t, err)
383         defer os.RemoveAll(leecherDataDir)
384         if ps.LeecherStorage == nil {
385                 cfg.DataDir = leecherDataDir
386         } else {
387                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
388         }
389         cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
390         cfg.Seed = false
391         leecher, err := NewClient(cfg)
392         require.NoError(t, err)
393         defer leecher.Close()
394         if ps.ExportClientStatus {
395                 testutil.ExportStatusWriter(leecher, "l")
396         }
397         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
398                 ret = TorrentSpecFromMetaInfo(mi)
399                 ret.ChunkSize = 2
400                 return
401         }())
402         require.NoError(t, err)
403         assert.True(t, new)
404         // Now do some things with leecher and seeder.
405         addClientPeer(leecherTorrent, seeder)
406         // The Torrent should not be interested in obtaining peers, so the one we
407         // just added should be the only one.
408         assert.False(t, leecherTorrent.Seeding())
409         assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
410         r := leecherTorrent.NewReader()
411         defer r.Close()
412         if ps.Responsive {
413                 r.SetResponsive()
414         }
415         if ps.SetReadahead {
416                 r.SetReadahead(ps.Readahead)
417         }
418         assertReadAllGreeting(t, r)
419         assert.True(t, 13 <= seederTorrent.Stats().BytesWrittenData)
420         assert.True(t, 8 <= seederTorrent.Stats().ChunksWritten)
421         assert.True(t, 13 <= leecherTorrent.Stats().BytesReadData)
422         assert.True(t, 8 <= leecherTorrent.Stats().ChunksRead)
423         // Try reading through again for the cases where the torrent data size
424         // exceeds the size of the cache.
425         assertReadAllGreeting(t, r)
426 }
427
428 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
429         pos, err := r.Seek(0, io.SeekStart)
430         assert.NoError(t, err)
431         assert.EqualValues(t, 0, pos)
432         _greeting, err := ioutil.ReadAll(r)
433         assert.NoError(t, err)
434         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
435 }
436
437 // Check that after completing leeching, a leecher transitions to a seeding
438 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
439 func TestSeedAfterDownloading(t *testing.T) {
440         greetingTempDir, mi := testutil.GreetingTestTorrent()
441         defer os.RemoveAll(greetingTempDir)
442         cfg := TestingConfig()
443         cfg.Seed = true
444         cfg.DataDir = greetingTempDir
445         seeder, err := NewClient(cfg)
446         require.NoError(t, err)
447         defer seeder.Close()
448         testutil.ExportStatusWriter(seeder, "s")
449         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
450         seederTorrent.VerifyData()
451         cfg.DataDir, err = ioutil.TempDir("", "")
452         require.NoError(t, err)
453         defer os.RemoveAll(cfg.DataDir)
454         leecher, err := NewClient(cfg)
455         require.NoError(t, err)
456         defer leecher.Close()
457         testutil.ExportStatusWriter(leecher, "l")
458         cfg.Seed = false
459         cfg.DataDir, err = ioutil.TempDir("", "")
460         require.NoError(t, err)
461         defer os.RemoveAll(cfg.DataDir)
462         leecherLeecher, _ := NewClient(cfg)
463         defer leecherLeecher.Close()
464         testutil.ExportStatusWriter(leecherLeecher, "ll")
465         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
466                 ret = TorrentSpecFromMetaInfo(mi)
467                 ret.ChunkSize = 2
468                 return
469         }())
470         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
471                 ret = TorrentSpecFromMetaInfo(mi)
472                 ret.ChunkSize = 3
473                 return
474         }())
475         // Simultaneously DownloadAll in Leecher, and read the contents
476         // consecutively in LeecherLeecher. This non-deterministically triggered a
477         // case where the leecher wouldn't unchoke the LeecherLeecher.
478         var wg sync.WaitGroup
479         wg.Add(1)
480         go func() {
481                 defer wg.Done()
482                 r := llg.NewReader()
483                 defer r.Close()
484                 b, err := ioutil.ReadAll(r)
485                 require.NoError(t, err)
486                 assert.EqualValues(t, testutil.GreetingFileContents, b)
487         }()
488         addClientPeer(leecherGreeting, seeder)
489         addClientPeer(leecherGreeting, leecherLeecher)
490         wg.Add(1)
491         go func() {
492                 defer wg.Done()
493                 leecherGreeting.DownloadAll()
494                 leecher.WaitAll()
495         }()
496         wg.Wait()
497 }
498
499 func TestMergingTrackersByAddingSpecs(t *testing.T) {
500         cl, err := NewClient(TestingConfig())
501         require.NoError(t, err)
502         defer cl.Close()
503         spec := TorrentSpec{}
504         T, new, _ := cl.AddTorrentSpec(&spec)
505         if !new {
506                 t.FailNow()
507         }
508         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
509         _, new, _ = cl.AddTorrentSpec(&spec)
510         assert.False(t, new)
511         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
512         // Because trackers are disabled in TestingConfig.
513         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
514 }
515
516 // We read from a piece which is marked completed, but is missing data.
517 func TestCompletedPieceWrongSize(t *testing.T) {
518         cfg := TestingConfig()
519         cfg.DefaultStorage = badStorage{}
520         cl, err := NewClient(cfg)
521         require.NoError(t, err)
522         defer cl.Close()
523         info := metainfo.Info{
524                 PieceLength: 15,
525                 Pieces:      make([]byte, 20),
526                 Files: []metainfo.FileInfo{
527                         {Path: []string{"greeting"}, Length: 13},
528                 },
529         }
530         b, err := bencode.Marshal(info)
531         require.NoError(t, err)
532         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
533                 InfoBytes: b,
534                 InfoHash:  metainfo.HashBytes(b),
535         })
536         require.NoError(t, err)
537         defer tt.Drop()
538         assert.True(t, new)
539         r := tt.NewReader()
540         defer r.Close()
541         b, err = ioutil.ReadAll(r)
542         assert.Len(t, b, 13)
543         assert.NoError(t, err)
544 }
545
546 func BenchmarkAddLargeTorrent(b *testing.B) {
547         cfg := TestingConfig()
548         cfg.DisableTCP = true
549         cfg.DisableUTP = true
550         cfg.ListenAddr = "redonk"
551         cl, err := NewClient(cfg)
552         require.NoError(b, err)
553         defer cl.Close()
554         for range iter.N(b.N) {
555                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
556                 if err != nil {
557                         b.Fatal(err)
558                 }
559                 t.Drop()
560         }
561 }
562
563 func TestResponsive(t *testing.T) {
564         seederDataDir, mi := testutil.GreetingTestTorrent()
565         defer os.RemoveAll(seederDataDir)
566         cfg := TestingConfig()
567         cfg.Seed = true
568         cfg.DataDir = seederDataDir
569         seeder, err := NewClient(cfg)
570         require.Nil(t, err)
571         defer seeder.Close()
572         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
573         seederTorrent.VerifyData()
574         leecherDataDir, err := ioutil.TempDir("", "")
575         require.Nil(t, err)
576         defer os.RemoveAll(leecherDataDir)
577         cfg = TestingConfig()
578         cfg.DataDir = leecherDataDir
579         leecher, err := NewClient(cfg)
580         require.Nil(t, err)
581         defer leecher.Close()
582         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
583                 ret = TorrentSpecFromMetaInfo(mi)
584                 ret.ChunkSize = 2
585                 return
586         }())
587         addClientPeer(leecherTorrent, seeder)
588         reader := leecherTorrent.NewReader()
589         defer reader.Close()
590         reader.SetReadahead(0)
591         reader.SetResponsive()
592         b := make([]byte, 2)
593         _, err = reader.Seek(3, io.SeekStart)
594         require.NoError(t, err)
595         _, err = io.ReadFull(reader, b)
596         assert.Nil(t, err)
597         assert.EqualValues(t, "lo", string(b))
598         _, err = reader.Seek(11, io.SeekStart)
599         require.NoError(t, err)
600         n, err := io.ReadFull(reader, b)
601         assert.Nil(t, err)
602         assert.EqualValues(t, 2, n)
603         assert.EqualValues(t, "d\n", string(b))
604 }
605
606 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
607         seederDataDir, mi := testutil.GreetingTestTorrent()
608         defer os.RemoveAll(seederDataDir)
609         cfg := TestingConfig()
610         cfg.Seed = true
611         cfg.DataDir = seederDataDir
612         seeder, err := NewClient(cfg)
613         require.Nil(t, err)
614         defer seeder.Close()
615         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
616         seederTorrent.VerifyData()
617         leecherDataDir, err := ioutil.TempDir("", "")
618         require.Nil(t, err)
619         defer os.RemoveAll(leecherDataDir)
620         cfg = TestingConfig()
621         cfg.DataDir = leecherDataDir
622         leecher, err := NewClient(cfg)
623         require.Nil(t, err)
624         defer leecher.Close()
625         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
626                 ret = TorrentSpecFromMetaInfo(mi)
627                 ret.ChunkSize = 2
628                 return
629         }())
630         addClientPeer(leecherTorrent, seeder)
631         reader := leecherTorrent.NewReader()
632         defer reader.Close()
633         reader.SetReadahead(0)
634         reader.SetResponsive()
635         b := make([]byte, 2)
636         _, err = reader.Seek(3, io.SeekStart)
637         require.NoError(t, err)
638         _, err = io.ReadFull(reader, b)
639         assert.Nil(t, err)
640         assert.EqualValues(t, "lo", string(b))
641         go leecherTorrent.Drop()
642         _, err = reader.Seek(11, io.SeekStart)
643         require.NoError(t, err)
644         n, err := reader.Read(b)
645         assert.EqualError(t, err, "torrent closed")
646         assert.EqualValues(t, 0, n)
647 }
648
649 func TestDHTInheritBlocklist(t *testing.T) {
650         ipl := iplist.New(nil)
651         require.NotNil(t, ipl)
652         cfg := TestingConfig()
653         cfg.IPBlocklist = ipl
654         cfg.NoDHT = false
655         cl, err := NewClient(cfg)
656         require.NoError(t, err)
657         defer cl.Close()
658         require.Equal(t, ipl, cl.DHT().IPBlocklist())
659 }
660
661 // Check that stuff is merged in subsequent AddTorrentSpec for the same
662 // infohash.
663 func TestAddTorrentSpecMerging(t *testing.T) {
664         cl, err := NewClient(TestingConfig())
665         require.NoError(t, err)
666         defer cl.Close()
667         dir, mi := testutil.GreetingTestTorrent()
668         defer os.RemoveAll(dir)
669         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
670                 InfoHash: mi.HashInfoBytes(),
671         })
672         require.NoError(t, err)
673         require.True(t, new)
674         require.Nil(t, tt.Info())
675         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
676         require.NoError(t, err)
677         require.False(t, new)
678         require.NotNil(t, tt.Info())
679 }
680
681 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
682         dir, mi := testutil.GreetingTestTorrent()
683         os.RemoveAll(dir)
684         cl, _ := NewClient(TestingConfig())
685         defer cl.Close()
686         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
687                 InfoHash: mi.HashInfoBytes(),
688         })
689         tt.Drop()
690         assert.EqualValues(t, 0, len(cl.Torrents()))
691         select {
692         case <-tt.GotInfo():
693                 t.FailNow()
694         default:
695         }
696 }
697
698 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
699         for i := range iter.N(info.NumPieces()) {
700                 p := info.Piece(i)
701                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
702         }
703 }
704
705 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
706         fileCacheDir, err := ioutil.TempDir("", "")
707         require.NoError(t, err)
708         defer os.RemoveAll(fileCacheDir)
709         fileCache, err := filecache.NewCache(fileCacheDir)
710         require.NoError(t, err)
711         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
712         defer os.RemoveAll(greetingDataTempDir)
713         filePieceStore := csf(fileCache)
714         defer filePieceStore.Close()
715         info, err := greetingMetainfo.UnmarshalInfo()
716         require.NoError(t, err)
717         ih := greetingMetainfo.HashInfoBytes()
718         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
719         require.NoError(t, err)
720         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
721         // require.Equal(t, len(testutil.GreetingFileContents), written)
722         // require.NoError(t, err)
723         for i := 0; i < info.NumPieces(); i++ {
724                 p := info.Piece(i)
725                 if alreadyCompleted {
726                         require.NoError(t, greetingData.Piece(p).MarkComplete())
727                 }
728         }
729         cfg := TestingConfig()
730         // TODO: Disable network option?
731         cfg.DisableTCP = true
732         cfg.DisableUTP = true
733         cfg.DefaultStorage = filePieceStore
734         cl, err := NewClient(cfg)
735         require.NoError(t, err)
736         defer cl.Close()
737         tt, err := cl.AddTorrent(greetingMetainfo)
738         require.NoError(t, err)
739         psrs := tt.PieceStateRuns()
740         assert.Len(t, psrs, 1)
741         assert.EqualValues(t, 3, psrs[0].Length)
742         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
743         if alreadyCompleted {
744                 r := tt.NewReader()
745                 b, err := ioutil.ReadAll(r)
746                 assert.NoError(t, err)
747                 assert.EqualValues(t, testutil.GreetingFileContents, b)
748         }
749 }
750
751 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
752         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
753 }
754
755 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
756         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
757 }
758
759 func TestAddMetainfoWithNodes(t *testing.T) {
760         cfg := TestingConfig()
761         cfg.ListenAddr = ":0"
762         cfg.NoDHT = false
763         // For now, we want to just jam the nodes into the table, without
764         // verifying them first. Also the DHT code doesn't support mixing secure
765         // and insecure nodes if security is enabled (yet).
766         cfg.DHTConfig.NoSecurity = true
767         cl, err := NewClient(cfg)
768         require.NoError(t, err)
769         defer cl.Close()
770         assert.EqualValues(t, 0, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
771         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
772         require.NoError(t, err)
773         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
774         // check if the announce-list is here instead. TODO: Add nodes.
775         assert.Len(t, tt.metainfo.AnnounceList, 5)
776         // There are 6 nodes in the torrent file.
777         assert.EqualValues(t, 6, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
778 }
779
780 type testDownloadCancelParams struct {
781         ExportClientStatus        bool
782         SetLeecherStorageCapacity bool
783         LeecherStorageCapacity    int64
784         Cancel                    bool
785 }
786
787 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
788         greetingTempDir, mi := testutil.GreetingTestTorrent()
789         defer os.RemoveAll(greetingTempDir)
790         cfg := TestingConfig()
791         cfg.Seed = true
792         cfg.DataDir = greetingTempDir
793         seeder, err := NewClient(cfg)
794         require.NoError(t, err)
795         defer seeder.Close()
796         if ps.ExportClientStatus {
797                 testutil.ExportStatusWriter(seeder, "s")
798         }
799         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
800         seederTorrent.VerifyData()
801         leecherDataDir, err := ioutil.TempDir("", "")
802         require.NoError(t, err)
803         defer os.RemoveAll(leecherDataDir)
804         fc, err := filecache.NewCache(leecherDataDir)
805         require.NoError(t, err)
806         if ps.SetLeecherStorageCapacity {
807                 fc.SetCapacity(ps.LeecherStorageCapacity)
808         }
809         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
810         cfg.DataDir = leecherDataDir
811         leecher, _ := NewClient(cfg)
812         defer leecher.Close()
813         if ps.ExportClientStatus {
814                 testutil.ExportStatusWriter(leecher, "l")
815         }
816         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
817                 ret = TorrentSpecFromMetaInfo(mi)
818                 ret.ChunkSize = 2
819                 return
820         }())
821         require.NoError(t, err)
822         assert.True(t, new)
823         psc := leecherGreeting.SubscribePieceStateChanges()
824         defer psc.Close()
825
826         leecherGreeting.cl.mu.Lock()
827         leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
828         if ps.Cancel {
829                 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
830         }
831         leecherGreeting.cl.mu.Unlock()
832
833         addClientPeer(leecherGreeting, seeder)
834         completes := make(map[int]bool, 3)
835 values:
836         for {
837                 // started := time.Now()
838                 select {
839                 case _v := <-psc.Values:
840                         // log.Print(time.Since(started))
841                         v := _v.(PieceStateChange)
842                         completes[v.Index] = v.Complete
843                 case <-time.After(100 * time.Millisecond):
844                         break values
845                 }
846         }
847         if ps.Cancel {
848                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
849         } else {
850                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
851         }
852
853 }
854
855 func TestTorrentDownloadAll(t *testing.T) {
856         testDownloadCancel(t, testDownloadCancelParams{})
857 }
858
859 func TestTorrentDownloadAllThenCancel(t *testing.T) {
860         testDownloadCancel(t, testDownloadCancelParams{
861                 Cancel: true,
862         })
863 }
864
865 // Ensure that it's an error for a peer to send an invalid have message.
866 func TestPeerInvalidHave(t *testing.T) {
867         cl, err := NewClient(TestingConfig())
868         require.NoError(t, err)
869         defer cl.Close()
870         info := metainfo.Info{
871                 PieceLength: 1,
872                 Pieces:      make([]byte, 20),
873                 Files:       []metainfo.FileInfo{{Length: 1}},
874         }
875         infoBytes, err := bencode.Marshal(info)
876         require.NoError(t, err)
877         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
878                 InfoBytes: infoBytes,
879                 InfoHash:  metainfo.HashBytes(infoBytes),
880                 Storage:   badStorage{},
881         })
882         require.NoError(t, err)
883         assert.True(t, _new)
884         defer tt.Drop()
885         cn := &connection{
886                 t: tt,
887         }
888         assert.NoError(t, cn.peerSentHave(0))
889         assert.Error(t, cn.peerSentHave(1))
890 }
891
892 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
893         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
894         defer os.RemoveAll(greetingTempDir)
895         cfg := TestingConfig()
896         cfg.DataDir = greetingTempDir
897         seeder, err := NewClient(TestingConfig())
898         require.NoError(t, err)
899         seeder.AddTorrentSpec(&TorrentSpec{
900                 InfoBytes: greetingMetainfo.InfoBytes,
901         })
902 }
903
904 func TestPrepareTrackerAnnounce(t *testing.T) {
905         cl := &Client{}
906         blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
907         require.NoError(t, err)
908         assert.False(t, blocked)
909         assert.EqualValues(t, "localhost:1234", host)
910         assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
911 }
912
913 // Check that when the listen port is 0, all the protocols listened on have
914 // the same port, and it isn't zero.
915 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
916         cl, err := NewClient(TestingConfig())
917         require.NoError(t, err)
918         defer cl.Close()
919         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
920         assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
921 }
922
923 func TestClientDynamicListenTCPOnly(t *testing.T) {
924         cfg := TestingConfig()
925         cfg.DisableUTP = true
926         cl, err := NewClient(cfg)
927         require.NoError(t, err)
928         defer cl.Close()
929         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
930         assert.Nil(t, cl.utpSock)
931 }
932
933 func TestClientDynamicListenUTPOnly(t *testing.T) {
934         cfg := TestingConfig()
935         cfg.DisableTCP = true
936         cl, err := NewClient(cfg)
937         require.NoError(t, err)
938         defer cl.Close()
939         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
940         assert.Nil(t, cl.tcpListener)
941 }
942
943 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
944         cfg := TestingConfig()
945         cfg.DisableTCP = true
946         cfg.DisableUTP = true
947         cl, err := NewClient(cfg)
948         require.NoError(t, err)
949         defer cl.Close()
950         assert.Nil(t, cl.ListenAddr())
951 }
952
953 func addClientPeer(t *Torrent, cl *Client) {
954         t.AddPeers([]Peer{
955                 {
956                         IP:   missinggo.AddrIP(cl.ListenAddr()),
957                         Port: missinggo.AddrPort(cl.ListenAddr()),
958                 },
959         })
960 }
961
962 func totalConns(tts []*Torrent) (ret int) {
963         for _, tt := range tts {
964                 tt.cl.mu.Lock()
965                 ret += len(tt.conns)
966                 tt.cl.mu.Unlock()
967         }
968         return
969 }
970
971 func TestSetMaxEstablishedConn(t *testing.T) {
972         ss := testutil.NewStatusServer(t)
973         defer ss.Close()
974         var tts []*Torrent
975         ih := testutil.GreetingMetaInfo().HashInfoBytes()
976         for i := range iter.N(3) {
977                 cl, err := NewClient(TestingConfig())
978                 require.NoError(t, err)
979                 defer cl.Close()
980                 tt, _ := cl.AddTorrentInfoHash(ih)
981                 tt.SetMaxEstablishedConns(2)
982                 ss.HandleStatusWriter(cl, fmt.Sprintf("/%d", i))
983                 tts = append(tts, tt)
984         }
985         addPeers := func() {
986                 for _, tt := range tts {
987                         for _, _tt := range tts {
988                                 // if tt != _tt {
989                                 addClientPeer(tt, _tt.cl)
990                                 // }
991                         }
992                 }
993         }
994         waitTotalConns := func(num int) {
995                 for totalConns(tts) != num {
996                         addPeers()
997                         time.Sleep(time.Millisecond)
998                 }
999         }
1000         addPeers()
1001         waitTotalConns(6)
1002         tts[0].SetMaxEstablishedConns(1)
1003         waitTotalConns(4)
1004         tts[0].SetMaxEstablishedConns(0)
1005         waitTotalConns(2)
1006         tts[0].SetMaxEstablishedConns(1)
1007         addPeers()
1008         waitTotalConns(4)
1009         tts[0].SetMaxEstablishedConns(2)
1010         addPeers()
1011         waitTotalConns(6)
1012 }
1013
1014 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1015         os.MkdirAll(dir, 0770)
1016         file, err := os.Create(filepath.Join(dir, name))
1017         require.NoError(t, err)
1018         file.Write([]byte(name))
1019         file.Close()
1020         mi := metainfo.MetaInfo{}
1021         mi.SetDefaults()
1022         info := metainfo.Info{PieceLength: 256 * 1024}
1023         err = info.BuildFromFilePath(filepath.Join(dir, name))
1024         require.NoError(t, err)
1025         mi.InfoBytes, err = bencode.Marshal(info)
1026         require.NoError(t, err)
1027         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1028         tr, err := cl.AddTorrent(&mi)
1029         require.NoError(t, err)
1030         require.True(t, tr.Seeding())
1031         tr.VerifyData()
1032         return magnet
1033 }
1034
1035 // https://github.com/anacrolix/torrent/issues/114
1036 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1037         cfg := TestingConfig()
1038         cfg.DisableUTP = true
1039         cfg.Seed = true
1040         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1041         cfg.ForceEncryption = true
1042         os.Mkdir(cfg.DataDir, 0755)
1043         server, err := NewClient(cfg)
1044         require.NoError(t, err)
1045         defer server.Close()
1046         testutil.ExportStatusWriter(server, "s")
1047         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1048         makeMagnet(t, server, cfg.DataDir, "test2")
1049         cfg = TestingConfig()
1050         cfg.DisableUTP = true
1051         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1052         cfg.ForceEncryption = true
1053         client, err := NewClient(cfg)
1054         require.NoError(t, err)
1055         defer client.Close()
1056         testutil.ExportStatusWriter(client, "c")
1057         tr, err := client.AddMagnet(magnet1)
1058         require.NoError(t, err)
1059         tr.AddPeers([]Peer{{
1060                 IP:   missinggo.AddrIP(server.ListenAddr()),
1061                 Port: missinggo.AddrPort(server.ListenAddr()),
1062         }})
1063         <-tr.GotInfo()
1064         tr.DownloadAll()
1065         client.WaitAll()
1066 }
1067
1068 func TestClientAddressInUse(t *testing.T) {
1069         s, _ := NewUtpSocket("udp", ":50007")
1070         if s != nil {
1071                 defer s.Close()
1072         }
1073         cfg := TestingConfig()
1074         cfg.ListenAddr = ":50007"
1075         cl, err := NewClient(cfg)
1076         require.Error(t, err)
1077         require.Nil(t, cl)
1078 }