]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Add support for disabling IPv4 and IPv4 peers
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "context"
5         "encoding/binary"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "net"
10         "os"
11         "path/filepath"
12         "sync"
13         "testing"
14         "time"
15
16         _ "github.com/anacrolix/envpprof"
17         "github.com/anacrolix/missinggo"
18         "github.com/anacrolix/missinggo/filecache"
19         "github.com/bradfitz/iter"
20         "github.com/stretchr/testify/assert"
21         "github.com/stretchr/testify/require"
22         "golang.org/x/time/rate"
23
24         "github.com/anacrolix/torrent/bencode"
25         "github.com/anacrolix/torrent/internal/testutil"
26         "github.com/anacrolix/torrent/iplist"
27         "github.com/anacrolix/torrent/metainfo"
28         "github.com/anacrolix/torrent/storage"
29 )
30
31 func TestingConfig() *Config {
32         return &Config{
33                 ListenAddr:              "localhost:0",
34                 NoDHT:                   true,
35                 DataDir:                 tempDir(),
36                 DisableTrackers:         true,
37                 NoDefaultPortForwarding: true,
38                 // Debug:           true,
39         }
40 }
41
42 func TestClientDefault(t *testing.T) {
43         cl, err := NewClient(TestingConfig())
44         require.NoError(t, err)
45         cl.Close()
46 }
47
48 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
49         cfg := TestingConfig()
50         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
51         require.NoError(t, err)
52         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
53         defer ci.Close()
54         cfg.DefaultStorage = ci
55         cl, err := NewClient(cfg)
56         require.NoError(t, err)
57         cl.Close()
58         // And again, https://github.com/anacrolix/torrent/issues/158
59         cl, err = NewClient(cfg)
60         require.NoError(t, err)
61         cl.Close()
62 }
63
64 func TestAddDropTorrent(t *testing.T) {
65         cl, err := NewClient(TestingConfig())
66         require.NoError(t, err)
67         defer cl.Close()
68         dir, mi := testutil.GreetingTestTorrent()
69         defer os.RemoveAll(dir)
70         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
71         require.NoError(t, err)
72         assert.True(t, new)
73         tt.SetMaxEstablishedConns(0)
74         tt.SetMaxEstablishedConns(1)
75         tt.Drop()
76 }
77
78 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
79         // TODO?
80         t.SkipNow()
81 }
82
83 func TestAddTorrentNoUsableURLs(t *testing.T) {
84         // TODO?
85         t.SkipNow()
86 }
87
88 func TestAddPeersToUnknownTorrent(t *testing.T) {
89         // TODO?
90         t.SkipNow()
91 }
92
93 func TestPieceHashSize(t *testing.T) {
94         assert.Equal(t, 20, pieceHash.Size())
95 }
96
97 func TestTorrentInitialState(t *testing.T) {
98         dir, mi := testutil.GreetingTestTorrent()
99         defer os.RemoveAll(dir)
100         cl := &Client{}
101         cl.initLogger()
102         tor := cl.newTorrent(
103                 mi.HashInfoBytes(),
104                 storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
105         )
106         tor.setChunkSize(2)
107         tor.cl.mu.Lock()
108         err := tor.setInfoBytes(mi.InfoBytes)
109         tor.cl.mu.Unlock()
110         require.NoError(t, err)
111         require.Len(t, tor.pieces, 3)
112         tor.pendAllChunkSpecs(0)
113         tor.cl.mu.Lock()
114         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
115         tor.cl.mu.Unlock()
116         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
117 }
118
119 func TestUnmarshalPEXMsg(t *testing.T) {
120         var m peerExchangeMessage
121         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
122                 t.Fatal(err)
123         }
124         if len(m.Added) != 2 {
125                 t.FailNow()
126         }
127         if m.Added[0].Port != 0x506 {
128                 t.FailNow()
129         }
130 }
131
132 func TestReducedDialTimeout(t *testing.T) {
133         cfg := &Config{}
134         cfg.setDefaults()
135         for _, _case := range []struct {
136                 Max             time.Duration
137                 HalfOpenLimit   int
138                 PendingPeers    int
139                 ExpectedReduced time.Duration
140         }{
141                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
142                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
143                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
144                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
145                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
146                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
147         } {
148                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
149                 expected := _case.ExpectedReduced
150                 if expected < cfg.MinDialTimeout {
151                         expected = cfg.MinDialTimeout
152                 }
153                 if reduced != expected {
154                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
155                 }
156         }
157 }
158
159 func TestUTPRawConn(t *testing.T) {
160         l, err := NewUtpSocket("udp", "")
161         require.NoError(t, err)
162         defer l.Close()
163         go func() {
164                 for {
165                         _, err := l.Accept()
166                         if err != nil {
167                                 break
168                         }
169                 }
170         }()
171         // Connect a UTP peer to see if the RawConn will still work.
172         s, err := NewUtpSocket("udp", "")
173         require.NoError(t, err)
174         defer s.Close()
175         utpPeer, err := s.DialContext(context.Background(), "", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
176         require.NoError(t, err)
177         defer utpPeer.Close()
178         peer, err := net.ListenPacket("udp", ":0")
179         require.NoError(t, err)
180         defer peer.Close()
181
182         msgsReceived := 0
183         // How many messages to send. I've set this to double the channel buffer
184         // size in the raw packetConn.
185         const N = 200
186         readerStopped := make(chan struct{})
187         // The reader goroutine.
188         go func() {
189                 defer close(readerStopped)
190                 b := make([]byte, 500)
191                 for i := 0; i < N; i++ {
192                         n, _, err := l.ReadFrom(b)
193                         require.NoError(t, err)
194                         msgsReceived++
195                         var d int
196                         fmt.Sscan(string(b[:n]), &d)
197                         assert.Equal(t, i, d)
198                 }
199         }()
200         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
201         require.NoError(t, err)
202         for i := 0; i < N; i++ {
203                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
204                 require.NoError(t, err)
205                 time.Sleep(time.Millisecond)
206         }
207         select {
208         case <-readerStopped:
209         case <-time.After(time.Second):
210                 t.Fatal("reader timed out")
211         }
212         if msgsReceived != N {
213                 t.Fatalf("messages received: %d", msgsReceived)
214         }
215 }
216
217 func TestTwoClientsArbitraryPorts(t *testing.T) {
218         for i := 0; i < 2; i++ {
219                 cl, err := NewClient(TestingConfig())
220                 if err != nil {
221                         t.Fatal(err)
222                 }
223                 defer cl.Close()
224         }
225 }
226
227 func TestAddDropManyTorrents(t *testing.T) {
228         cl, err := NewClient(TestingConfig())
229         require.NoError(t, err)
230         defer cl.Close()
231         for i := range iter.N(1000) {
232                 var spec TorrentSpec
233                 binary.PutVarint(spec.InfoHash[:], int64(i))
234                 tt, new, err := cl.AddTorrentSpec(&spec)
235                 assert.NoError(t, err)
236                 assert.True(t, new)
237                 defer tt.Drop()
238         }
239 }
240
241 type FileCacheClientStorageFactoryParams struct {
242         Capacity    int64
243         SetCapacity bool
244         Wrapper     func(*filecache.Cache) storage.ClientImpl
245 }
246
247 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
248         return func(dataDir string) storage.ClientImpl {
249                 fc, err := filecache.NewCache(dataDir)
250                 if err != nil {
251                         panic(err)
252                 }
253                 if ps.SetCapacity {
254                         fc.SetCapacity(ps.Capacity)
255                 }
256                 return ps.Wrapper(fc)
257         }
258 }
259
260 type storageFactory func(string) storage.ClientImpl
261
262 func TestClientTransferDefault(t *testing.T) {
263         testClientTransfer(t, testClientTransferParams{
264                 ExportClientStatus: true,
265                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
266                         Wrapper: fileCachePieceResourceStorage,
267                 }),
268         })
269 }
270
271 func TestClientTransferRateLimitedUpload(t *testing.T) {
272         started := time.Now()
273         testClientTransfer(t, testClientTransferParams{
274                 // We are uploading 13 bytes (the length of the greeting torrent). The
275                 // chunks are 2 bytes in length. Then the smallest burst we can run
276                 // with is 2. Time taken is (13-burst)/rate.
277                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
278                 ExportClientStatus:      true,
279         })
280         require.True(t, time.Since(started) > time.Second)
281 }
282
283 func TestClientTransferRateLimitedDownload(t *testing.T) {
284         testClientTransfer(t, testClientTransferParams{
285                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
286         })
287 }
288
289 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
290         return storage.NewResourcePieces(fc.AsResourceProvider())
291 }
292
293 func TestClientTransferSmallCache(t *testing.T) {
294         testClientTransfer(t, testClientTransferParams{
295                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
296                         SetCapacity: true,
297                         // Going below the piece length means it can't complete a piece so
298                         // that it can be hashed.
299                         Capacity: 5,
300                         Wrapper:  fileCachePieceResourceStorage,
301                 }),
302                 SetReadahead: true,
303                 // Can't readahead too far or the cache will thrash and drop data we
304                 // thought we had.
305                 Readahead:          0,
306                 ExportClientStatus: true,
307         })
308 }
309
310 func TestClientTransferVarious(t *testing.T) {
311         // Leecher storage
312         for _, ls := range []storageFactory{
313                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
314                         Wrapper: fileCachePieceResourceStorage,
315                 }),
316                 storage.NewBoltDB,
317         } {
318                 // Seeder storage
319                 for _, ss := range []func(string) storage.ClientImpl{
320                         storage.NewFile,
321                         storage.NewMMap,
322                 } {
323                         for _, responsive := range []bool{false, true} {
324                                 testClientTransfer(t, testClientTransferParams{
325                                         Responsive:     responsive,
326                                         SeederStorage:  ss,
327                                         LeecherStorage: ls,
328                                 })
329                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
330                                         testClientTransfer(t, testClientTransferParams{
331                                                 SeederStorage:  ss,
332                                                 Responsive:     responsive,
333                                                 SetReadahead:   true,
334                                                 Readahead:      readahead,
335                                                 LeecherStorage: ls,
336                                         })
337                                 }
338                         }
339                 }
340         }
341 }
342
343 type testClientTransferParams struct {
344         Responsive                 bool
345         Readahead                  int64
346         SetReadahead               bool
347         ExportClientStatus         bool
348         LeecherStorage             func(string) storage.ClientImpl
349         SeederStorage              func(string) storage.ClientImpl
350         SeederUploadRateLimiter    *rate.Limiter
351         LeecherDownloadRateLimiter *rate.Limiter
352 }
353
354 // Creates a seeder and a leecher, and ensures the data transfers when a read
355 // is attempted on the leecher.
356 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
357         greetingTempDir, mi := testutil.GreetingTestTorrent()
358         defer os.RemoveAll(greetingTempDir)
359         // Create seeder and a Torrent.
360         cfg := TestingConfig()
361         cfg.Seed = true
362         cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
363         // cfg.ListenAddr = "localhost:4000"
364         if ps.SeederStorage != nil {
365                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
366                 defer cfg.DefaultStorage.Close()
367         } else {
368                 cfg.DataDir = greetingTempDir
369         }
370         seeder, err := NewClient(cfg)
371         require.NoError(t, err)
372         if ps.ExportClientStatus {
373                 testutil.ExportStatusWriter(seeder, "s")
374         }
375         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
376         // Run a Stats right after Closing the Client. This will trigger the Stats
377         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
378         defer seederTorrent.Stats()
379         defer seeder.Close()
380         seederTorrent.VerifyData()
381         // Create leecher and a Torrent.
382         leecherDataDir, err := ioutil.TempDir("", "")
383         require.NoError(t, err)
384         defer os.RemoveAll(leecherDataDir)
385         if ps.LeecherStorage == nil {
386                 cfg.DataDir = leecherDataDir
387         } else {
388                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
389         }
390         cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
391         cfg.Seed = false
392         leecher, err := NewClient(cfg)
393         require.NoError(t, err)
394         defer leecher.Close()
395         if ps.ExportClientStatus {
396                 testutil.ExportStatusWriter(leecher, "l")
397         }
398         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
399                 ret = TorrentSpecFromMetaInfo(mi)
400                 ret.ChunkSize = 2
401                 return
402         }())
403         require.NoError(t, err)
404         assert.True(t, new)
405         // Now do some things with leecher and seeder.
406         addClientPeer(leecherTorrent, seeder)
407         // The Torrent should not be interested in obtaining peers, so the one we
408         // just added should be the only one.
409         assert.False(t, leecherTorrent.Seeding())
410         assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
411         r := leecherTorrent.NewReader()
412         defer r.Close()
413         if ps.Responsive {
414                 r.SetResponsive()
415         }
416         if ps.SetReadahead {
417                 r.SetReadahead(ps.Readahead)
418         }
419         assertReadAllGreeting(t, r)
420         assert.True(t, 13 <= seederTorrent.Stats().BytesWrittenData)
421         assert.True(t, 8 <= seederTorrent.Stats().ChunksWritten)
422         assert.True(t, 13 <= leecherTorrent.Stats().BytesReadData)
423         assert.True(t, 8 <= leecherTorrent.Stats().ChunksRead)
424         // Try reading through again for the cases where the torrent data size
425         // exceeds the size of the cache.
426         assertReadAllGreeting(t, r)
427 }
428
429 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
430         pos, err := r.Seek(0, io.SeekStart)
431         assert.NoError(t, err)
432         assert.EqualValues(t, 0, pos)
433         _greeting, err := ioutil.ReadAll(r)
434         assert.NoError(t, err)
435         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
436 }
437
438 // Check that after completing leeching, a leecher transitions to a seeding
439 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
440 func TestSeedAfterDownloading(t *testing.T) {
441         greetingTempDir, mi := testutil.GreetingTestTorrent()
442         defer os.RemoveAll(greetingTempDir)
443         cfg := TestingConfig()
444         cfg.Seed = true
445         cfg.DataDir = greetingTempDir
446         seeder, err := NewClient(cfg)
447         require.NoError(t, err)
448         defer seeder.Close()
449         testutil.ExportStatusWriter(seeder, "s")
450         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
451         seederTorrent.VerifyData()
452         cfg.DataDir, err = ioutil.TempDir("", "")
453         require.NoError(t, err)
454         defer os.RemoveAll(cfg.DataDir)
455         leecher, err := NewClient(cfg)
456         require.NoError(t, err)
457         defer leecher.Close()
458         testutil.ExportStatusWriter(leecher, "l")
459         cfg.Seed = false
460         cfg.DataDir, err = ioutil.TempDir("", "")
461         require.NoError(t, err)
462         defer os.RemoveAll(cfg.DataDir)
463         leecherLeecher, _ := NewClient(cfg)
464         defer leecherLeecher.Close()
465         testutil.ExportStatusWriter(leecherLeecher, "ll")
466         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
467                 ret = TorrentSpecFromMetaInfo(mi)
468                 ret.ChunkSize = 2
469                 return
470         }())
471         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
472                 ret = TorrentSpecFromMetaInfo(mi)
473                 ret.ChunkSize = 3
474                 return
475         }())
476         // Simultaneously DownloadAll in Leecher, and read the contents
477         // consecutively in LeecherLeecher. This non-deterministically triggered a
478         // case where the leecher wouldn't unchoke the LeecherLeecher.
479         var wg sync.WaitGroup
480         wg.Add(1)
481         go func() {
482                 defer wg.Done()
483                 r := llg.NewReader()
484                 defer r.Close()
485                 b, err := ioutil.ReadAll(r)
486                 require.NoError(t, err)
487                 assert.EqualValues(t, testutil.GreetingFileContents, b)
488         }()
489         addClientPeer(leecherGreeting, seeder)
490         addClientPeer(leecherGreeting, leecherLeecher)
491         wg.Add(1)
492         go func() {
493                 defer wg.Done()
494                 leecherGreeting.DownloadAll()
495                 leecher.WaitAll()
496         }()
497         wg.Wait()
498 }
499
500 func TestMergingTrackersByAddingSpecs(t *testing.T) {
501         cl, err := NewClient(TestingConfig())
502         require.NoError(t, err)
503         defer cl.Close()
504         spec := TorrentSpec{}
505         T, new, _ := cl.AddTorrentSpec(&spec)
506         if !new {
507                 t.FailNow()
508         }
509         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
510         _, new, _ = cl.AddTorrentSpec(&spec)
511         assert.False(t, new)
512         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
513         // Because trackers are disabled in TestingConfig.
514         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
515 }
516
517 // We read from a piece which is marked completed, but is missing data.
518 func TestCompletedPieceWrongSize(t *testing.T) {
519         cfg := TestingConfig()
520         cfg.DefaultStorage = badStorage{}
521         cl, err := NewClient(cfg)
522         require.NoError(t, err)
523         defer cl.Close()
524         info := metainfo.Info{
525                 PieceLength: 15,
526                 Pieces:      make([]byte, 20),
527                 Files: []metainfo.FileInfo{
528                         {Path: []string{"greeting"}, Length: 13},
529                 },
530         }
531         b, err := bencode.Marshal(info)
532         require.NoError(t, err)
533         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
534                 InfoBytes: b,
535                 InfoHash:  metainfo.HashBytes(b),
536         })
537         require.NoError(t, err)
538         defer tt.Drop()
539         assert.True(t, new)
540         r := tt.NewReader()
541         defer r.Close()
542         b, err = ioutil.ReadAll(r)
543         assert.Len(t, b, 13)
544         assert.NoError(t, err)
545 }
546
547 func BenchmarkAddLargeTorrent(b *testing.B) {
548         cfg := TestingConfig()
549         cfg.DisableTCP = true
550         cfg.DisableUTP = true
551         cfg.ListenAddr = "redonk"
552         cl, err := NewClient(cfg)
553         require.NoError(b, err)
554         defer cl.Close()
555         for range iter.N(b.N) {
556                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
557                 if err != nil {
558                         b.Fatal(err)
559                 }
560                 t.Drop()
561         }
562 }
563
564 func TestResponsive(t *testing.T) {
565         seederDataDir, mi := testutil.GreetingTestTorrent()
566         defer os.RemoveAll(seederDataDir)
567         cfg := TestingConfig()
568         cfg.Seed = true
569         cfg.DataDir = seederDataDir
570         seeder, err := NewClient(cfg)
571         require.Nil(t, err)
572         defer seeder.Close()
573         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
574         seederTorrent.VerifyData()
575         leecherDataDir, err := ioutil.TempDir("", "")
576         require.Nil(t, err)
577         defer os.RemoveAll(leecherDataDir)
578         cfg = TestingConfig()
579         cfg.DataDir = leecherDataDir
580         leecher, err := NewClient(cfg)
581         require.Nil(t, err)
582         defer leecher.Close()
583         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
584                 ret = TorrentSpecFromMetaInfo(mi)
585                 ret.ChunkSize = 2
586                 return
587         }())
588         addClientPeer(leecherTorrent, seeder)
589         reader := leecherTorrent.NewReader()
590         defer reader.Close()
591         reader.SetReadahead(0)
592         reader.SetResponsive()
593         b := make([]byte, 2)
594         _, err = reader.Seek(3, io.SeekStart)
595         require.NoError(t, err)
596         _, err = io.ReadFull(reader, b)
597         assert.Nil(t, err)
598         assert.EqualValues(t, "lo", string(b))
599         _, err = reader.Seek(11, io.SeekStart)
600         require.NoError(t, err)
601         n, err := io.ReadFull(reader, b)
602         assert.Nil(t, err)
603         assert.EqualValues(t, 2, n)
604         assert.EqualValues(t, "d\n", string(b))
605 }
606
607 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
608         seederDataDir, mi := testutil.GreetingTestTorrent()
609         defer os.RemoveAll(seederDataDir)
610         cfg := TestingConfig()
611         cfg.Seed = true
612         cfg.DataDir = seederDataDir
613         seeder, err := NewClient(cfg)
614         require.Nil(t, err)
615         defer seeder.Close()
616         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
617         seederTorrent.VerifyData()
618         leecherDataDir, err := ioutil.TempDir("", "")
619         require.Nil(t, err)
620         defer os.RemoveAll(leecherDataDir)
621         cfg = TestingConfig()
622         cfg.DataDir = leecherDataDir
623         leecher, err := NewClient(cfg)
624         require.Nil(t, err)
625         defer leecher.Close()
626         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
627                 ret = TorrentSpecFromMetaInfo(mi)
628                 ret.ChunkSize = 2
629                 return
630         }())
631         addClientPeer(leecherTorrent, seeder)
632         reader := leecherTorrent.NewReader()
633         defer reader.Close()
634         reader.SetReadahead(0)
635         reader.SetResponsive()
636         b := make([]byte, 2)
637         _, err = reader.Seek(3, io.SeekStart)
638         require.NoError(t, err)
639         _, err = io.ReadFull(reader, b)
640         assert.Nil(t, err)
641         assert.EqualValues(t, "lo", string(b))
642         go leecherTorrent.Drop()
643         _, err = reader.Seek(11, io.SeekStart)
644         require.NoError(t, err)
645         n, err := reader.Read(b)
646         assert.EqualError(t, err, "torrent closed")
647         assert.EqualValues(t, 0, n)
648 }
649
650 func TestDHTInheritBlocklist(t *testing.T) {
651         ipl := iplist.New(nil)
652         require.NotNil(t, ipl)
653         cfg := TestingConfig()
654         cfg.IPBlocklist = ipl
655         cfg.NoDHT = false
656         cl, err := NewClient(cfg)
657         require.NoError(t, err)
658         defer cl.Close()
659         require.Equal(t, ipl, cl.DHT().IPBlocklist())
660 }
661
662 // Check that stuff is merged in subsequent AddTorrentSpec for the same
663 // infohash.
664 func TestAddTorrentSpecMerging(t *testing.T) {
665         cl, err := NewClient(TestingConfig())
666         require.NoError(t, err)
667         defer cl.Close()
668         dir, mi := testutil.GreetingTestTorrent()
669         defer os.RemoveAll(dir)
670         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
671                 InfoHash: mi.HashInfoBytes(),
672         })
673         require.NoError(t, err)
674         require.True(t, new)
675         require.Nil(t, tt.Info())
676         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
677         require.NoError(t, err)
678         require.False(t, new)
679         require.NotNil(t, tt.Info())
680 }
681
682 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
683         dir, mi := testutil.GreetingTestTorrent()
684         os.RemoveAll(dir)
685         cl, _ := NewClient(TestingConfig())
686         defer cl.Close()
687         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
688                 InfoHash: mi.HashInfoBytes(),
689         })
690         tt.Drop()
691         assert.EqualValues(t, 0, len(cl.Torrents()))
692         select {
693         case <-tt.GotInfo():
694                 t.FailNow()
695         default:
696         }
697 }
698
699 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
700         for i := range iter.N(info.NumPieces()) {
701                 p := info.Piece(i)
702                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
703         }
704 }
705
706 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
707         fileCacheDir, err := ioutil.TempDir("", "")
708         require.NoError(t, err)
709         defer os.RemoveAll(fileCacheDir)
710         fileCache, err := filecache.NewCache(fileCacheDir)
711         require.NoError(t, err)
712         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
713         defer os.RemoveAll(greetingDataTempDir)
714         filePieceStore := csf(fileCache)
715         defer filePieceStore.Close()
716         info, err := greetingMetainfo.UnmarshalInfo()
717         require.NoError(t, err)
718         ih := greetingMetainfo.HashInfoBytes()
719         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
720         require.NoError(t, err)
721         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
722         // require.Equal(t, len(testutil.GreetingFileContents), written)
723         // require.NoError(t, err)
724         for i := 0; i < info.NumPieces(); i++ {
725                 p := info.Piece(i)
726                 if alreadyCompleted {
727                         require.NoError(t, greetingData.Piece(p).MarkComplete())
728                 }
729         }
730         cfg := TestingConfig()
731         // TODO: Disable network option?
732         cfg.DisableTCP = true
733         cfg.DisableUTP = true
734         cfg.DefaultStorage = filePieceStore
735         cl, err := NewClient(cfg)
736         require.NoError(t, err)
737         defer cl.Close()
738         tt, err := cl.AddTorrent(greetingMetainfo)
739         require.NoError(t, err)
740         psrs := tt.PieceStateRuns()
741         assert.Len(t, psrs, 1)
742         assert.EqualValues(t, 3, psrs[0].Length)
743         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
744         if alreadyCompleted {
745                 r := tt.NewReader()
746                 b, err := ioutil.ReadAll(r)
747                 assert.NoError(t, err)
748                 assert.EqualValues(t, testutil.GreetingFileContents, b)
749         }
750 }
751
752 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
753         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
754 }
755
756 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
757         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
758 }
759
760 func TestAddMetainfoWithNodes(t *testing.T) {
761         cfg := TestingConfig()
762         cfg.ListenAddr = ":0"
763         cfg.NoDHT = false
764         // For now, we want to just jam the nodes into the table, without
765         // verifying them first. Also the DHT code doesn't support mixing secure
766         // and insecure nodes if security is enabled (yet).
767         cfg.DHTConfig.NoSecurity = true
768         cl, err := NewClient(cfg)
769         require.NoError(t, err)
770         defer cl.Close()
771         assert.EqualValues(t, 0, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
772         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
773         require.NoError(t, err)
774         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
775         // check if the announce-list is here instead. TODO: Add nodes.
776         assert.Len(t, tt.metainfo.AnnounceList, 5)
777         // There are 6 nodes in the torrent file.
778         assert.EqualValues(t, 6, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
779 }
780
781 type testDownloadCancelParams struct {
782         ExportClientStatus        bool
783         SetLeecherStorageCapacity bool
784         LeecherStorageCapacity    int64
785         Cancel                    bool
786 }
787
788 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
789         greetingTempDir, mi := testutil.GreetingTestTorrent()
790         defer os.RemoveAll(greetingTempDir)
791         cfg := TestingConfig()
792         cfg.Seed = true
793         cfg.DataDir = greetingTempDir
794         seeder, err := NewClient(cfg)
795         require.NoError(t, err)
796         defer seeder.Close()
797         if ps.ExportClientStatus {
798                 testutil.ExportStatusWriter(seeder, "s")
799         }
800         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
801         seederTorrent.VerifyData()
802         leecherDataDir, err := ioutil.TempDir("", "")
803         require.NoError(t, err)
804         defer os.RemoveAll(leecherDataDir)
805         fc, err := filecache.NewCache(leecherDataDir)
806         require.NoError(t, err)
807         if ps.SetLeecherStorageCapacity {
808                 fc.SetCapacity(ps.LeecherStorageCapacity)
809         }
810         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
811         cfg.DataDir = leecherDataDir
812         leecher, _ := NewClient(cfg)
813         defer leecher.Close()
814         if ps.ExportClientStatus {
815                 testutil.ExportStatusWriter(leecher, "l")
816         }
817         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
818                 ret = TorrentSpecFromMetaInfo(mi)
819                 ret.ChunkSize = 2
820                 return
821         }())
822         require.NoError(t, err)
823         assert.True(t, new)
824         psc := leecherGreeting.SubscribePieceStateChanges()
825         defer psc.Close()
826
827         leecherGreeting.cl.mu.Lock()
828         leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
829         if ps.Cancel {
830                 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
831         }
832         leecherGreeting.cl.mu.Unlock()
833
834         addClientPeer(leecherGreeting, seeder)
835         completes := make(map[int]bool, 3)
836 values:
837         for {
838                 // started := time.Now()
839                 select {
840                 case _v := <-psc.Values:
841                         // log.Print(time.Since(started))
842                         v := _v.(PieceStateChange)
843                         completes[v.Index] = v.Complete
844                 case <-time.After(100 * time.Millisecond):
845                         break values
846                 }
847         }
848         if ps.Cancel {
849                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
850         } else {
851                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
852         }
853
854 }
855
856 func TestTorrentDownloadAll(t *testing.T) {
857         testDownloadCancel(t, testDownloadCancelParams{})
858 }
859
860 func TestTorrentDownloadAllThenCancel(t *testing.T) {
861         testDownloadCancel(t, testDownloadCancelParams{
862                 Cancel: true,
863         })
864 }
865
866 // Ensure that it's an error for a peer to send an invalid have message.
867 func TestPeerInvalidHave(t *testing.T) {
868         cl, err := NewClient(TestingConfig())
869         require.NoError(t, err)
870         defer cl.Close()
871         info := metainfo.Info{
872                 PieceLength: 1,
873                 Pieces:      make([]byte, 20),
874                 Files:       []metainfo.FileInfo{{Length: 1}},
875         }
876         infoBytes, err := bencode.Marshal(info)
877         require.NoError(t, err)
878         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
879                 InfoBytes: infoBytes,
880                 InfoHash:  metainfo.HashBytes(infoBytes),
881                 Storage:   badStorage{},
882         })
883         require.NoError(t, err)
884         assert.True(t, _new)
885         defer tt.Drop()
886         cn := &connection{
887                 t: tt,
888         }
889         assert.NoError(t, cn.peerSentHave(0))
890         assert.Error(t, cn.peerSentHave(1))
891 }
892
893 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
894         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
895         defer os.RemoveAll(greetingTempDir)
896         cfg := TestingConfig()
897         cfg.DataDir = greetingTempDir
898         seeder, err := NewClient(TestingConfig())
899         require.NoError(t, err)
900         seeder.AddTorrentSpec(&TorrentSpec{
901                 InfoBytes: greetingMetainfo.InfoBytes,
902         })
903 }
904
905 func TestPrepareTrackerAnnounce(t *testing.T) {
906         cl := &Client{}
907         blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
908         require.NoError(t, err)
909         assert.False(t, blocked)
910         assert.EqualValues(t, "localhost:1234", host)
911         assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
912 }
913
914 // Check that when the listen port is 0, all the protocols listened on have
915 // the same port, and it isn't zero.
916 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
917         cl, err := NewClient(TestingConfig())
918         require.NoError(t, err)
919         defer cl.Close()
920         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
921         assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
922 }
923
924 func TestClientDynamicListenTCPOnly(t *testing.T) {
925         cfg := TestingConfig()
926         cfg.DisableUTP = true
927         cl, err := NewClient(cfg)
928         require.NoError(t, err)
929         defer cl.Close()
930         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
931         assert.Nil(t, cl.utpSock)
932 }
933
934 func TestClientDynamicListenUTPOnly(t *testing.T) {
935         cfg := TestingConfig()
936         cfg.DisableTCP = true
937         cl, err := NewClient(cfg)
938         require.NoError(t, err)
939         defer cl.Close()
940         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
941         assert.Nil(t, cl.tcpListener)
942 }
943
944 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
945         cfg := TestingConfig()
946         cfg.DisableTCP = true
947         cfg.DisableUTP = true
948         cl, err := NewClient(cfg)
949         require.NoError(t, err)
950         defer cl.Close()
951         assert.Nil(t, cl.ListenAddr())
952 }
953
954 func addClientPeer(t *Torrent, cl *Client) {
955         t.AddPeers([]Peer{
956                 {
957                         IP:   missinggo.AddrIP(cl.ListenAddr()),
958                         Port: missinggo.AddrPort(cl.ListenAddr()),
959                 },
960         })
961 }
962
963 func totalConns(tts []*Torrent) (ret int) {
964         for _, tt := range tts {
965                 tt.cl.mu.Lock()
966                 ret += len(tt.conns)
967                 tt.cl.mu.Unlock()
968         }
969         return
970 }
971
972 func TestSetMaxEstablishedConn(t *testing.T) {
973         ss := testutil.NewStatusServer(t)
974         defer ss.Close()
975         var tts []*Torrent
976         ih := testutil.GreetingMetaInfo().HashInfoBytes()
977         for i := range iter.N(3) {
978                 cl, err := NewClient(TestingConfig())
979                 require.NoError(t, err)
980                 defer cl.Close()
981                 tt, _ := cl.AddTorrentInfoHash(ih)
982                 tt.SetMaxEstablishedConns(2)
983                 ss.HandleStatusWriter(cl, fmt.Sprintf("/%d", i))
984                 tts = append(tts, tt)
985         }
986         addPeers := func() {
987                 for _, tt := range tts {
988                         for _, _tt := range tts {
989                                 // if tt != _tt {
990                                 addClientPeer(tt, _tt.cl)
991                                 // }
992                         }
993                 }
994         }
995         waitTotalConns := func(num int) {
996                 for totalConns(tts) != num {
997                         addPeers()
998                         time.Sleep(time.Millisecond)
999                 }
1000         }
1001         addPeers()
1002         waitTotalConns(6)
1003         tts[0].SetMaxEstablishedConns(1)
1004         waitTotalConns(4)
1005         tts[0].SetMaxEstablishedConns(0)
1006         waitTotalConns(2)
1007         tts[0].SetMaxEstablishedConns(1)
1008         addPeers()
1009         waitTotalConns(4)
1010         tts[0].SetMaxEstablishedConns(2)
1011         addPeers()
1012         waitTotalConns(6)
1013 }
1014
1015 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1016         os.MkdirAll(dir, 0770)
1017         file, err := os.Create(filepath.Join(dir, name))
1018         require.NoError(t, err)
1019         file.Write([]byte(name))
1020         file.Close()
1021         mi := metainfo.MetaInfo{}
1022         mi.SetDefaults()
1023         info := metainfo.Info{PieceLength: 256 * 1024}
1024         err = info.BuildFromFilePath(filepath.Join(dir, name))
1025         require.NoError(t, err)
1026         mi.InfoBytes, err = bencode.Marshal(info)
1027         require.NoError(t, err)
1028         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1029         tr, err := cl.AddTorrent(&mi)
1030         require.NoError(t, err)
1031         require.True(t, tr.Seeding())
1032         tr.VerifyData()
1033         return magnet
1034 }
1035
1036 // https://github.com/anacrolix/torrent/issues/114
1037 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1038         cfg := TestingConfig()
1039         cfg.DisableUTP = true
1040         cfg.Seed = true
1041         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1042         cfg.ForceEncryption = true
1043         os.Mkdir(cfg.DataDir, 0755)
1044         server, err := NewClient(cfg)
1045         require.NoError(t, err)
1046         defer server.Close()
1047         testutil.ExportStatusWriter(server, "s")
1048         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1049         makeMagnet(t, server, cfg.DataDir, "test2")
1050         cfg = TestingConfig()
1051         cfg.DisableUTP = true
1052         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1053         cfg.ForceEncryption = true
1054         client, err := NewClient(cfg)
1055         require.NoError(t, err)
1056         defer client.Close()
1057         testutil.ExportStatusWriter(client, "c")
1058         tr, err := client.AddMagnet(magnet1)
1059         require.NoError(t, err)
1060         tr.AddPeers([]Peer{{
1061                 IP:   missinggo.AddrIP(server.ListenAddr()),
1062                 Port: missinggo.AddrPort(server.ListenAddr()),
1063         }})
1064         <-tr.GotInfo()
1065         tr.DownloadAll()
1066         client.WaitAll()
1067 }
1068
1069 func TestClientAddressInUse(t *testing.T) {
1070         s, _ := NewUtpSocket("udp", ":50007")
1071         if s != nil {
1072                 defer s.Close()
1073         }
1074         cfg := TestingConfig()
1075         cfg.ListenAddr = ":50007"
1076         cl, err := NewClient(cfg)
1077         require.Error(t, err)
1078         require.Nil(t, cl)
1079 }