]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Track ConnStats with atomics
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "context"
5         "encoding/binary"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "net"
10         "os"
11         "path/filepath"
12         "reflect"
13         "sync"
14         "testing"
15         "time"
16
17         "github.com/anacrolix/dht"
18         _ "github.com/anacrolix/envpprof"
19         "github.com/anacrolix/missinggo"
20         "github.com/anacrolix/missinggo/filecache"
21         "github.com/bradfitz/iter"
22         "github.com/stretchr/testify/assert"
23         "github.com/stretchr/testify/require"
24         "golang.org/x/time/rate"
25
26         "github.com/anacrolix/torrent/bencode"
27         "github.com/anacrolix/torrent/internal/testutil"
28         "github.com/anacrolix/torrent/iplist"
29         "github.com/anacrolix/torrent/metainfo"
30         "github.com/anacrolix/torrent/storage"
31 )
32
33 func TestingConfig() *Config {
34         return &Config{
35                 ListenHost:              LoopbackListenHost,
36                 NoDHT:                   true,
37                 DataDir:                 tempDir(),
38                 DisableTrackers:         true,
39                 NoDefaultPortForwarding: true,
40                 // Debug:           true,
41         }
42 }
43
44 func TestClientDefault(t *testing.T) {
45         cl, err := NewClient(TestingConfig())
46         require.NoError(t, err)
47         cl.Close()
48 }
49
50 func TestClientNilConfig(t *testing.T) {
51         cl, err := NewClient(nil)
52         require.NoError(t, err)
53         cl.Close()
54 }
55
56 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
57         cfg := TestingConfig()
58         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
59         require.NoError(t, err)
60         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
61         defer ci.Close()
62         cfg.DefaultStorage = ci
63         cl, err := NewClient(cfg)
64         require.NoError(t, err)
65         cl.Close()
66         // And again, https://github.com/anacrolix/torrent/issues/158
67         cl, err = NewClient(cfg)
68         require.NoError(t, err)
69         cl.Close()
70 }
71
72 func TestAddDropTorrent(t *testing.T) {
73         cl, err := NewClient(TestingConfig())
74         require.NoError(t, err)
75         defer cl.Close()
76         dir, mi := testutil.GreetingTestTorrent()
77         defer os.RemoveAll(dir)
78         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
79         require.NoError(t, err)
80         assert.True(t, new)
81         tt.SetMaxEstablishedConns(0)
82         tt.SetMaxEstablishedConns(1)
83         tt.Drop()
84 }
85
86 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
87         // TODO?
88         t.SkipNow()
89 }
90
91 func TestAddTorrentNoUsableURLs(t *testing.T) {
92         // TODO?
93         t.SkipNow()
94 }
95
96 func TestAddPeersToUnknownTorrent(t *testing.T) {
97         // TODO?
98         t.SkipNow()
99 }
100
101 func TestPieceHashSize(t *testing.T) {
102         assert.Equal(t, 20, pieceHash.Size())
103 }
104
105 func TestTorrentInitialState(t *testing.T) {
106         dir, mi := testutil.GreetingTestTorrent()
107         defer os.RemoveAll(dir)
108         cl := &Client{}
109         cl.initLogger()
110         tor := cl.newTorrent(
111                 mi.HashInfoBytes(),
112                 storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
113         )
114         tor.setChunkSize(2)
115         tor.cl.mu.Lock()
116         err := tor.setInfoBytes(mi.InfoBytes)
117         tor.cl.mu.Unlock()
118         require.NoError(t, err)
119         require.Len(t, tor.pieces, 3)
120         tor.pendAllChunkSpecs(0)
121         tor.cl.mu.Lock()
122         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
123         tor.cl.mu.Unlock()
124         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
125 }
126
127 func TestUnmarshalPEXMsg(t *testing.T) {
128         var m peerExchangeMessage
129         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
130                 t.Fatal(err)
131         }
132         if len(m.Added) != 2 {
133                 t.FailNow()
134         }
135         if m.Added[0].Port != 0x506 {
136                 t.FailNow()
137         }
138 }
139
140 func TestReducedDialTimeout(t *testing.T) {
141         cfg := &Config{}
142         cfg.setDefaults()
143         for _, _case := range []struct {
144                 Max             time.Duration
145                 HalfOpenLimit   int
146                 PendingPeers    int
147                 ExpectedReduced time.Duration
148         }{
149                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
150                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
151                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
152                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
153                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
154                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
155         } {
156                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
157                 expected := _case.ExpectedReduced
158                 if expected < cfg.MinDialTimeout {
159                         expected = cfg.MinDialTimeout
160                 }
161                 if reduced != expected {
162                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
163                 }
164         }
165 }
166
167 func TestUTPRawConn(t *testing.T) {
168         l, err := NewUtpSocket("udp", "")
169         require.NoError(t, err)
170         defer l.Close()
171         go func() {
172                 for {
173                         _, err := l.Accept()
174                         if err != nil {
175                                 break
176                         }
177                 }
178         }()
179         // Connect a UTP peer to see if the RawConn will still work.
180         s, err := NewUtpSocket("udp", "")
181         require.NoError(t, err)
182         defer s.Close()
183         utpPeer, err := s.DialContext(context.Background(), "", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
184         require.NoError(t, err)
185         defer utpPeer.Close()
186         peer, err := net.ListenPacket("udp", ":0")
187         require.NoError(t, err)
188         defer peer.Close()
189
190         msgsReceived := 0
191         // How many messages to send. I've set this to double the channel buffer
192         // size in the raw packetConn.
193         const N = 200
194         readerStopped := make(chan struct{})
195         // The reader goroutine.
196         go func() {
197                 defer close(readerStopped)
198                 b := make([]byte, 500)
199                 for i := 0; i < N; i++ {
200                         n, _, err := l.ReadFrom(b)
201                         require.NoError(t, err)
202                         msgsReceived++
203                         var d int
204                         fmt.Sscan(string(b[:n]), &d)
205                         assert.Equal(t, i, d)
206                 }
207         }()
208         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
209         require.NoError(t, err)
210         for i := 0; i < N; i++ {
211                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
212                 require.NoError(t, err)
213                 time.Sleep(time.Millisecond)
214         }
215         select {
216         case <-readerStopped:
217         case <-time.After(time.Second):
218                 t.Fatal("reader timed out")
219         }
220         if msgsReceived != N {
221                 t.Fatalf("messages received: %d", msgsReceived)
222         }
223 }
224
225 func TestAddDropManyTorrents(t *testing.T) {
226         cl, err := NewClient(TestingConfig())
227         require.NoError(t, err)
228         defer cl.Close()
229         for i := range iter.N(1000) {
230                 var spec TorrentSpec
231                 binary.PutVarint(spec.InfoHash[:], int64(i))
232                 tt, new, err := cl.AddTorrentSpec(&spec)
233                 assert.NoError(t, err)
234                 assert.True(t, new)
235                 defer tt.Drop()
236         }
237 }
238
239 type FileCacheClientStorageFactoryParams struct {
240         Capacity    int64
241         SetCapacity bool
242         Wrapper     func(*filecache.Cache) storage.ClientImpl
243 }
244
245 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
246         return func(dataDir string) storage.ClientImpl {
247                 fc, err := filecache.NewCache(dataDir)
248                 if err != nil {
249                         panic(err)
250                 }
251                 if ps.SetCapacity {
252                         fc.SetCapacity(ps.Capacity)
253                 }
254                 return ps.Wrapper(fc)
255         }
256 }
257
258 type storageFactory func(string) storage.ClientImpl
259
260 func TestClientTransferDefault(t *testing.T) {
261         testClientTransfer(t, testClientTransferParams{
262                 ExportClientStatus: true,
263                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
264                         Wrapper: fileCachePieceResourceStorage,
265                 }),
266         })
267 }
268
269 func TestClientTransferRateLimitedUpload(t *testing.T) {
270         started := time.Now()
271         testClientTransfer(t, testClientTransferParams{
272                 // We are uploading 13 bytes (the length of the greeting torrent). The
273                 // chunks are 2 bytes in length. Then the smallest burst we can run
274                 // with is 2. Time taken is (13-burst)/rate.
275                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
276                 ExportClientStatus:      true,
277         })
278         require.True(t, time.Since(started) > time.Second)
279 }
280
281 func TestClientTransferRateLimitedDownload(t *testing.T) {
282         testClientTransfer(t, testClientTransferParams{
283                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
284         })
285 }
286
287 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
288         return storage.NewResourcePieces(fc.AsResourceProvider())
289 }
290
291 func TestClientTransferSmallCache(t *testing.T) {
292         testClientTransfer(t, testClientTransferParams{
293                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
294                         SetCapacity: true,
295                         // Going below the piece length means it can't complete a piece so
296                         // that it can be hashed.
297                         Capacity: 5,
298                         Wrapper:  fileCachePieceResourceStorage,
299                 }),
300                 SetReadahead: true,
301                 // Can't readahead too far or the cache will thrash and drop data we
302                 // thought we had.
303                 Readahead:          0,
304                 ExportClientStatus: true,
305         })
306 }
307
308 func TestClientTransferVarious(t *testing.T) {
309         // Leecher storage
310         for _, ls := range []storageFactory{
311                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
312                         Wrapper: fileCachePieceResourceStorage,
313                 }),
314                 storage.NewBoltDB,
315         } {
316                 // Seeder storage
317                 for _, ss := range []func(string) storage.ClientImpl{
318                         storage.NewFile,
319                         storage.NewMMap,
320                 } {
321                         for _, responsive := range []bool{false, true} {
322                                 testClientTransfer(t, testClientTransferParams{
323                                         Responsive:     responsive,
324                                         SeederStorage:  ss,
325                                         LeecherStorage: ls,
326                                 })
327                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
328                                         testClientTransfer(t, testClientTransferParams{
329                                                 SeederStorage:  ss,
330                                                 Responsive:     responsive,
331                                                 SetReadahead:   true,
332                                                 Readahead:      readahead,
333                                                 LeecherStorage: ls,
334                                         })
335                                 }
336                         }
337                 }
338         }
339 }
340
341 type testClientTransferParams struct {
342         Responsive                 bool
343         Readahead                  int64
344         SetReadahead               bool
345         ExportClientStatus         bool
346         LeecherStorage             func(string) storage.ClientImpl
347         SeederStorage              func(string) storage.ClientImpl
348         SeederUploadRateLimiter    *rate.Limiter
349         LeecherDownloadRateLimiter *rate.Limiter
350 }
351
352 // Creates a seeder and a leecher, and ensures the data transfers when a read
353 // is attempted on the leecher.
354 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
355         greetingTempDir, mi := testutil.GreetingTestTorrent()
356         defer os.RemoveAll(greetingTempDir)
357         // Create seeder and a Torrent.
358         cfg := TestingConfig()
359         cfg.Seed = true
360         cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
361         // cfg.ListenAddr = "localhost:4000"
362         if ps.SeederStorage != nil {
363                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
364                 defer cfg.DefaultStorage.Close()
365         } else {
366                 cfg.DataDir = greetingTempDir
367         }
368         seeder, err := NewClient(cfg)
369         require.NoError(t, err)
370         if ps.ExportClientStatus {
371                 testutil.ExportStatusWriter(seeder, "s")
372         }
373         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
374         // Run a Stats right after Closing the Client. This will trigger the Stats
375         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
376         defer seederTorrent.Stats()
377         defer seeder.Close()
378         seederTorrent.VerifyData()
379         // Create leecher and a Torrent.
380         leecherDataDir, err := ioutil.TempDir("", "")
381         require.NoError(t, err)
382         defer os.RemoveAll(leecherDataDir)
383         if ps.LeecherStorage == nil {
384                 cfg.DataDir = leecherDataDir
385         } else {
386                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
387         }
388         cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
389         cfg.Seed = false
390         leecher, err := NewClient(cfg)
391         require.NoError(t, err)
392         defer leecher.Close()
393         if ps.ExportClientStatus {
394                 testutil.ExportStatusWriter(leecher, "l")
395         }
396         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
397                 ret = TorrentSpecFromMetaInfo(mi)
398                 ret.ChunkSize = 2
399                 return
400         }())
401         require.NoError(t, err)
402         assert.True(t, new)
403         // Now do some things with leecher and seeder.
404         leecherTorrent.AddClientPeer(seeder)
405         // The Torrent should not be interested in obtaining peers, so the one we
406         // just added should be the only one.
407         assert.False(t, leecherTorrent.Seeding())
408         assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
409         r := leecherTorrent.NewReader()
410         defer r.Close()
411         if ps.Responsive {
412                 r.SetResponsive()
413         }
414         if ps.SetReadahead {
415                 r.SetReadahead(ps.Readahead)
416         }
417         assertReadAllGreeting(t, r)
418
419         seederStats := seederTorrent.Stats()
420         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
421         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
422
423         leecherStats := leecherTorrent.Stats()
424         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
425         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
426
427         // Try reading through again for the cases where the torrent data size
428         // exceeds the size of the cache.
429         assertReadAllGreeting(t, r)
430 }
431
432 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
433         pos, err := r.Seek(0, io.SeekStart)
434         assert.NoError(t, err)
435         assert.EqualValues(t, 0, pos)
436         _greeting, err := ioutil.ReadAll(r)
437         assert.NoError(t, err)
438         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
439 }
440
441 // Check that after completing leeching, a leecher transitions to a seeding
442 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
443 func TestSeedAfterDownloading(t *testing.T) {
444         greetingTempDir, mi := testutil.GreetingTestTorrent()
445         defer os.RemoveAll(greetingTempDir)
446         cfg := TestingConfig()
447         cfg.Seed = true
448         cfg.DataDir = greetingTempDir
449         seeder, err := NewClient(cfg)
450         require.NoError(t, err)
451         defer seeder.Close()
452         testutil.ExportStatusWriter(seeder, "s")
453         seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
454         require.NoError(t, err)
455         assert.True(t, ok)
456         seederTorrent.VerifyData()
457         cfg.DataDir, err = ioutil.TempDir("", "")
458         require.NoError(t, err)
459         defer os.RemoveAll(cfg.DataDir)
460         leecher, err := NewClient(cfg)
461         require.NoError(t, err)
462         defer leecher.Close()
463         testutil.ExportStatusWriter(leecher, "l")
464         cfg.Seed = false
465         cfg.DataDir, err = ioutil.TempDir("", "")
466         require.NoError(t, err)
467         defer os.RemoveAll(cfg.DataDir)
468         leecherLeecher, _ := NewClient(cfg)
469         require.NoError(t, err)
470         defer leecherLeecher.Close()
471         testutil.ExportStatusWriter(leecherLeecher, "ll")
472         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
473                 ret = TorrentSpecFromMetaInfo(mi)
474                 ret.ChunkSize = 2
475                 return
476         }())
477         require.NoError(t, err)
478         assert.True(t, ok)
479         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
480                 ret = TorrentSpecFromMetaInfo(mi)
481                 ret.ChunkSize = 3
482                 return
483         }())
484         require.NoError(t, err)
485         assert.True(t, ok)
486         // Simultaneously DownloadAll in Leecher, and read the contents
487         // consecutively in LeecherLeecher. This non-deterministically triggered a
488         // case where the leecher wouldn't unchoke the LeecherLeecher.
489         var wg sync.WaitGroup
490         wg.Add(1)
491         go func() {
492                 defer wg.Done()
493                 r := llg.NewReader()
494                 defer r.Close()
495                 b, err := ioutil.ReadAll(r)
496                 require.NoError(t, err)
497                 assert.EqualValues(t, testutil.GreetingFileContents, b)
498         }()
499         done := make(chan struct{})
500         defer close(done)
501         go func() {
502                 for {
503                         go leecherGreeting.AddClientPeer(seeder)
504                         go leecherGreeting.AddClientPeer(leecherLeecher)
505                         select {
506                         case <-done:
507                                 return
508                         case <-time.After(time.Second):
509                         }
510                 }
511         }()
512         wg.Add(1)
513         go func() {
514                 defer wg.Done()
515                 leecherGreeting.DownloadAll()
516                 leecher.WaitAll()
517         }()
518         wg.Wait()
519 }
520
521 func TestMergingTrackersByAddingSpecs(t *testing.T) {
522         cl, err := NewClient(TestingConfig())
523         require.NoError(t, err)
524         defer cl.Close()
525         spec := TorrentSpec{}
526         T, new, _ := cl.AddTorrentSpec(&spec)
527         if !new {
528                 t.FailNow()
529         }
530         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
531         _, new, _ = cl.AddTorrentSpec(&spec)
532         assert.False(t, new)
533         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
534         // Because trackers are disabled in TestingConfig.
535         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
536 }
537
538 // We read from a piece which is marked completed, but is missing data.
539 func TestCompletedPieceWrongSize(t *testing.T) {
540         cfg := TestingConfig()
541         cfg.DefaultStorage = badStorage{}
542         cl, err := NewClient(cfg)
543         require.NoError(t, err)
544         defer cl.Close()
545         info := metainfo.Info{
546                 PieceLength: 15,
547                 Pieces:      make([]byte, 20),
548                 Files: []metainfo.FileInfo{
549                         {Path: []string{"greeting"}, Length: 13},
550                 },
551         }
552         b, err := bencode.Marshal(info)
553         require.NoError(t, err)
554         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
555                 InfoBytes: b,
556                 InfoHash:  metainfo.HashBytes(b),
557         })
558         require.NoError(t, err)
559         defer tt.Drop()
560         assert.True(t, new)
561         r := tt.NewReader()
562         defer r.Close()
563         b, err = ioutil.ReadAll(r)
564         assert.Len(t, b, 13)
565         assert.NoError(t, err)
566 }
567
568 func BenchmarkAddLargeTorrent(b *testing.B) {
569         cfg := TestingConfig()
570         cfg.DisableTCP = true
571         cfg.DisableUTP = true
572         cfg.ListenHost = func(string) string { return "redonk" }
573         cl, err := NewClient(cfg)
574         require.NoError(b, err)
575         defer cl.Close()
576         for range iter.N(b.N) {
577                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
578                 if err != nil {
579                         b.Fatal(err)
580                 }
581                 t.Drop()
582         }
583 }
584
585 func TestResponsive(t *testing.T) {
586         seederDataDir, mi := testutil.GreetingTestTorrent()
587         defer os.RemoveAll(seederDataDir)
588         cfg := TestingConfig()
589         cfg.Seed = true
590         cfg.DataDir = seederDataDir
591         seeder, err := NewClient(cfg)
592         require.Nil(t, err)
593         defer seeder.Close()
594         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
595         seederTorrent.VerifyData()
596         leecherDataDir, err := ioutil.TempDir("", "")
597         require.Nil(t, err)
598         defer os.RemoveAll(leecherDataDir)
599         cfg = TestingConfig()
600         cfg.DataDir = leecherDataDir
601         leecher, err := NewClient(cfg)
602         require.Nil(t, err)
603         defer leecher.Close()
604         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
605                 ret = TorrentSpecFromMetaInfo(mi)
606                 ret.ChunkSize = 2
607                 return
608         }())
609         leecherTorrent.AddClientPeer(seeder)
610         reader := leecherTorrent.NewReader()
611         defer reader.Close()
612         reader.SetReadahead(0)
613         reader.SetResponsive()
614         b := make([]byte, 2)
615         _, err = reader.Seek(3, io.SeekStart)
616         require.NoError(t, err)
617         _, err = io.ReadFull(reader, b)
618         assert.Nil(t, err)
619         assert.EqualValues(t, "lo", string(b))
620         _, err = reader.Seek(11, io.SeekStart)
621         require.NoError(t, err)
622         n, err := io.ReadFull(reader, b)
623         assert.Nil(t, err)
624         assert.EqualValues(t, 2, n)
625         assert.EqualValues(t, "d\n", string(b))
626 }
627
628 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
629         seederDataDir, mi := testutil.GreetingTestTorrent()
630         defer os.RemoveAll(seederDataDir)
631         cfg := TestingConfig()
632         cfg.Seed = true
633         cfg.DataDir = seederDataDir
634         seeder, err := NewClient(cfg)
635         require.Nil(t, err)
636         defer seeder.Close()
637         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
638         seederTorrent.VerifyData()
639         leecherDataDir, err := ioutil.TempDir("", "")
640         require.Nil(t, err)
641         defer os.RemoveAll(leecherDataDir)
642         cfg = TestingConfig()
643         cfg.DataDir = leecherDataDir
644         leecher, err := NewClient(cfg)
645         require.Nil(t, err)
646         defer leecher.Close()
647         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
648                 ret = TorrentSpecFromMetaInfo(mi)
649                 ret.ChunkSize = 2
650                 return
651         }())
652         leecherTorrent.AddClientPeer(seeder)
653         reader := leecherTorrent.NewReader()
654         defer reader.Close()
655         reader.SetReadahead(0)
656         reader.SetResponsive()
657         b := make([]byte, 2)
658         _, err = reader.Seek(3, io.SeekStart)
659         require.NoError(t, err)
660         _, err = io.ReadFull(reader, b)
661         assert.Nil(t, err)
662         assert.EqualValues(t, "lo", string(b))
663         go leecherTorrent.Drop()
664         _, err = reader.Seek(11, io.SeekStart)
665         require.NoError(t, err)
666         n, err := reader.Read(b)
667         assert.EqualError(t, err, "torrent closed")
668         assert.EqualValues(t, 0, n)
669 }
670
671 func TestDHTInheritBlocklist(t *testing.T) {
672         ipl := iplist.New(nil)
673         require.NotNil(t, ipl)
674         cfg := TestingConfig()
675         cfg.IPBlocklist = ipl
676         cfg.NoDHT = false
677         cl, err := NewClient(cfg)
678         require.NoError(t, err)
679         defer cl.Close()
680         numServers := 0
681         cl.eachDhtServer(func(s *dht.Server) {
682                 assert.Equal(t, ipl, s.IPBlocklist())
683                 numServers++
684         })
685         assert.EqualValues(t, 2, numServers)
686 }
687
688 // Check that stuff is merged in subsequent AddTorrentSpec for the same
689 // infohash.
690 func TestAddTorrentSpecMerging(t *testing.T) {
691         cl, err := NewClient(TestingConfig())
692         require.NoError(t, err)
693         defer cl.Close()
694         dir, mi := testutil.GreetingTestTorrent()
695         defer os.RemoveAll(dir)
696         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
697                 InfoHash: mi.HashInfoBytes(),
698         })
699         require.NoError(t, err)
700         require.True(t, new)
701         require.Nil(t, tt.Info())
702         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
703         require.NoError(t, err)
704         require.False(t, new)
705         require.NotNil(t, tt.Info())
706 }
707
708 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
709         dir, mi := testutil.GreetingTestTorrent()
710         os.RemoveAll(dir)
711         cl, _ := NewClient(TestingConfig())
712         defer cl.Close()
713         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
714                 InfoHash: mi.HashInfoBytes(),
715         })
716         tt.Drop()
717         assert.EqualValues(t, 0, len(cl.Torrents()))
718         select {
719         case <-tt.GotInfo():
720                 t.FailNow()
721         default:
722         }
723 }
724
725 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
726         for i := range iter.N(info.NumPieces()) {
727                 p := info.Piece(i)
728                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
729         }
730 }
731
732 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
733         fileCacheDir, err := ioutil.TempDir("", "")
734         require.NoError(t, err)
735         defer os.RemoveAll(fileCacheDir)
736         fileCache, err := filecache.NewCache(fileCacheDir)
737         require.NoError(t, err)
738         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
739         defer os.RemoveAll(greetingDataTempDir)
740         filePieceStore := csf(fileCache)
741         defer filePieceStore.Close()
742         info, err := greetingMetainfo.UnmarshalInfo()
743         require.NoError(t, err)
744         ih := greetingMetainfo.HashInfoBytes()
745         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
746         require.NoError(t, err)
747         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
748         // require.Equal(t, len(testutil.GreetingFileContents), written)
749         // require.NoError(t, err)
750         for i := 0; i < info.NumPieces(); i++ {
751                 p := info.Piece(i)
752                 if alreadyCompleted {
753                         require.NoError(t, greetingData.Piece(p).MarkComplete())
754                 }
755         }
756         cfg := TestingConfig()
757         // TODO: Disable network option?
758         cfg.DisableTCP = true
759         cfg.DisableUTP = true
760         cfg.DefaultStorage = filePieceStore
761         cl, err := NewClient(cfg)
762         require.NoError(t, err)
763         defer cl.Close()
764         tt, err := cl.AddTorrent(greetingMetainfo)
765         require.NoError(t, err)
766         psrs := tt.PieceStateRuns()
767         assert.Len(t, psrs, 1)
768         assert.EqualValues(t, 3, psrs[0].Length)
769         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
770         if alreadyCompleted {
771                 r := tt.NewReader()
772                 b, err := ioutil.ReadAll(r)
773                 assert.NoError(t, err)
774                 assert.EqualValues(t, testutil.GreetingFileContents, b)
775         }
776 }
777
778 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
779         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
780 }
781
782 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
783         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
784 }
785
786 func TestAddMetainfoWithNodes(t *testing.T) {
787         cfg := TestingConfig()
788         cfg.ListenHost = func(string) string { return "" }
789         cfg.NoDHT = false
790         cfg.DhtStartingNodes = func() ([]dht.Addr, error) { return nil, nil }
791         // For now, we want to just jam the nodes into the table, without
792         // verifying them first. Also the DHT code doesn't support mixing secure
793         // and insecure nodes if security is enabled (yet).
794         // cfg.DHTConfig.NoSecurity = true
795         cl, err := NewClient(cfg)
796         require.NoError(t, err)
797         defer cl.Close()
798         sum := func() (ret int64) {
799                 cl.eachDhtServer(func(s *dht.Server) {
800                         ret += s.Stats().OutboundQueriesAttempted
801                 })
802                 return
803         }
804         assert.EqualValues(t, 0, sum())
805         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
806         require.NoError(t, err)
807         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
808         // check if the announce-list is here instead. TODO: Add nodes.
809         assert.Len(t, tt.metainfo.AnnounceList, 5)
810         // There are 6 nodes in the torrent file.
811         assert.EqualValues(t, 6*len(cl.dhtServers), sum())
812 }
813
814 type testDownloadCancelParams struct {
815         SetLeecherStorageCapacity bool
816         LeecherStorageCapacity    int64
817         Cancel                    bool
818 }
819
820 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
821         greetingTempDir, mi := testutil.GreetingTestTorrent()
822         defer os.RemoveAll(greetingTempDir)
823         cfg := TestingConfig()
824         cfg.Seed = true
825         cfg.DataDir = greetingTempDir
826         seeder, err := NewClient(cfg)
827         require.NoError(t, err)
828         defer seeder.Close()
829         testutil.ExportStatusWriter(seeder, "s")
830         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
831         seederTorrent.VerifyData()
832         leecherDataDir, err := ioutil.TempDir("", "")
833         require.NoError(t, err)
834         defer os.RemoveAll(leecherDataDir)
835         fc, err := filecache.NewCache(leecherDataDir)
836         require.NoError(t, err)
837         if ps.SetLeecherStorageCapacity {
838                 fc.SetCapacity(ps.LeecherStorageCapacity)
839         }
840         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
841         cfg.DataDir = leecherDataDir
842         leecher, _ := NewClient(cfg)
843         defer leecher.Close()
844         testutil.ExportStatusWriter(leecher, "l")
845         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
846                 ret = TorrentSpecFromMetaInfo(mi)
847                 ret.ChunkSize = 2
848                 return
849         }())
850         require.NoError(t, err)
851         assert.True(t, new)
852         psc := leecherGreeting.SubscribePieceStateChanges()
853         defer psc.Close()
854
855         leecherGreeting.cl.mu.Lock()
856         leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
857         if ps.Cancel {
858                 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
859         }
860         leecherGreeting.cl.mu.Unlock()
861         done := make(chan struct{})
862         defer close(done)
863         go func() {
864                 for {
865                         leecherGreeting.AddClientPeer(seeder)
866                         select {
867                         case <-done:
868                                 return
869                         case <-time.After(time.Second):
870                         }
871                 }
872         }()
873         completes := make(map[int]bool, 3)
874         expected := func() map[int]bool {
875                 if ps.Cancel {
876                         return map[int]bool{0: false, 1: false, 2: false}
877                 } else {
878                         return map[int]bool{0: true, 1: true, 2: true}
879                 }
880         }()
881         for !reflect.DeepEqual(completes, expected) {
882                 select {
883                 case _v := <-psc.Values:
884                         v := _v.(PieceStateChange)
885                         completes[v.Index] = v.Complete
886                 }
887         }
888 }
889
890 func TestTorrentDownloadAll(t *testing.T) {
891         testDownloadCancel(t, testDownloadCancelParams{})
892 }
893
894 func TestTorrentDownloadAllThenCancel(t *testing.T) {
895         testDownloadCancel(t, testDownloadCancelParams{
896                 Cancel: true,
897         })
898 }
899
900 // Ensure that it's an error for a peer to send an invalid have message.
901 func TestPeerInvalidHave(t *testing.T) {
902         cl, err := NewClient(TestingConfig())
903         require.NoError(t, err)
904         defer cl.Close()
905         info := metainfo.Info{
906                 PieceLength: 1,
907                 Pieces:      make([]byte, 20),
908                 Files:       []metainfo.FileInfo{{Length: 1}},
909         }
910         infoBytes, err := bencode.Marshal(info)
911         require.NoError(t, err)
912         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
913                 InfoBytes: infoBytes,
914                 InfoHash:  metainfo.HashBytes(infoBytes),
915                 Storage:   badStorage{},
916         })
917         require.NoError(t, err)
918         assert.True(t, _new)
919         defer tt.Drop()
920         cn := &connection{
921                 t: tt,
922         }
923         assert.NoError(t, cn.peerSentHave(0))
924         assert.Error(t, cn.peerSentHave(1))
925 }
926
927 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
928         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
929         defer os.RemoveAll(greetingTempDir)
930         cfg := TestingConfig()
931         cfg.DataDir = greetingTempDir
932         seeder, err := NewClient(TestingConfig())
933         require.NoError(t, err)
934         seeder.AddTorrentSpec(&TorrentSpec{
935                 InfoBytes: greetingMetainfo.InfoBytes,
936         })
937 }
938
939 // Check that when the listen port is 0, all the protocols listened on have
940 // the same port, and it isn't zero.
941 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
942         cl, err := NewClient(TestingConfig())
943         require.NoError(t, err)
944         defer cl.Close()
945         port := cl.LocalPort()
946         assert.NotEqual(t, 0, port)
947         cl.eachListener(func(s socket) bool {
948                 assert.Equal(t, port, missinggo.AddrPort(s.Addr()))
949                 return true
950         })
951 }
952
953 func TestClientDynamicListenTCPOnly(t *testing.T) {
954         cfg := TestingConfig()
955         cfg.DisableUTP = true
956         cl, err := NewClient(cfg)
957         require.NoError(t, err)
958         defer cl.Close()
959         assert.NotEqual(t, 0, cl.LocalPort())
960         cl.eachListener(func(s socket) bool {
961                 assert.True(t, isTcpNetwork(s.Addr().Network()))
962                 return true
963         })
964 }
965
966 func TestClientDynamicListenUTPOnly(t *testing.T) {
967         cfg := TestingConfig()
968         cfg.DisableTCP = true
969         cl, err := NewClient(cfg)
970         require.NoError(t, err)
971         defer cl.Close()
972         assert.NotEqual(t, 0, cl.LocalPort())
973         cl.eachListener(func(s socket) bool {
974                 assert.True(t, isUtpNetwork(s.Addr().Network()))
975                 return true
976         })
977 }
978
979 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
980         cfg := TestingConfig()
981         cfg.DisableTCP = true
982         cfg.DisableUTP = true
983         cl, err := NewClient(cfg)
984         require.NoError(t, err)
985         defer cl.Close()
986         assert.Equal(t, 0, cl.LocalPort())
987 }
988
989 func totalConns(tts []*Torrent) (ret int) {
990         for _, tt := range tts {
991                 tt.cl.mu.Lock()
992                 ret += len(tt.conns)
993                 tt.cl.mu.Unlock()
994         }
995         return
996 }
997
998 func TestSetMaxEstablishedConn(t *testing.T) {
999         ss := testutil.NewStatusServer(t)
1000         defer ss.Close()
1001         var tts []*Torrent
1002         ih := testutil.GreetingMetaInfo().HashInfoBytes()
1003         for i := range iter.N(3) {
1004                 cl, err := NewClient(TestingConfig())
1005                 require.NoError(t, err)
1006                 defer cl.Close()
1007                 tt, _ := cl.AddTorrentInfoHash(ih)
1008                 tt.SetMaxEstablishedConns(2)
1009                 ss.HandleStatusWriter(cl, fmt.Sprintf("/%d", i))
1010                 tts = append(tts, tt)
1011         }
1012         addPeers := func() {
1013                 for _, tt := range tts {
1014                         for _, _tt := range tts {
1015                                 // if tt != _tt {
1016                                 tt.AddClientPeer(_tt.cl)
1017                                 // }
1018                         }
1019                 }
1020         }
1021         waitTotalConns := func(num int) {
1022                 for totalConns(tts) != num {
1023                         addPeers()
1024                         time.Sleep(time.Millisecond)
1025                 }
1026         }
1027         addPeers()
1028         waitTotalConns(6)
1029         tts[0].SetMaxEstablishedConns(1)
1030         waitTotalConns(4)
1031         tts[0].SetMaxEstablishedConns(0)
1032         waitTotalConns(2)
1033         tts[0].SetMaxEstablishedConns(1)
1034         addPeers()
1035         waitTotalConns(4)
1036         tts[0].SetMaxEstablishedConns(2)
1037         addPeers()
1038         waitTotalConns(6)
1039 }
1040
1041 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1042         os.MkdirAll(dir, 0770)
1043         file, err := os.Create(filepath.Join(dir, name))
1044         require.NoError(t, err)
1045         file.Write([]byte(name))
1046         file.Close()
1047         mi := metainfo.MetaInfo{}
1048         mi.SetDefaults()
1049         info := metainfo.Info{PieceLength: 256 * 1024}
1050         err = info.BuildFromFilePath(filepath.Join(dir, name))
1051         require.NoError(t, err)
1052         mi.InfoBytes, err = bencode.Marshal(info)
1053         require.NoError(t, err)
1054         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1055         tr, err := cl.AddTorrent(&mi)
1056         require.NoError(t, err)
1057         require.True(t, tr.Seeding())
1058         tr.VerifyData()
1059         return magnet
1060 }
1061
1062 // https://github.com/anacrolix/torrent/issues/114
1063 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1064         cfg := TestingConfig()
1065         cfg.DisableUTP = true
1066         cfg.Seed = true
1067         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1068         cfg.ForceEncryption = true
1069         os.Mkdir(cfg.DataDir, 0755)
1070         server, err := NewClient(cfg)
1071         require.NoError(t, err)
1072         defer server.Close()
1073         testutil.ExportStatusWriter(server, "s")
1074         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1075         makeMagnet(t, server, cfg.DataDir, "test2")
1076         cfg = TestingConfig()
1077         cfg.DisableUTP = true
1078         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1079         cfg.ForceEncryption = true
1080         client, err := NewClient(cfg)
1081         require.NoError(t, err)
1082         defer client.Close()
1083         testutil.ExportStatusWriter(client, "c")
1084         tr, err := client.AddMagnet(magnet1)
1085         require.NoError(t, err)
1086         tr.AddClientPeer(server)
1087         <-tr.GotInfo()
1088         tr.DownloadAll()
1089         client.WaitAll()
1090 }
1091
1092 func TestClientAddressInUse(t *testing.T) {
1093         s, _ := NewUtpSocket("udp", ":50007")
1094         if s != nil {
1095                 defer s.Close()
1096         }
1097         cfg := TestingConfig().SetListenAddr(":50007")
1098         cl, err := NewClient(cfg)
1099         require.Error(t, err)
1100         require.Nil(t, cl)
1101 }