]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Merge commit 'cadbacb956fa38a61f3656dc08d71bd68fc966ec' into dev
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "context"
5         "encoding/binary"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "net"
10         "os"
11         "path/filepath"
12         "reflect"
13         "sync"
14         "testing"
15         "time"
16
17         "github.com/anacrolix/dht"
18         _ "github.com/anacrolix/envpprof"
19         "github.com/anacrolix/missinggo"
20         "github.com/anacrolix/missinggo/filecache"
21         "github.com/bradfitz/iter"
22         "github.com/stretchr/testify/assert"
23         "github.com/stretchr/testify/require"
24         "golang.org/x/time/rate"
25
26         "github.com/anacrolix/torrent/bencode"
27         "github.com/anacrolix/torrent/internal/testutil"
28         "github.com/anacrolix/torrent/iplist"
29         "github.com/anacrolix/torrent/metainfo"
30         "github.com/anacrolix/torrent/storage"
31 )
32
33 func TestingConfig() *Config {
34         return &Config{
35                 ListenHost:              LoopbackListenHost,
36                 NoDHT:                   true,
37                 DataDir:                 tempDir(),
38                 DisableTrackers:         true,
39                 NoDefaultPortForwarding: true,
40                 // Debug:           true,
41         }
42 }
43
44 func TestClientDefault(t *testing.T) {
45         cl, err := NewClient(TestingConfig())
46         require.NoError(t, err)
47         cl.Close()
48 }
49
50 func TestClientNilConfig(t *testing.T) {
51         cl, err := NewClient(nil)
52         require.NoError(t, err)
53         cl.Close()
54 }
55
56 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
57         cfg := TestingConfig()
58         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
59         require.NoError(t, err)
60         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
61         defer ci.Close()
62         cfg.DefaultStorage = ci
63         cl, err := NewClient(cfg)
64         require.NoError(t, err)
65         cl.Close()
66         // And again, https://github.com/anacrolix/torrent/issues/158
67         cl, err = NewClient(cfg)
68         require.NoError(t, err)
69         cl.Close()
70 }
71
72 func TestAddDropTorrent(t *testing.T) {
73         cl, err := NewClient(TestingConfig())
74         require.NoError(t, err)
75         defer cl.Close()
76         dir, mi := testutil.GreetingTestTorrent()
77         defer os.RemoveAll(dir)
78         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
79         require.NoError(t, err)
80         assert.True(t, new)
81         tt.SetMaxEstablishedConns(0)
82         tt.SetMaxEstablishedConns(1)
83         tt.Drop()
84 }
85
86 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
87         // TODO?
88         t.SkipNow()
89 }
90
91 func TestAddTorrentNoUsableURLs(t *testing.T) {
92         // TODO?
93         t.SkipNow()
94 }
95
96 func TestAddPeersToUnknownTorrent(t *testing.T) {
97         // TODO?
98         t.SkipNow()
99 }
100
101 func TestPieceHashSize(t *testing.T) {
102         assert.Equal(t, 20, pieceHash.Size())
103 }
104
105 func TestTorrentInitialState(t *testing.T) {
106         dir, mi := testutil.GreetingTestTorrent()
107         defer os.RemoveAll(dir)
108         cl := &Client{}
109         cl.initLogger()
110         tor := cl.newTorrent(
111                 mi.HashInfoBytes(),
112                 storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
113         )
114         tor.setChunkSize(2)
115         tor.cl.mu.Lock()
116         err := tor.setInfoBytes(mi.InfoBytes)
117         tor.cl.mu.Unlock()
118         require.NoError(t, err)
119         require.Len(t, tor.pieces, 3)
120         tor.pendAllChunkSpecs(0)
121         tor.cl.mu.Lock()
122         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
123         tor.cl.mu.Unlock()
124         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
125 }
126
127 func TestUnmarshalPEXMsg(t *testing.T) {
128         var m peerExchangeMessage
129         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
130                 t.Fatal(err)
131         }
132         if len(m.Added) != 2 {
133                 t.FailNow()
134         }
135         if m.Added[0].Port != 0x506 {
136                 t.FailNow()
137         }
138 }
139
140 func TestReducedDialTimeout(t *testing.T) {
141         cfg := &Config{}
142         cfg.setDefaults()
143         for _, _case := range []struct {
144                 Max             time.Duration
145                 HalfOpenLimit   int
146                 PendingPeers    int
147                 ExpectedReduced time.Duration
148         }{
149                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
150                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
151                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
152                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
153                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
154                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
155         } {
156                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
157                 expected := _case.ExpectedReduced
158                 if expected < cfg.MinDialTimeout {
159                         expected = cfg.MinDialTimeout
160                 }
161                 if reduced != expected {
162                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
163                 }
164         }
165 }
166
167 func TestUTPRawConn(t *testing.T) {
168         l, err := NewUtpSocket("udp", "")
169         require.NoError(t, err)
170         defer l.Close()
171         go func() {
172                 for {
173                         _, err := l.Accept()
174                         if err != nil {
175                                 break
176                         }
177                 }
178         }()
179         // Connect a UTP peer to see if the RawConn will still work.
180         s, err := NewUtpSocket("udp", "")
181         require.NoError(t, err)
182         defer s.Close()
183         utpPeer, err := s.DialContext(context.Background(), "", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
184         require.NoError(t, err)
185         defer utpPeer.Close()
186         peer, err := net.ListenPacket("udp", ":0")
187         require.NoError(t, err)
188         defer peer.Close()
189
190         msgsReceived := 0
191         // How many messages to send. I've set this to double the channel buffer
192         // size in the raw packetConn.
193         const N = 200
194         readerStopped := make(chan struct{})
195         // The reader goroutine.
196         go func() {
197                 defer close(readerStopped)
198                 b := make([]byte, 500)
199                 for i := 0; i < N; i++ {
200                         n, _, err := l.ReadFrom(b)
201                         require.NoError(t, err)
202                         msgsReceived++
203                         var d int
204                         fmt.Sscan(string(b[:n]), &d)
205                         assert.Equal(t, i, d)
206                 }
207         }()
208         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
209         require.NoError(t, err)
210         for i := 0; i < N; i++ {
211                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
212                 require.NoError(t, err)
213                 time.Sleep(time.Millisecond)
214         }
215         select {
216         case <-readerStopped:
217         case <-time.After(time.Second):
218                 t.Fatal("reader timed out")
219         }
220         if msgsReceived != N {
221                 t.Fatalf("messages received: %d", msgsReceived)
222         }
223 }
224
225 func TestAddDropManyTorrents(t *testing.T) {
226         cl, err := NewClient(TestingConfig())
227         require.NoError(t, err)
228         defer cl.Close()
229         for i := range iter.N(1000) {
230                 var spec TorrentSpec
231                 binary.PutVarint(spec.InfoHash[:], int64(i))
232                 tt, new, err := cl.AddTorrentSpec(&spec)
233                 assert.NoError(t, err)
234                 assert.True(t, new)
235                 defer tt.Drop()
236         }
237 }
238
239 type FileCacheClientStorageFactoryParams struct {
240         Capacity    int64
241         SetCapacity bool
242         Wrapper     func(*filecache.Cache) storage.ClientImpl
243 }
244
245 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
246         return func(dataDir string) storage.ClientImpl {
247                 fc, err := filecache.NewCache(dataDir)
248                 if err != nil {
249                         panic(err)
250                 }
251                 if ps.SetCapacity {
252                         fc.SetCapacity(ps.Capacity)
253                 }
254                 return ps.Wrapper(fc)
255         }
256 }
257
258 type storageFactory func(string) storage.ClientImpl
259
260 func TestClientTransferDefault(t *testing.T) {
261         testClientTransfer(t, testClientTransferParams{
262                 ExportClientStatus: true,
263                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
264                         Wrapper: fileCachePieceResourceStorage,
265                 }),
266         })
267 }
268
269 func TestClientTransferRateLimitedUpload(t *testing.T) {
270         started := time.Now()
271         testClientTransfer(t, testClientTransferParams{
272                 // We are uploading 13 bytes (the length of the greeting torrent). The
273                 // chunks are 2 bytes in length. Then the smallest burst we can run
274                 // with is 2. Time taken is (13-burst)/rate.
275                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
276                 ExportClientStatus:      true,
277         })
278         require.True(t, time.Since(started) > time.Second)
279 }
280
281 func TestClientTransferRateLimitedDownload(t *testing.T) {
282         testClientTransfer(t, testClientTransferParams{
283                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
284         })
285 }
286
287 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
288         return storage.NewResourcePieces(fc.AsResourceProvider())
289 }
290
291 func TestClientTransferSmallCache(t *testing.T) {
292         testClientTransfer(t, testClientTransferParams{
293                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
294                         SetCapacity: true,
295                         // Going below the piece length means it can't complete a piece so
296                         // that it can be hashed.
297                         Capacity: 5,
298                         Wrapper:  fileCachePieceResourceStorage,
299                 }),
300                 SetReadahead: true,
301                 // Can't readahead too far or the cache will thrash and drop data we
302                 // thought we had.
303                 Readahead:          0,
304                 ExportClientStatus: true,
305         })
306 }
307
308 func TestClientTransferVarious(t *testing.T) {
309         // Leecher storage
310         for _, ls := range []storageFactory{
311                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
312                         Wrapper: fileCachePieceResourceStorage,
313                 }),
314                 storage.NewBoltDB,
315         } {
316                 // Seeder storage
317                 for _, ss := range []func(string) storage.ClientImpl{
318                         storage.NewFile,
319                         storage.NewMMap,
320                 } {
321                         for _, responsive := range []bool{false, true} {
322                                 testClientTransfer(t, testClientTransferParams{
323                                         Responsive:     responsive,
324                                         SeederStorage:  ss,
325                                         LeecherStorage: ls,
326                                 })
327                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
328                                         testClientTransfer(t, testClientTransferParams{
329                                                 SeederStorage:  ss,
330                                                 Responsive:     responsive,
331                                                 SetReadahead:   true,
332                                                 Readahead:      readahead,
333                                                 LeecherStorage: ls,
334                                         })
335                                 }
336                         }
337                 }
338         }
339 }
340
341 type testClientTransferParams struct {
342         Responsive                 bool
343         Readahead                  int64
344         SetReadahead               bool
345         ExportClientStatus         bool
346         LeecherStorage             func(string) storage.ClientImpl
347         SeederStorage              func(string) storage.ClientImpl
348         SeederUploadRateLimiter    *rate.Limiter
349         LeecherDownloadRateLimiter *rate.Limiter
350 }
351
352 // Creates a seeder and a leecher, and ensures the data transfers when a read
353 // is attempted on the leecher.
354 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
355         greetingTempDir, mi := testutil.GreetingTestTorrent()
356         defer os.RemoveAll(greetingTempDir)
357         // Create seeder and a Torrent.
358         cfg := TestingConfig()
359         cfg.Seed = true
360         cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
361         // cfg.ListenAddr = "localhost:4000"
362         if ps.SeederStorage != nil {
363                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
364                 defer cfg.DefaultStorage.Close()
365         } else {
366                 cfg.DataDir = greetingTempDir
367         }
368         seeder, err := NewClient(cfg)
369         require.NoError(t, err)
370         if ps.ExportClientStatus {
371                 testutil.ExportStatusWriter(seeder, "s")
372         }
373         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
374         // Run a Stats right after Closing the Client. This will trigger the Stats
375         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
376         defer seederTorrent.Stats()
377         defer seeder.Close()
378         seederTorrent.VerifyData()
379         // Create leecher and a Torrent.
380         leecherDataDir, err := ioutil.TempDir("", "")
381         require.NoError(t, err)
382         defer os.RemoveAll(leecherDataDir)
383         if ps.LeecherStorage == nil {
384                 cfg.DataDir = leecherDataDir
385         } else {
386                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
387         }
388         cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
389         cfg.Seed = false
390         leecher, err := NewClient(cfg)
391         require.NoError(t, err)
392         defer leecher.Close()
393         if ps.ExportClientStatus {
394                 testutil.ExportStatusWriter(leecher, "l")
395         }
396         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
397                 ret = TorrentSpecFromMetaInfo(mi)
398                 ret.ChunkSize = 2
399                 return
400         }())
401         require.NoError(t, err)
402         assert.True(t, new)
403         // Now do some things with leecher and seeder.
404         leecherTorrent.AddClientPeer(seeder)
405         // The Torrent should not be interested in obtaining peers, so the one we
406         // just added should be the only one.
407         assert.False(t, leecherTorrent.Seeding())
408         assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
409         r := leecherTorrent.NewReader()
410         defer r.Close()
411         if ps.Responsive {
412                 r.SetResponsive()
413         }
414         if ps.SetReadahead {
415                 r.SetReadahead(ps.Readahead)
416         }
417         assertReadAllGreeting(t, r)
418         assert.True(t, 13 <= seederTorrent.Stats().BytesWrittenData)
419         assert.True(t, 8 <= seederTorrent.Stats().ChunksWritten)
420         assert.True(t, 13 <= leecherTorrent.Stats().BytesReadData)
421         assert.True(t, 8 <= leecherTorrent.Stats().ChunksRead)
422         // Try reading through again for the cases where the torrent data size
423         // exceeds the size of the cache.
424         assertReadAllGreeting(t, r)
425 }
426
427 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
428         pos, err := r.Seek(0, io.SeekStart)
429         assert.NoError(t, err)
430         assert.EqualValues(t, 0, pos)
431         _greeting, err := ioutil.ReadAll(r)
432         assert.NoError(t, err)
433         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
434 }
435
436 // Check that after completing leeching, a leecher transitions to a seeding
437 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
438 func TestSeedAfterDownloading(t *testing.T) {
439         greetingTempDir, mi := testutil.GreetingTestTorrent()
440         defer os.RemoveAll(greetingTempDir)
441         cfg := TestingConfig()
442         cfg.Seed = true
443         cfg.DataDir = greetingTempDir
444         seeder, err := NewClient(cfg)
445         require.NoError(t, err)
446         defer seeder.Close()
447         testutil.ExportStatusWriter(seeder, "s")
448         seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
449         require.NoError(t, err)
450         assert.True(t, ok)
451         seederTorrent.VerifyData()
452         cfg.DataDir, err = ioutil.TempDir("", "")
453         require.NoError(t, err)
454         defer os.RemoveAll(cfg.DataDir)
455         leecher, err := NewClient(cfg)
456         require.NoError(t, err)
457         defer leecher.Close()
458         testutil.ExportStatusWriter(leecher, "l")
459         cfg.Seed = false
460         cfg.DataDir, err = ioutil.TempDir("", "")
461         require.NoError(t, err)
462         defer os.RemoveAll(cfg.DataDir)
463         leecherLeecher, _ := NewClient(cfg)
464         require.NoError(t, err)
465         defer leecherLeecher.Close()
466         testutil.ExportStatusWriter(leecherLeecher, "ll")
467         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
468                 ret = TorrentSpecFromMetaInfo(mi)
469                 ret.ChunkSize = 2
470                 return
471         }())
472         require.NoError(t, err)
473         assert.True(t, ok)
474         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
475                 ret = TorrentSpecFromMetaInfo(mi)
476                 ret.ChunkSize = 3
477                 return
478         }())
479         require.NoError(t, err)
480         assert.True(t, ok)
481         // Simultaneously DownloadAll in Leecher, and read the contents
482         // consecutively in LeecherLeecher. This non-deterministically triggered a
483         // case where the leecher wouldn't unchoke the LeecherLeecher.
484         var wg sync.WaitGroup
485         wg.Add(1)
486         go func() {
487                 defer wg.Done()
488                 r := llg.NewReader()
489                 defer r.Close()
490                 b, err := ioutil.ReadAll(r)
491                 require.NoError(t, err)
492                 assert.EqualValues(t, testutil.GreetingFileContents, b)
493         }()
494         done := make(chan struct{})
495         defer close(done)
496         go func() {
497                 for {
498                         go leecherGreeting.AddClientPeer(seeder)
499                         go leecherGreeting.AddClientPeer(leecherLeecher)
500                         select {
501                         case <-done:
502                                 return
503                         case <-time.After(time.Second):
504                         }
505                 }
506         }()
507         wg.Add(1)
508         go func() {
509                 defer wg.Done()
510                 leecherGreeting.DownloadAll()
511                 leecher.WaitAll()
512         }()
513         wg.Wait()
514 }
515
516 func TestMergingTrackersByAddingSpecs(t *testing.T) {
517         cl, err := NewClient(TestingConfig())
518         require.NoError(t, err)
519         defer cl.Close()
520         spec := TorrentSpec{}
521         T, new, _ := cl.AddTorrentSpec(&spec)
522         if !new {
523                 t.FailNow()
524         }
525         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
526         _, new, _ = cl.AddTorrentSpec(&spec)
527         assert.False(t, new)
528         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
529         // Because trackers are disabled in TestingConfig.
530         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
531 }
532
533 // We read from a piece which is marked completed, but is missing data.
534 func TestCompletedPieceWrongSize(t *testing.T) {
535         cfg := TestingConfig()
536         cfg.DefaultStorage = badStorage{}
537         cl, err := NewClient(cfg)
538         require.NoError(t, err)
539         defer cl.Close()
540         info := metainfo.Info{
541                 PieceLength: 15,
542                 Pieces:      make([]byte, 20),
543                 Files: []metainfo.FileInfo{
544                         {Path: []string{"greeting"}, Length: 13},
545                 },
546         }
547         b, err := bencode.Marshal(info)
548         require.NoError(t, err)
549         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
550                 InfoBytes: b,
551                 InfoHash:  metainfo.HashBytes(b),
552         })
553         require.NoError(t, err)
554         defer tt.Drop()
555         assert.True(t, new)
556         r := tt.NewReader()
557         defer r.Close()
558         b, err = ioutil.ReadAll(r)
559         assert.Len(t, b, 13)
560         assert.NoError(t, err)
561 }
562
563 func BenchmarkAddLargeTorrent(b *testing.B) {
564         cfg := TestingConfig()
565         cfg.DisableTCP = true
566         cfg.DisableUTP = true
567         cfg.ListenHost = func(string) string { return "redonk" }
568         cl, err := NewClient(cfg)
569         require.NoError(b, err)
570         defer cl.Close()
571         for range iter.N(b.N) {
572                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
573                 if err != nil {
574                         b.Fatal(err)
575                 }
576                 t.Drop()
577         }
578 }
579
580 func TestResponsive(t *testing.T) {
581         seederDataDir, mi := testutil.GreetingTestTorrent()
582         defer os.RemoveAll(seederDataDir)
583         cfg := TestingConfig()
584         cfg.Seed = true
585         cfg.DataDir = seederDataDir
586         seeder, err := NewClient(cfg)
587         require.Nil(t, err)
588         defer seeder.Close()
589         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
590         seederTorrent.VerifyData()
591         leecherDataDir, err := ioutil.TempDir("", "")
592         require.Nil(t, err)
593         defer os.RemoveAll(leecherDataDir)
594         cfg = TestingConfig()
595         cfg.DataDir = leecherDataDir
596         leecher, err := NewClient(cfg)
597         require.Nil(t, err)
598         defer leecher.Close()
599         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
600                 ret = TorrentSpecFromMetaInfo(mi)
601                 ret.ChunkSize = 2
602                 return
603         }())
604         leecherTorrent.AddClientPeer(seeder)
605         reader := leecherTorrent.NewReader()
606         defer reader.Close()
607         reader.SetReadahead(0)
608         reader.SetResponsive()
609         b := make([]byte, 2)
610         _, err = reader.Seek(3, io.SeekStart)
611         require.NoError(t, err)
612         _, err = io.ReadFull(reader, b)
613         assert.Nil(t, err)
614         assert.EqualValues(t, "lo", string(b))
615         _, err = reader.Seek(11, io.SeekStart)
616         require.NoError(t, err)
617         n, err := io.ReadFull(reader, b)
618         assert.Nil(t, err)
619         assert.EqualValues(t, 2, n)
620         assert.EqualValues(t, "d\n", string(b))
621 }
622
623 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
624         seederDataDir, mi := testutil.GreetingTestTorrent()
625         defer os.RemoveAll(seederDataDir)
626         cfg := TestingConfig()
627         cfg.Seed = true
628         cfg.DataDir = seederDataDir
629         seeder, err := NewClient(cfg)
630         require.Nil(t, err)
631         defer seeder.Close()
632         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
633         seederTorrent.VerifyData()
634         leecherDataDir, err := ioutil.TempDir("", "")
635         require.Nil(t, err)
636         defer os.RemoveAll(leecherDataDir)
637         cfg = TestingConfig()
638         cfg.DataDir = leecherDataDir
639         leecher, err := NewClient(cfg)
640         require.Nil(t, err)
641         defer leecher.Close()
642         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
643                 ret = TorrentSpecFromMetaInfo(mi)
644                 ret.ChunkSize = 2
645                 return
646         }())
647         leecherTorrent.AddClientPeer(seeder)
648         reader := leecherTorrent.NewReader()
649         defer reader.Close()
650         reader.SetReadahead(0)
651         reader.SetResponsive()
652         b := make([]byte, 2)
653         _, err = reader.Seek(3, io.SeekStart)
654         require.NoError(t, err)
655         _, err = io.ReadFull(reader, b)
656         assert.Nil(t, err)
657         assert.EqualValues(t, "lo", string(b))
658         go leecherTorrent.Drop()
659         _, err = reader.Seek(11, io.SeekStart)
660         require.NoError(t, err)
661         n, err := reader.Read(b)
662         assert.EqualError(t, err, "torrent closed")
663         assert.EqualValues(t, 0, n)
664 }
665
666 func TestDHTInheritBlocklist(t *testing.T) {
667         ipl := iplist.New(nil)
668         require.NotNil(t, ipl)
669         cfg := TestingConfig()
670         cfg.IPBlocklist = ipl
671         cfg.NoDHT = false
672         cl, err := NewClient(cfg)
673         require.NoError(t, err)
674         defer cl.Close()
675         numServers := 0
676         cl.eachDhtServer(func(s *dht.Server) {
677                 assert.Equal(t, ipl, s.IPBlocklist())
678                 numServers++
679         })
680         assert.EqualValues(t, 2, numServers)
681 }
682
683 // Check that stuff is merged in subsequent AddTorrentSpec for the same
684 // infohash.
685 func TestAddTorrentSpecMerging(t *testing.T) {
686         cl, err := NewClient(TestingConfig())
687         require.NoError(t, err)
688         defer cl.Close()
689         dir, mi := testutil.GreetingTestTorrent()
690         defer os.RemoveAll(dir)
691         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
692                 InfoHash: mi.HashInfoBytes(),
693         })
694         require.NoError(t, err)
695         require.True(t, new)
696         require.Nil(t, tt.Info())
697         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
698         require.NoError(t, err)
699         require.False(t, new)
700         require.NotNil(t, tt.Info())
701 }
702
703 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
704         dir, mi := testutil.GreetingTestTorrent()
705         os.RemoveAll(dir)
706         cl, _ := NewClient(TestingConfig())
707         defer cl.Close()
708         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
709                 InfoHash: mi.HashInfoBytes(),
710         })
711         tt.Drop()
712         assert.EqualValues(t, 0, len(cl.Torrents()))
713         select {
714         case <-tt.GotInfo():
715                 t.FailNow()
716         default:
717         }
718 }
719
720 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
721         for i := range iter.N(info.NumPieces()) {
722                 p := info.Piece(i)
723                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
724         }
725 }
726
727 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
728         fileCacheDir, err := ioutil.TempDir("", "")
729         require.NoError(t, err)
730         defer os.RemoveAll(fileCacheDir)
731         fileCache, err := filecache.NewCache(fileCacheDir)
732         require.NoError(t, err)
733         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
734         defer os.RemoveAll(greetingDataTempDir)
735         filePieceStore := csf(fileCache)
736         defer filePieceStore.Close()
737         info, err := greetingMetainfo.UnmarshalInfo()
738         require.NoError(t, err)
739         ih := greetingMetainfo.HashInfoBytes()
740         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
741         require.NoError(t, err)
742         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
743         // require.Equal(t, len(testutil.GreetingFileContents), written)
744         // require.NoError(t, err)
745         for i := 0; i < info.NumPieces(); i++ {
746                 p := info.Piece(i)
747                 if alreadyCompleted {
748                         require.NoError(t, greetingData.Piece(p).MarkComplete())
749                 }
750         }
751         cfg := TestingConfig()
752         // TODO: Disable network option?
753         cfg.DisableTCP = true
754         cfg.DisableUTP = true
755         cfg.DefaultStorage = filePieceStore
756         cl, err := NewClient(cfg)
757         require.NoError(t, err)
758         defer cl.Close()
759         tt, err := cl.AddTorrent(greetingMetainfo)
760         require.NoError(t, err)
761         psrs := tt.PieceStateRuns()
762         assert.Len(t, psrs, 1)
763         assert.EqualValues(t, 3, psrs[0].Length)
764         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
765         if alreadyCompleted {
766                 r := tt.NewReader()
767                 b, err := ioutil.ReadAll(r)
768                 assert.NoError(t, err)
769                 assert.EqualValues(t, testutil.GreetingFileContents, b)
770         }
771 }
772
773 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
774         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
775 }
776
777 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
778         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
779 }
780
781 func TestAddMetainfoWithNodes(t *testing.T) {
782         cfg := TestingConfig()
783         cfg.ListenHost = func(string) string { return "" }
784         cfg.NoDHT = false
785         cfg.DhtStartingNodes = func() ([]dht.Addr, error) { return nil, nil }
786         // For now, we want to just jam the nodes into the table, without
787         // verifying them first. Also the DHT code doesn't support mixing secure
788         // and insecure nodes if security is enabled (yet).
789         // cfg.DHTConfig.NoSecurity = true
790         cl, err := NewClient(cfg)
791         require.NoError(t, err)
792         defer cl.Close()
793         sum := func() (ret int64) {
794                 cl.eachDhtServer(func(s *dht.Server) {
795                         ret += s.Stats().OutboundQueriesAttempted
796                 })
797                 return
798         }
799         assert.EqualValues(t, 0, sum())
800         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
801         require.NoError(t, err)
802         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
803         // check if the announce-list is here instead. TODO: Add nodes.
804         assert.Len(t, tt.metainfo.AnnounceList, 5)
805         // There are 6 nodes in the torrent file.
806         assert.EqualValues(t, 6*len(cl.dhtServers), sum())
807 }
808
809 type testDownloadCancelParams struct {
810         ExportClientStatus        bool
811         SetLeecherStorageCapacity bool
812         LeecherStorageCapacity    int64
813         Cancel                    bool
814 }
815
816 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
817         greetingTempDir, mi := testutil.GreetingTestTorrent()
818         defer os.RemoveAll(greetingTempDir)
819         cfg := TestingConfig()
820         cfg.Seed = true
821         cfg.DataDir = greetingTempDir
822         seeder, err := NewClient(cfg)
823         require.NoError(t, err)
824         defer seeder.Close()
825         if ps.ExportClientStatus {
826                 testutil.ExportStatusWriter(seeder, "s")
827         }
828         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
829         seederTorrent.VerifyData()
830         leecherDataDir, err := ioutil.TempDir("", "")
831         require.NoError(t, err)
832         defer os.RemoveAll(leecherDataDir)
833         fc, err := filecache.NewCache(leecherDataDir)
834         require.NoError(t, err)
835         if ps.SetLeecherStorageCapacity {
836                 fc.SetCapacity(ps.LeecherStorageCapacity)
837         }
838         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
839         cfg.DataDir = leecherDataDir
840         leecher, _ := NewClient(cfg)
841         defer leecher.Close()
842         if ps.ExportClientStatus {
843                 testutil.ExportStatusWriter(leecher, "l")
844         }
845         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
846                 ret = TorrentSpecFromMetaInfo(mi)
847                 ret.ChunkSize = 2
848                 return
849         }())
850         require.NoError(t, err)
851         assert.True(t, new)
852         psc := leecherGreeting.SubscribePieceStateChanges()
853         defer psc.Close()
854
855         leecherGreeting.cl.mu.Lock()
856         leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
857         if ps.Cancel {
858                 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
859         }
860         leecherGreeting.cl.mu.Unlock()
861
862         leecherGreeting.AddClientPeer(seeder)
863         completes := make(map[int]bool, 3)
864         expected := func() map[int]bool {
865                 if ps.Cancel {
866                         return map[int]bool{0: false, 1: false, 2: false}
867                 } else {
868                         return map[int]bool{0: true, 1: true, 2: true}
869                 }
870         }()
871         for !reflect.DeepEqual(completes, expected) {
872                 select {
873                 case _v := <-psc.Values:
874                         v := _v.(PieceStateChange)
875                         completes[v.Index] = v.Complete
876                 }
877         }
878 }
879
880 func TestTorrentDownloadAll(t *testing.T) {
881         testDownloadCancel(t, testDownloadCancelParams{})
882 }
883
884 func TestTorrentDownloadAllThenCancel(t *testing.T) {
885         testDownloadCancel(t, testDownloadCancelParams{
886                 Cancel: true,
887         })
888 }
889
890 // Ensure that it's an error for a peer to send an invalid have message.
891 func TestPeerInvalidHave(t *testing.T) {
892         cl, err := NewClient(TestingConfig())
893         require.NoError(t, err)
894         defer cl.Close()
895         info := metainfo.Info{
896                 PieceLength: 1,
897                 Pieces:      make([]byte, 20),
898                 Files:       []metainfo.FileInfo{{Length: 1}},
899         }
900         infoBytes, err := bencode.Marshal(info)
901         require.NoError(t, err)
902         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
903                 InfoBytes: infoBytes,
904                 InfoHash:  metainfo.HashBytes(infoBytes),
905                 Storage:   badStorage{},
906         })
907         require.NoError(t, err)
908         assert.True(t, _new)
909         defer tt.Drop()
910         cn := &connection{
911                 t: tt,
912         }
913         assert.NoError(t, cn.peerSentHave(0))
914         assert.Error(t, cn.peerSentHave(1))
915 }
916
917 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
918         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
919         defer os.RemoveAll(greetingTempDir)
920         cfg := TestingConfig()
921         cfg.DataDir = greetingTempDir
922         seeder, err := NewClient(TestingConfig())
923         require.NoError(t, err)
924         seeder.AddTorrentSpec(&TorrentSpec{
925                 InfoBytes: greetingMetainfo.InfoBytes,
926         })
927 }
928
929 // Check that when the listen port is 0, all the protocols listened on have
930 // the same port, and it isn't zero.
931 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
932         cl, err := NewClient(TestingConfig())
933         require.NoError(t, err)
934         defer cl.Close()
935         port := cl.LocalPort()
936         assert.NotEqual(t, 0, port)
937         cl.eachListener(func(s socket) bool {
938                 assert.Equal(t, port, missinggo.AddrPort(s.Addr()))
939                 return true
940         })
941 }
942
943 func TestClientDynamicListenTCPOnly(t *testing.T) {
944         cfg := TestingConfig()
945         cfg.DisableUTP = true
946         cl, err := NewClient(cfg)
947         require.NoError(t, err)
948         defer cl.Close()
949         assert.NotEqual(t, 0, cl.LocalPort())
950         cl.eachListener(func(s socket) bool {
951                 assert.True(t, isTcpNetwork(s.Addr().Network()))
952                 return true
953         })
954 }
955
956 func TestClientDynamicListenUTPOnly(t *testing.T) {
957         cfg := TestingConfig()
958         cfg.DisableTCP = true
959         cl, err := NewClient(cfg)
960         require.NoError(t, err)
961         defer cl.Close()
962         assert.NotEqual(t, 0, cl.LocalPort())
963         cl.eachListener(func(s socket) bool {
964                 assert.True(t, isUtpNetwork(s.Addr().Network()))
965                 return true
966         })
967 }
968
969 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
970         cfg := TestingConfig()
971         cfg.DisableTCP = true
972         cfg.DisableUTP = true
973         cl, err := NewClient(cfg)
974         require.NoError(t, err)
975         defer cl.Close()
976         assert.Equal(t, 0, cl.LocalPort())
977 }
978
979 func totalConns(tts []*Torrent) (ret int) {
980         for _, tt := range tts {
981                 tt.cl.mu.Lock()
982                 ret += len(tt.conns)
983                 tt.cl.mu.Unlock()
984         }
985         return
986 }
987
988 func TestSetMaxEstablishedConn(t *testing.T) {
989         ss := testutil.NewStatusServer(t)
990         defer ss.Close()
991         var tts []*Torrent
992         ih := testutil.GreetingMetaInfo().HashInfoBytes()
993         for i := range iter.N(3) {
994                 cl, err := NewClient(TestingConfig())
995                 require.NoError(t, err)
996                 defer cl.Close()
997                 tt, _ := cl.AddTorrentInfoHash(ih)
998                 tt.SetMaxEstablishedConns(2)
999                 ss.HandleStatusWriter(cl, fmt.Sprintf("/%d", i))
1000                 tts = append(tts, tt)
1001         }
1002         addPeers := func() {
1003                 for _, tt := range tts {
1004                         for _, _tt := range tts {
1005                                 // if tt != _tt {
1006                                 tt.AddClientPeer(_tt.cl)
1007                                 // }
1008                         }
1009                 }
1010         }
1011         waitTotalConns := func(num int) {
1012                 for totalConns(tts) != num {
1013                         addPeers()
1014                         time.Sleep(time.Millisecond)
1015                 }
1016         }
1017         addPeers()
1018         waitTotalConns(6)
1019         tts[0].SetMaxEstablishedConns(1)
1020         waitTotalConns(4)
1021         tts[0].SetMaxEstablishedConns(0)
1022         waitTotalConns(2)
1023         tts[0].SetMaxEstablishedConns(1)
1024         addPeers()
1025         waitTotalConns(4)
1026         tts[0].SetMaxEstablishedConns(2)
1027         addPeers()
1028         waitTotalConns(6)
1029 }
1030
1031 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1032         os.MkdirAll(dir, 0770)
1033         file, err := os.Create(filepath.Join(dir, name))
1034         require.NoError(t, err)
1035         file.Write([]byte(name))
1036         file.Close()
1037         mi := metainfo.MetaInfo{}
1038         mi.SetDefaults()
1039         info := metainfo.Info{PieceLength: 256 * 1024}
1040         err = info.BuildFromFilePath(filepath.Join(dir, name))
1041         require.NoError(t, err)
1042         mi.InfoBytes, err = bencode.Marshal(info)
1043         require.NoError(t, err)
1044         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1045         tr, err := cl.AddTorrent(&mi)
1046         require.NoError(t, err)
1047         require.True(t, tr.Seeding())
1048         tr.VerifyData()
1049         return magnet
1050 }
1051
1052 // https://github.com/anacrolix/torrent/issues/114
1053 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1054         cfg := TestingConfig()
1055         cfg.DisableUTP = true
1056         cfg.Seed = true
1057         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1058         cfg.ForceEncryption = true
1059         os.Mkdir(cfg.DataDir, 0755)
1060         server, err := NewClient(cfg)
1061         require.NoError(t, err)
1062         defer server.Close()
1063         testutil.ExportStatusWriter(server, "s")
1064         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1065         makeMagnet(t, server, cfg.DataDir, "test2")
1066         cfg = TestingConfig()
1067         cfg.DisableUTP = true
1068         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1069         cfg.ForceEncryption = true
1070         client, err := NewClient(cfg)
1071         require.NoError(t, err)
1072         defer client.Close()
1073         testutil.ExportStatusWriter(client, "c")
1074         tr, err := client.AddMagnet(magnet1)
1075         require.NoError(t, err)
1076         tr.AddClientPeer(server)
1077         <-tr.GotInfo()
1078         tr.DownloadAll()
1079         client.WaitAll()
1080 }
1081
1082 func TestClientAddressInUse(t *testing.T) {
1083         s, _ := NewUtpSocket("udp", ":50007")
1084         if s != nil {
1085                 defer s.Close()
1086         }
1087         cfg := TestingConfig().SetListenAddr(":50007")
1088         cl, err := NewClient(cfg)
1089         require.Error(t, err)
1090         require.Nil(t, cl)
1091 }