]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Rework conns to/and allow multiple DHT servers
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "context"
5         "encoding/binary"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "net"
10         "os"
11         "path/filepath"
12         "sync"
13         "testing"
14         "time"
15
16         "github.com/anacrolix/dht"
17         _ "github.com/anacrolix/envpprof"
18         "github.com/anacrolix/missinggo"
19         "github.com/anacrolix/missinggo/filecache"
20         "github.com/bradfitz/iter"
21         "github.com/stretchr/testify/assert"
22         "github.com/stretchr/testify/require"
23         "golang.org/x/time/rate"
24
25         "github.com/anacrolix/torrent/bencode"
26         "github.com/anacrolix/torrent/internal/testutil"
27         "github.com/anacrolix/torrent/iplist"
28         "github.com/anacrolix/torrent/metainfo"
29         "github.com/anacrolix/torrent/storage"
30 )
31
32 func TestingConfig() *Config {
33         return &Config{
34                 ListenAddr:              "localhost:0",
35                 NoDHT:                   true,
36                 DataDir:                 tempDir(),
37                 DisableTrackers:         true,
38                 NoDefaultPortForwarding: true,
39                 // Debug:           true,
40         }
41 }
42
43 func TestClientDefault(t *testing.T) {
44         cl, err := NewClient(TestingConfig())
45         require.NoError(t, err)
46         cl.Close()
47 }
48
49 func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) {
50         cfg := TestingConfig()
51         pc, err := storage.NewBoltPieceCompletion(cfg.DataDir)
52         require.NoError(t, err)
53         ci := storage.NewFileWithCompletion(cfg.DataDir, pc)
54         defer ci.Close()
55         cfg.DefaultStorage = ci
56         cl, err := NewClient(cfg)
57         require.NoError(t, err)
58         cl.Close()
59         // And again, https://github.com/anacrolix/torrent/issues/158
60         cl, err = NewClient(cfg)
61         require.NoError(t, err)
62         cl.Close()
63 }
64
65 func TestAddDropTorrent(t *testing.T) {
66         cl, err := NewClient(TestingConfig())
67         require.NoError(t, err)
68         defer cl.Close()
69         dir, mi := testutil.GreetingTestTorrent()
70         defer os.RemoveAll(dir)
71         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
72         require.NoError(t, err)
73         assert.True(t, new)
74         tt.SetMaxEstablishedConns(0)
75         tt.SetMaxEstablishedConns(1)
76         tt.Drop()
77 }
78
79 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
80         // TODO?
81         t.SkipNow()
82 }
83
84 func TestAddTorrentNoUsableURLs(t *testing.T) {
85         // TODO?
86         t.SkipNow()
87 }
88
89 func TestAddPeersToUnknownTorrent(t *testing.T) {
90         // TODO?
91         t.SkipNow()
92 }
93
94 func TestPieceHashSize(t *testing.T) {
95         assert.Equal(t, 20, pieceHash.Size())
96 }
97
98 func TestTorrentInitialState(t *testing.T) {
99         dir, mi := testutil.GreetingTestTorrent()
100         defer os.RemoveAll(dir)
101         cl := &Client{}
102         cl.initLogger()
103         tor := cl.newTorrent(
104                 mi.HashInfoBytes(),
105                 storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
106         )
107         tor.setChunkSize(2)
108         tor.cl.mu.Lock()
109         err := tor.setInfoBytes(mi.InfoBytes)
110         tor.cl.mu.Unlock()
111         require.NoError(t, err)
112         require.Len(t, tor.pieces, 3)
113         tor.pendAllChunkSpecs(0)
114         tor.cl.mu.Lock()
115         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
116         tor.cl.mu.Unlock()
117         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
118 }
119
120 func TestUnmarshalPEXMsg(t *testing.T) {
121         var m peerExchangeMessage
122         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
123                 t.Fatal(err)
124         }
125         if len(m.Added) != 2 {
126                 t.FailNow()
127         }
128         if m.Added[0].Port != 0x506 {
129                 t.FailNow()
130         }
131 }
132
133 func TestReducedDialTimeout(t *testing.T) {
134         cfg := &Config{}
135         cfg.setDefaults()
136         for _, _case := range []struct {
137                 Max             time.Duration
138                 HalfOpenLimit   int
139                 PendingPeers    int
140                 ExpectedReduced time.Duration
141         }{
142                 {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout},
143                 {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout},
144                 {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout},
145                 {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2},
146                 {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3},
147                 {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101},
148         } {
149                 reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers)
150                 expected := _case.ExpectedReduced
151                 if expected < cfg.MinDialTimeout {
152                         expected = cfg.MinDialTimeout
153                 }
154                 if reduced != expected {
155                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
156                 }
157         }
158 }
159
160 func TestUTPRawConn(t *testing.T) {
161         l, err := NewUtpSocket("udp", "")
162         require.NoError(t, err)
163         defer l.Close()
164         go func() {
165                 for {
166                         _, err := l.Accept()
167                         if err != nil {
168                                 break
169                         }
170                 }
171         }()
172         // Connect a UTP peer to see if the RawConn will still work.
173         s, err := NewUtpSocket("udp", "")
174         require.NoError(t, err)
175         defer s.Close()
176         utpPeer, err := s.DialContext(context.Background(), "", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
177         require.NoError(t, err)
178         defer utpPeer.Close()
179         peer, err := net.ListenPacket("udp", ":0")
180         require.NoError(t, err)
181         defer peer.Close()
182
183         msgsReceived := 0
184         // How many messages to send. I've set this to double the channel buffer
185         // size in the raw packetConn.
186         const N = 200
187         readerStopped := make(chan struct{})
188         // The reader goroutine.
189         go func() {
190                 defer close(readerStopped)
191                 b := make([]byte, 500)
192                 for i := 0; i < N; i++ {
193                         n, _, err := l.ReadFrom(b)
194                         require.NoError(t, err)
195                         msgsReceived++
196                         var d int
197                         fmt.Sscan(string(b[:n]), &d)
198                         assert.Equal(t, i, d)
199                 }
200         }()
201         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
202         require.NoError(t, err)
203         for i := 0; i < N; i++ {
204                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
205                 require.NoError(t, err)
206                 time.Sleep(time.Millisecond)
207         }
208         select {
209         case <-readerStopped:
210         case <-time.After(time.Second):
211                 t.Fatal("reader timed out")
212         }
213         if msgsReceived != N {
214                 t.Fatalf("messages received: %d", msgsReceived)
215         }
216 }
217
218 func TestAddDropManyTorrents(t *testing.T) {
219         cl, err := NewClient(TestingConfig())
220         require.NoError(t, err)
221         defer cl.Close()
222         for i := range iter.N(1000) {
223                 var spec TorrentSpec
224                 binary.PutVarint(spec.InfoHash[:], int64(i))
225                 tt, new, err := cl.AddTorrentSpec(&spec)
226                 assert.NoError(t, err)
227                 assert.True(t, new)
228                 defer tt.Drop()
229         }
230 }
231
232 type FileCacheClientStorageFactoryParams struct {
233         Capacity    int64
234         SetCapacity bool
235         Wrapper     func(*filecache.Cache) storage.ClientImpl
236 }
237
238 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
239         return func(dataDir string) storage.ClientImpl {
240                 fc, err := filecache.NewCache(dataDir)
241                 if err != nil {
242                         panic(err)
243                 }
244                 if ps.SetCapacity {
245                         fc.SetCapacity(ps.Capacity)
246                 }
247                 return ps.Wrapper(fc)
248         }
249 }
250
251 type storageFactory func(string) storage.ClientImpl
252
253 func TestClientTransferDefault(t *testing.T) {
254         testClientTransfer(t, testClientTransferParams{
255                 ExportClientStatus: true,
256                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
257                         Wrapper: fileCachePieceResourceStorage,
258                 }),
259         })
260 }
261
262 func TestClientTransferRateLimitedUpload(t *testing.T) {
263         started := time.Now()
264         testClientTransfer(t, testClientTransferParams{
265                 // We are uploading 13 bytes (the length of the greeting torrent). The
266                 // chunks are 2 bytes in length. Then the smallest burst we can run
267                 // with is 2. Time taken is (13-burst)/rate.
268                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
269                 ExportClientStatus:      true,
270         })
271         require.True(t, time.Since(started) > time.Second)
272 }
273
274 func TestClientTransferRateLimitedDownload(t *testing.T) {
275         testClientTransfer(t, testClientTransferParams{
276                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
277         })
278 }
279
280 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
281         return storage.NewResourcePieces(fc.AsResourceProvider())
282 }
283
284 func TestClientTransferSmallCache(t *testing.T) {
285         testClientTransfer(t, testClientTransferParams{
286                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
287                         SetCapacity: true,
288                         // Going below the piece length means it can't complete a piece so
289                         // that it can be hashed.
290                         Capacity: 5,
291                         Wrapper:  fileCachePieceResourceStorage,
292                 }),
293                 SetReadahead: true,
294                 // Can't readahead too far or the cache will thrash and drop data we
295                 // thought we had.
296                 Readahead:          0,
297                 ExportClientStatus: true,
298         })
299 }
300
301 func TestClientTransferVarious(t *testing.T) {
302         // Leecher storage
303         for _, ls := range []storageFactory{
304                 NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
305                         Wrapper: fileCachePieceResourceStorage,
306                 }),
307                 storage.NewBoltDB,
308         } {
309                 // Seeder storage
310                 for _, ss := range []func(string) storage.ClientImpl{
311                         storage.NewFile,
312                         storage.NewMMap,
313                 } {
314                         for _, responsive := range []bool{false, true} {
315                                 testClientTransfer(t, testClientTransferParams{
316                                         Responsive:     responsive,
317                                         SeederStorage:  ss,
318                                         LeecherStorage: ls,
319                                 })
320                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
321                                         testClientTransfer(t, testClientTransferParams{
322                                                 SeederStorage:  ss,
323                                                 Responsive:     responsive,
324                                                 SetReadahead:   true,
325                                                 Readahead:      readahead,
326                                                 LeecherStorage: ls,
327                                         })
328                                 }
329                         }
330                 }
331         }
332 }
333
334 type testClientTransferParams struct {
335         Responsive                 bool
336         Readahead                  int64
337         SetReadahead               bool
338         ExportClientStatus         bool
339         LeecherStorage             func(string) storage.ClientImpl
340         SeederStorage              func(string) storage.ClientImpl
341         SeederUploadRateLimiter    *rate.Limiter
342         LeecherDownloadRateLimiter *rate.Limiter
343 }
344
345 // Creates a seeder and a leecher, and ensures the data transfers when a read
346 // is attempted on the leecher.
347 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
348         greetingTempDir, mi := testutil.GreetingTestTorrent()
349         defer os.RemoveAll(greetingTempDir)
350         // Create seeder and a Torrent.
351         cfg := TestingConfig()
352         cfg.Seed = true
353         cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
354         // cfg.ListenAddr = "localhost:4000"
355         if ps.SeederStorage != nil {
356                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
357                 defer cfg.DefaultStorage.Close()
358         } else {
359                 cfg.DataDir = greetingTempDir
360         }
361         seeder, err := NewClient(cfg)
362         require.NoError(t, err)
363         if ps.ExportClientStatus {
364                 testutil.ExportStatusWriter(seeder, "s")
365         }
366         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
367         // Run a Stats right after Closing the Client. This will trigger the Stats
368         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
369         defer seederTorrent.Stats()
370         defer seeder.Close()
371         seederTorrent.VerifyData()
372         // Create leecher and a Torrent.
373         leecherDataDir, err := ioutil.TempDir("", "")
374         require.NoError(t, err)
375         defer os.RemoveAll(leecherDataDir)
376         if ps.LeecherStorage == nil {
377                 cfg.DataDir = leecherDataDir
378         } else {
379                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
380         }
381         cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
382         cfg.Seed = false
383         leecher, err := NewClient(cfg)
384         require.NoError(t, err)
385         defer leecher.Close()
386         if ps.ExportClientStatus {
387                 testutil.ExportStatusWriter(leecher, "l")
388         }
389         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
390                 ret = TorrentSpecFromMetaInfo(mi)
391                 ret.ChunkSize = 2
392                 return
393         }())
394         require.NoError(t, err)
395         assert.True(t, new)
396         // Now do some things with leecher and seeder.
397         leecherTorrent.AddClientPeer(seeder)
398         // The Torrent should not be interested in obtaining peers, so the one we
399         // just added should be the only one.
400         assert.False(t, leecherTorrent.Seeding())
401         assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
402         r := leecherTorrent.NewReader()
403         defer r.Close()
404         if ps.Responsive {
405                 r.SetResponsive()
406         }
407         if ps.SetReadahead {
408                 r.SetReadahead(ps.Readahead)
409         }
410         assertReadAllGreeting(t, r)
411         assert.True(t, 13 <= seederTorrent.Stats().BytesWrittenData)
412         assert.True(t, 8 <= seederTorrent.Stats().ChunksWritten)
413         assert.True(t, 13 <= leecherTorrent.Stats().BytesReadData)
414         assert.True(t, 8 <= leecherTorrent.Stats().ChunksRead)
415         // Try reading through again for the cases where the torrent data size
416         // exceeds the size of the cache.
417         assertReadAllGreeting(t, r)
418 }
419
420 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
421         pos, err := r.Seek(0, io.SeekStart)
422         assert.NoError(t, err)
423         assert.EqualValues(t, 0, pos)
424         _greeting, err := ioutil.ReadAll(r)
425         assert.NoError(t, err)
426         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
427 }
428
429 // Check that after completing leeching, a leecher transitions to a seeding
430 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
431 func TestSeedAfterDownloading(t *testing.T) {
432         greetingTempDir, mi := testutil.GreetingTestTorrent()
433         defer os.RemoveAll(greetingTempDir)
434         cfg := TestingConfig()
435         cfg.Seed = true
436         cfg.DataDir = greetingTempDir
437         seeder, err := NewClient(cfg)
438         require.NoError(t, err)
439         defer seeder.Close()
440         testutil.ExportStatusWriter(seeder, "s")
441         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
442         seederTorrent.VerifyData()
443         cfg.DataDir, err = ioutil.TempDir("", "")
444         require.NoError(t, err)
445         defer os.RemoveAll(cfg.DataDir)
446         leecher, err := NewClient(cfg)
447         require.NoError(t, err)
448         defer leecher.Close()
449         testutil.ExportStatusWriter(leecher, "l")
450         cfg.Seed = false
451         cfg.DataDir, err = ioutil.TempDir("", "")
452         require.NoError(t, err)
453         defer os.RemoveAll(cfg.DataDir)
454         leecherLeecher, _ := NewClient(cfg)
455         require.NoError(t, err)
456         defer leecherLeecher.Close()
457         testutil.ExportStatusWriter(leecherLeecher, "ll")
458         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
459                 ret = TorrentSpecFromMetaInfo(mi)
460                 ret.ChunkSize = 2
461                 return
462         }())
463         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
464                 ret = TorrentSpecFromMetaInfo(mi)
465                 ret.ChunkSize = 3
466                 return
467         }())
468         // Simultaneously DownloadAll in Leecher, and read the contents
469         // consecutively in LeecherLeecher. This non-deterministically triggered a
470         // case where the leecher wouldn't unchoke the LeecherLeecher.
471         var wg sync.WaitGroup
472         wg.Add(1)
473         go func() {
474                 defer wg.Done()
475                 r := llg.NewReader()
476                 defer r.Close()
477                 b, err := ioutil.ReadAll(r)
478                 require.NoError(t, err)
479                 assert.EqualValues(t, testutil.GreetingFileContents, b)
480         }()
481         leecherGreeting.AddClientPeer(seeder)
482         leecherGreeting.AddClientPeer(leecherLeecher)
483         wg.Add(1)
484         go func() {
485                 defer wg.Done()
486                 leecherGreeting.DownloadAll()
487                 leecher.WaitAll()
488         }()
489         wg.Wait()
490 }
491
492 func TestMergingTrackersByAddingSpecs(t *testing.T) {
493         cl, err := NewClient(TestingConfig())
494         require.NoError(t, err)
495         defer cl.Close()
496         spec := TorrentSpec{}
497         T, new, _ := cl.AddTorrentSpec(&spec)
498         if !new {
499                 t.FailNow()
500         }
501         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
502         _, new, _ = cl.AddTorrentSpec(&spec)
503         assert.False(t, new)
504         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
505         // Because trackers are disabled in TestingConfig.
506         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
507 }
508
509 // We read from a piece which is marked completed, but is missing data.
510 func TestCompletedPieceWrongSize(t *testing.T) {
511         cfg := TestingConfig()
512         cfg.DefaultStorage = badStorage{}
513         cl, err := NewClient(cfg)
514         require.NoError(t, err)
515         defer cl.Close()
516         info := metainfo.Info{
517                 PieceLength: 15,
518                 Pieces:      make([]byte, 20),
519                 Files: []metainfo.FileInfo{
520                         {Path: []string{"greeting"}, Length: 13},
521                 },
522         }
523         b, err := bencode.Marshal(info)
524         require.NoError(t, err)
525         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
526                 InfoBytes: b,
527                 InfoHash:  metainfo.HashBytes(b),
528         })
529         require.NoError(t, err)
530         defer tt.Drop()
531         assert.True(t, new)
532         r := tt.NewReader()
533         defer r.Close()
534         b, err = ioutil.ReadAll(r)
535         assert.Len(t, b, 13)
536         assert.NoError(t, err)
537 }
538
539 func BenchmarkAddLargeTorrent(b *testing.B) {
540         cfg := TestingConfig()
541         cfg.DisableTCP = true
542         cfg.DisableUTP = true
543         cfg.ListenAddr = "redonk"
544         cl, err := NewClient(cfg)
545         require.NoError(b, err)
546         defer cl.Close()
547         for range iter.N(b.N) {
548                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
549                 if err != nil {
550                         b.Fatal(err)
551                 }
552                 t.Drop()
553         }
554 }
555
556 func TestResponsive(t *testing.T) {
557         seederDataDir, mi := testutil.GreetingTestTorrent()
558         defer os.RemoveAll(seederDataDir)
559         cfg := TestingConfig()
560         cfg.Seed = true
561         cfg.DataDir = seederDataDir
562         seeder, err := NewClient(cfg)
563         require.Nil(t, err)
564         defer seeder.Close()
565         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
566         seederTorrent.VerifyData()
567         leecherDataDir, err := ioutil.TempDir("", "")
568         require.Nil(t, err)
569         defer os.RemoveAll(leecherDataDir)
570         cfg = TestingConfig()
571         cfg.DataDir = leecherDataDir
572         leecher, err := NewClient(cfg)
573         require.Nil(t, err)
574         defer leecher.Close()
575         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
576                 ret = TorrentSpecFromMetaInfo(mi)
577                 ret.ChunkSize = 2
578                 return
579         }())
580         leecherTorrent.AddClientPeer(seeder)
581         reader := leecherTorrent.NewReader()
582         defer reader.Close()
583         reader.SetReadahead(0)
584         reader.SetResponsive()
585         b := make([]byte, 2)
586         _, err = reader.Seek(3, io.SeekStart)
587         require.NoError(t, err)
588         _, err = io.ReadFull(reader, b)
589         assert.Nil(t, err)
590         assert.EqualValues(t, "lo", string(b))
591         _, err = reader.Seek(11, io.SeekStart)
592         require.NoError(t, err)
593         n, err := io.ReadFull(reader, b)
594         assert.Nil(t, err)
595         assert.EqualValues(t, 2, n)
596         assert.EqualValues(t, "d\n", string(b))
597 }
598
599 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
600         seederDataDir, mi := testutil.GreetingTestTorrent()
601         defer os.RemoveAll(seederDataDir)
602         cfg := TestingConfig()
603         cfg.Seed = true
604         cfg.DataDir = seederDataDir
605         seeder, err := NewClient(cfg)
606         require.Nil(t, err)
607         defer seeder.Close()
608         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
609         seederTorrent.VerifyData()
610         leecherDataDir, err := ioutil.TempDir("", "")
611         require.Nil(t, err)
612         defer os.RemoveAll(leecherDataDir)
613         cfg = TestingConfig()
614         cfg.DataDir = leecherDataDir
615         leecher, err := NewClient(cfg)
616         require.Nil(t, err)
617         defer leecher.Close()
618         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
619                 ret = TorrentSpecFromMetaInfo(mi)
620                 ret.ChunkSize = 2
621                 return
622         }())
623         leecherTorrent.AddClientPeer(seeder)
624         reader := leecherTorrent.NewReader()
625         defer reader.Close()
626         reader.SetReadahead(0)
627         reader.SetResponsive()
628         b := make([]byte, 2)
629         _, err = reader.Seek(3, io.SeekStart)
630         require.NoError(t, err)
631         _, err = io.ReadFull(reader, b)
632         assert.Nil(t, err)
633         assert.EqualValues(t, "lo", string(b))
634         go leecherTorrent.Drop()
635         _, err = reader.Seek(11, io.SeekStart)
636         require.NoError(t, err)
637         n, err := reader.Read(b)
638         assert.EqualError(t, err, "torrent closed")
639         assert.EqualValues(t, 0, n)
640 }
641
642 func TestDHTInheritBlocklist(t *testing.T) {
643         ipl := iplist.New(nil)
644         require.NotNil(t, ipl)
645         cfg := TestingConfig()
646         cfg.IPBlocklist = ipl
647         cfg.NoDHT = false
648         cl, err := NewClient(cfg)
649         require.NoError(t, err)
650         defer cl.Close()
651         numServers := 0
652         cl.eachDhtServer(func(s *dht.Server) {
653                 assert.Equal(t, ipl, s.IPBlocklist())
654                 numServers++
655         })
656         assert.EqualValues(t, 2, numServers)
657 }
658
659 // Check that stuff is merged in subsequent AddTorrentSpec for the same
660 // infohash.
661 func TestAddTorrentSpecMerging(t *testing.T) {
662         cl, err := NewClient(TestingConfig())
663         require.NoError(t, err)
664         defer cl.Close()
665         dir, mi := testutil.GreetingTestTorrent()
666         defer os.RemoveAll(dir)
667         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
668                 InfoHash: mi.HashInfoBytes(),
669         })
670         require.NoError(t, err)
671         require.True(t, new)
672         require.Nil(t, tt.Info())
673         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
674         require.NoError(t, err)
675         require.False(t, new)
676         require.NotNil(t, tt.Info())
677 }
678
679 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
680         dir, mi := testutil.GreetingTestTorrent()
681         os.RemoveAll(dir)
682         cl, _ := NewClient(TestingConfig())
683         defer cl.Close()
684         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
685                 InfoHash: mi.HashInfoBytes(),
686         })
687         tt.Drop()
688         assert.EqualValues(t, 0, len(cl.Torrents()))
689         select {
690         case <-tt.GotInfo():
691                 t.FailNow()
692         default:
693         }
694 }
695
696 func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
697         for i := range iter.N(info.NumPieces()) {
698                 p := info.Piece(i)
699                 ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
700         }
701 }
702
703 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
704         fileCacheDir, err := ioutil.TempDir("", "")
705         require.NoError(t, err)
706         defer os.RemoveAll(fileCacheDir)
707         fileCache, err := filecache.NewCache(fileCacheDir)
708         require.NoError(t, err)
709         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
710         defer os.RemoveAll(greetingDataTempDir)
711         filePieceStore := csf(fileCache)
712         defer filePieceStore.Close()
713         info, err := greetingMetainfo.UnmarshalInfo()
714         require.NoError(t, err)
715         ih := greetingMetainfo.HashInfoBytes()
716         greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
717         require.NoError(t, err)
718         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
719         // require.Equal(t, len(testutil.GreetingFileContents), written)
720         // require.NoError(t, err)
721         for i := 0; i < info.NumPieces(); i++ {
722                 p := info.Piece(i)
723                 if alreadyCompleted {
724                         require.NoError(t, greetingData.Piece(p).MarkComplete())
725                 }
726         }
727         cfg := TestingConfig()
728         // TODO: Disable network option?
729         cfg.DisableTCP = true
730         cfg.DisableUTP = true
731         cfg.DefaultStorage = filePieceStore
732         cl, err := NewClient(cfg)
733         require.NoError(t, err)
734         defer cl.Close()
735         tt, err := cl.AddTorrent(greetingMetainfo)
736         require.NoError(t, err)
737         psrs := tt.PieceStateRuns()
738         assert.Len(t, psrs, 1)
739         assert.EqualValues(t, 3, psrs[0].Length)
740         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
741         if alreadyCompleted {
742                 r := tt.NewReader()
743                 b, err := ioutil.ReadAll(r)
744                 assert.NoError(t, err)
745                 assert.EqualValues(t, testutil.GreetingFileContents, b)
746         }
747 }
748
749 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
750         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
751 }
752
753 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
754         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
755 }
756
757 func TestAddMetainfoWithNodes(t *testing.T) {
758         cfg := TestingConfig()
759         cfg.ListenAddr = ":0"
760         cfg.NoDHT = false
761         cfg.DhtStartingNodes = func() ([]dht.Addr, error) { return nil, nil }
762         // For now, we want to just jam the nodes into the table, without
763         // verifying them first. Also the DHT code doesn't support mixing secure
764         // and insecure nodes if security is enabled (yet).
765         // cfg.DHTConfig.NoSecurity = true
766         cl, err := NewClient(cfg)
767         require.NoError(t, err)
768         defer cl.Close()
769         sum := func() (ret int) {
770                 cl.eachDhtServer(func(s *dht.Server) {
771                         ret += s.NumNodes()
772                         ret += s.Stats().OutstandingTransactions
773                 })
774                 return
775         }
776         assert.EqualValues(t, 0, sum())
777         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
778         require.NoError(t, err)
779         // Nodes are not added or exposed in Torrent's metainfo. We just randomly
780         // check if the announce-list is here instead. TODO: Add nodes.
781         assert.Len(t, tt.metainfo.AnnounceList, 5)
782         // There are 6 nodes in the torrent file.
783         assert.EqualValues(t, 6*len(cl.dhtServers), sum())
784 }
785
786 type testDownloadCancelParams struct {
787         ExportClientStatus        bool
788         SetLeecherStorageCapacity bool
789         LeecherStorageCapacity    int64
790         Cancel                    bool
791 }
792
793 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
794         greetingTempDir, mi := testutil.GreetingTestTorrent()
795         defer os.RemoveAll(greetingTempDir)
796         cfg := TestingConfig()
797         cfg.Seed = true
798         cfg.DataDir = greetingTempDir
799         seeder, err := NewClient(cfg)
800         require.NoError(t, err)
801         defer seeder.Close()
802         if ps.ExportClientStatus {
803                 testutil.ExportStatusWriter(seeder, "s")
804         }
805         seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
806         seederTorrent.VerifyData()
807         leecherDataDir, err := ioutil.TempDir("", "")
808         require.NoError(t, err)
809         defer os.RemoveAll(leecherDataDir)
810         fc, err := filecache.NewCache(leecherDataDir)
811         require.NoError(t, err)
812         if ps.SetLeecherStorageCapacity {
813                 fc.SetCapacity(ps.LeecherStorageCapacity)
814         }
815         cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider())
816         cfg.DataDir = leecherDataDir
817         leecher, _ := NewClient(cfg)
818         defer leecher.Close()
819         if ps.ExportClientStatus {
820                 testutil.ExportStatusWriter(leecher, "l")
821         }
822         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
823                 ret = TorrentSpecFromMetaInfo(mi)
824                 ret.ChunkSize = 2
825                 return
826         }())
827         require.NoError(t, err)
828         assert.True(t, new)
829         psc := leecherGreeting.SubscribePieceStateChanges()
830         defer psc.Close()
831
832         leecherGreeting.cl.mu.Lock()
833         leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
834         if ps.Cancel {
835                 leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
836         }
837         leecherGreeting.cl.mu.Unlock()
838
839         leecherGreeting.AddClientPeer(seeder)
840         completes := make(map[int]bool, 3)
841 values:
842         for {
843                 // started := time.Now()
844                 select {
845                 case _v := <-psc.Values:
846                         // log.Print(time.Since(started))
847                         v := _v.(PieceStateChange)
848                         completes[v.Index] = v.Complete
849                 case <-time.After(100 * time.Millisecond):
850                         break values
851                 }
852         }
853         if ps.Cancel {
854                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
855         } else {
856                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
857         }
858
859 }
860
861 func TestTorrentDownloadAll(t *testing.T) {
862         testDownloadCancel(t, testDownloadCancelParams{})
863 }
864
865 func TestTorrentDownloadAllThenCancel(t *testing.T) {
866         testDownloadCancel(t, testDownloadCancelParams{
867                 Cancel: true,
868         })
869 }
870
871 // Ensure that it's an error for a peer to send an invalid have message.
872 func TestPeerInvalidHave(t *testing.T) {
873         cl, err := NewClient(TestingConfig())
874         require.NoError(t, err)
875         defer cl.Close()
876         info := metainfo.Info{
877                 PieceLength: 1,
878                 Pieces:      make([]byte, 20),
879                 Files:       []metainfo.FileInfo{{Length: 1}},
880         }
881         infoBytes, err := bencode.Marshal(info)
882         require.NoError(t, err)
883         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
884                 InfoBytes: infoBytes,
885                 InfoHash:  metainfo.HashBytes(infoBytes),
886                 Storage:   badStorage{},
887         })
888         require.NoError(t, err)
889         assert.True(t, _new)
890         defer tt.Drop()
891         cn := &connection{
892                 t: tt,
893         }
894         assert.NoError(t, cn.peerSentHave(0))
895         assert.Error(t, cn.peerSentHave(1))
896 }
897
898 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
899         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
900         defer os.RemoveAll(greetingTempDir)
901         cfg := TestingConfig()
902         cfg.DataDir = greetingTempDir
903         seeder, err := NewClient(TestingConfig())
904         require.NoError(t, err)
905         seeder.AddTorrentSpec(&TorrentSpec{
906                 InfoBytes: greetingMetainfo.InfoBytes,
907         })
908 }
909
910 // Check that when the listen port is 0, all the protocols listened on have
911 // the same port, and it isn't zero.
912 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
913         cl, err := NewClient(TestingConfig())
914         require.NoError(t, err)
915         defer cl.Close()
916         port := cl.LocalPort()
917         assert.NotEqual(t, 0, port)
918         cl.eachListener(func(s socket) bool {
919                 assert.Equal(t, port, missinggo.AddrPort(s.Addr()))
920                 return true
921         })
922 }
923
924 func TestClientDynamicListenTCPOnly(t *testing.T) {
925         cfg := TestingConfig()
926         cfg.DisableUTP = true
927         cl, err := NewClient(cfg)
928         require.NoError(t, err)
929         defer cl.Close()
930         assert.NotEqual(t, 0, cl.LocalPort())
931         cl.eachListener(func(s socket) bool {
932                 assert.True(t, isTcpNetwork(s.Addr().Network()))
933                 return true
934         })
935 }
936
937 func TestClientDynamicListenUTPOnly(t *testing.T) {
938         cfg := TestingConfig()
939         cfg.DisableTCP = true
940         cl, err := NewClient(cfg)
941         require.NoError(t, err)
942         defer cl.Close()
943         assert.NotEqual(t, 0, cl.LocalPort())
944         cl.eachListener(func(s socket) bool {
945                 assert.True(t, isUtpNetwork(s.Addr().Network()))
946                 return true
947         })
948 }
949
950 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
951         cfg := TestingConfig()
952         cfg.DisableTCP = true
953         cfg.DisableUTP = true
954         cl, err := NewClient(cfg)
955         require.NoError(t, err)
956         defer cl.Close()
957         assert.Equal(t, 0, cl.LocalPort())
958 }
959
960 func totalConns(tts []*Torrent) (ret int) {
961         for _, tt := range tts {
962                 tt.cl.mu.Lock()
963                 ret += len(tt.conns)
964                 tt.cl.mu.Unlock()
965         }
966         return
967 }
968
969 func TestSetMaxEstablishedConn(t *testing.T) {
970         ss := testutil.NewStatusServer(t)
971         defer ss.Close()
972         var tts []*Torrent
973         ih := testutil.GreetingMetaInfo().HashInfoBytes()
974         for i := range iter.N(3) {
975                 cl, err := NewClient(TestingConfig())
976                 require.NoError(t, err)
977                 defer cl.Close()
978                 tt, _ := cl.AddTorrentInfoHash(ih)
979                 tt.SetMaxEstablishedConns(2)
980                 ss.HandleStatusWriter(cl, fmt.Sprintf("/%d", i))
981                 tts = append(tts, tt)
982         }
983         addPeers := func() {
984                 for _, tt := range tts {
985                         for _, _tt := range tts {
986                                 // if tt != _tt {
987                                 tt.AddClientPeer(_tt.cl)
988                                 // }
989                         }
990                 }
991         }
992         waitTotalConns := func(num int) {
993                 for totalConns(tts) != num {
994                         addPeers()
995                         time.Sleep(time.Millisecond)
996                 }
997         }
998         addPeers()
999         waitTotalConns(6)
1000         tts[0].SetMaxEstablishedConns(1)
1001         waitTotalConns(4)
1002         tts[0].SetMaxEstablishedConns(0)
1003         waitTotalConns(2)
1004         tts[0].SetMaxEstablishedConns(1)
1005         addPeers()
1006         waitTotalConns(4)
1007         tts[0].SetMaxEstablishedConns(2)
1008         addPeers()
1009         waitTotalConns(6)
1010 }
1011
1012 func makeMagnet(t *testing.T, cl *Client, dir string, name string) string {
1013         os.MkdirAll(dir, 0770)
1014         file, err := os.Create(filepath.Join(dir, name))
1015         require.NoError(t, err)
1016         file.Write([]byte(name))
1017         file.Close()
1018         mi := metainfo.MetaInfo{}
1019         mi.SetDefaults()
1020         info := metainfo.Info{PieceLength: 256 * 1024}
1021         err = info.BuildFromFilePath(filepath.Join(dir, name))
1022         require.NoError(t, err)
1023         mi.InfoBytes, err = bencode.Marshal(info)
1024         require.NoError(t, err)
1025         magnet := mi.Magnet(name, mi.HashInfoBytes()).String()
1026         tr, err := cl.AddTorrent(&mi)
1027         require.NoError(t, err)
1028         require.True(t, tr.Seeding())
1029         tr.VerifyData()
1030         return magnet
1031 }
1032
1033 // https://github.com/anacrolix/torrent/issues/114
1034 func TestMultipleTorrentsWithEncryption(t *testing.T) {
1035         cfg := TestingConfig()
1036         cfg.DisableUTP = true
1037         cfg.Seed = true
1038         cfg.DataDir = filepath.Join(cfg.DataDir, "server")
1039         cfg.ForceEncryption = true
1040         os.Mkdir(cfg.DataDir, 0755)
1041         server, err := NewClient(cfg)
1042         require.NoError(t, err)
1043         defer server.Close()
1044         testutil.ExportStatusWriter(server, "s")
1045         magnet1 := makeMagnet(t, server, cfg.DataDir, "test1")
1046         makeMagnet(t, server, cfg.DataDir, "test2")
1047         cfg = TestingConfig()
1048         cfg.DisableUTP = true
1049         cfg.DataDir = filepath.Join(cfg.DataDir, "client")
1050         cfg.ForceEncryption = true
1051         client, err := NewClient(cfg)
1052         require.NoError(t, err)
1053         defer client.Close()
1054         testutil.ExportStatusWriter(client, "c")
1055         tr, err := client.AddMagnet(magnet1)
1056         require.NoError(t, err)
1057         tr.AddClientPeer(server)
1058         <-tr.GotInfo()
1059         tr.DownloadAll()
1060         client.WaitAll()
1061 }
1062
1063 func TestClientAddressInUse(t *testing.T) {
1064         s, _ := NewUtpSocket("udp", ":50007")
1065         if s != nil {
1066                 defer s.Close()
1067         }
1068         cfg := TestingConfig()
1069         cfg.ListenAddr = ":50007"
1070         cl, err := NewClient(cfg)
1071         require.Error(t, err)
1072         require.Nil(t, cl)
1073 }