]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Probably fix test failure
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "log"
10         "math/rand"
11         "net"
12         "os"
13         "strings"
14         "sync"
15         "testing"
16         "time"
17
18         _ "github.com/anacrolix/envpprof"
19         "github.com/anacrolix/missinggo"
20         "github.com/anacrolix/missinggo/filecache"
21         "github.com/anacrolix/missinggo/pubsub"
22         "github.com/anacrolix/utp"
23         "github.com/bradfitz/iter"
24         "github.com/stretchr/testify/assert"
25         "github.com/stretchr/testify/require"
26
27         "github.com/anacrolix/torrent/bencode"
28         "github.com/anacrolix/torrent/dht"
29         "github.com/anacrolix/torrent/internal/testutil"
30         "github.com/anacrolix/torrent/iplist"
31         "github.com/anacrolix/torrent/metainfo"
32         "github.com/anacrolix/torrent/storage"
33 )
34
35 func init() {
36         log.SetFlags(log.LstdFlags | log.Llongfile)
37 }
38
39 var TestingConfig = Config{
40         ListenAddr:      "localhost:0",
41         NoDHT:           true,
42         DisableTrackers: true,
43         DataDir:         "/dev/null",
44         DHTConfig: dht.ServerConfig{
45                 NoDefaultBootstrap: true,
46         },
47 }
48
49 func TestClientDefault(t *testing.T) {
50         cl, err := NewClient(&TestingConfig)
51         require.NoError(t, err)
52         cl.Close()
53 }
54
55 func TestAddDropTorrent(t *testing.T) {
56         cl, err := NewClient(&TestingConfig)
57         require.NoError(t, err)
58         defer cl.Close()
59         dir, mi := testutil.GreetingTestTorrent()
60         defer os.RemoveAll(dir)
61         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
62         require.NoError(t, err)
63         assert.True(t, new)
64         tt.Drop()
65 }
66
67 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
68         t.SkipNow()
69 }
70
71 func TestAddTorrentNoUsableURLs(t *testing.T) {
72         t.SkipNow()
73 }
74
75 func TestAddPeersToUnknownTorrent(t *testing.T) {
76         t.SkipNow()
77 }
78
79 func TestPieceHashSize(t *testing.T) {
80         if pieceHash.Size() != 20 {
81                 t.FailNow()
82         }
83 }
84
85 func TestTorrentInitialState(t *testing.T) {
86         dir, mi := testutil.GreetingTestTorrent()
87         defer os.RemoveAll(dir)
88         tor := &Torrent{
89                 infoHash:          mi.Info.Hash(),
90                 pieceStateChanges: pubsub.NewPubSub(),
91         }
92         tor.chunkSize = 2
93         tor.storageOpener = storage.NewFile("/dev/null")
94         // Needed to lock for asynchronous piece verification.
95         tor.cl = new(Client)
96         err := tor.setInfoBytes(mi.Info.Bytes)
97         require.NoError(t, err)
98         require.Len(t, tor.pieces, 3)
99         tor.pendAllChunkSpecs(0)
100         tor.cl.mu.Lock()
101         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
102         tor.cl.mu.Unlock()
103         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
104 }
105
106 func TestUnmarshalPEXMsg(t *testing.T) {
107         var m peerExchangeMessage
108         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
109                 t.Fatal(err)
110         }
111         if len(m.Added) != 2 {
112                 t.FailNow()
113         }
114         if m.Added[0].Port != 0x506 {
115                 t.FailNow()
116         }
117 }
118
119 func TestReducedDialTimeout(t *testing.T) {
120         for _, _case := range []struct {
121                 Max             time.Duration
122                 HalfOpenLimit   int
123                 PendingPeers    int
124                 ExpectedReduced time.Duration
125         }{
126                 {nominalDialTimeout, 40, 0, nominalDialTimeout},
127                 {nominalDialTimeout, 40, 1, nominalDialTimeout},
128                 {nominalDialTimeout, 40, 39, nominalDialTimeout},
129                 {nominalDialTimeout, 40, 40, nominalDialTimeout / 2},
130                 {nominalDialTimeout, 40, 80, nominalDialTimeout / 3},
131                 {nominalDialTimeout, 40, 4000, nominalDialTimeout / 101},
132         } {
133                 reduced := reducedDialTimeout(_case.Max, _case.HalfOpenLimit, _case.PendingPeers)
134                 expected := _case.ExpectedReduced
135                 if expected < minDialTimeout {
136                         expected = minDialTimeout
137                 }
138                 if reduced != expected {
139                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
140                 }
141         }
142 }
143
144 func TestUTPRawConn(t *testing.T) {
145         l, err := utp.NewSocket("udp", "")
146         if err != nil {
147                 t.Fatal(err)
148         }
149         defer l.Close()
150         go func() {
151                 for {
152                         _, err := l.Accept()
153                         if err != nil {
154                                 break
155                         }
156                 }
157         }()
158         // Connect a UTP peer to see if the RawConn will still work.
159         s, _ := utp.NewSocket("udp", "")
160         defer s.Close()
161         utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
162         if err != nil {
163                 t.Fatalf("error dialing utp listener: %s", err)
164         }
165         defer utpPeer.Close()
166         peer, err := net.ListenPacket("udp", ":0")
167         if err != nil {
168                 t.Fatal(err)
169         }
170         defer peer.Close()
171
172         msgsReceived := 0
173         // How many messages to send. I've set this to double the channel buffer
174         // size in the raw packetConn.
175         const N = 200
176         readerStopped := make(chan struct{})
177         // The reader goroutine.
178         go func() {
179                 defer close(readerStopped)
180                 b := make([]byte, 500)
181                 for i := 0; i < N; i++ {
182                         n, _, err := l.ReadFrom(b)
183                         if err != nil {
184                                 t.Fatalf("error reading from raw conn: %s", err)
185                         }
186                         msgsReceived++
187                         var d int
188                         fmt.Sscan(string(b[:n]), &d)
189                         if d != i {
190                                 log.Printf("got wrong number: expected %d, got %d", i, d)
191                         }
192                 }
193         }()
194         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
195         if err != nil {
196                 t.Fatal(err)
197         }
198         for i := 0; i < N; i++ {
199                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
200                 if err != nil {
201                         t.Fatal(err)
202                 }
203                 time.Sleep(time.Microsecond)
204         }
205         select {
206         case <-readerStopped:
207         case <-time.After(time.Second):
208                 t.Fatal("reader timed out")
209         }
210         if msgsReceived != N {
211                 t.Fatalf("messages received: %d", msgsReceived)
212         }
213 }
214
215 func TestTwoClientsArbitraryPorts(t *testing.T) {
216         for i := 0; i < 2; i++ {
217                 cl, err := NewClient(&TestingConfig)
218                 if err != nil {
219                         t.Fatal(err)
220                 }
221                 defer cl.Close()
222         }
223 }
224
225 func TestAddDropManyTorrents(t *testing.T) {
226         cl, err := NewClient(&TestingConfig)
227         require.NoError(t, err)
228         defer cl.Close()
229         for i := range iter.N(1000) {
230                 var spec TorrentSpec
231                 binary.PutVarint(spec.InfoHash[:], int64(i))
232                 tt, new, err := cl.AddTorrentSpec(&spec)
233                 assert.NoError(t, err)
234                 assert.True(t, new)
235                 defer tt.Drop()
236         }
237 }
238
239 func TestClientTransferDefault(t *testing.T) {
240         testClientTransfer(t, testClientTransferParams{
241                 ExportClientStatus: true,
242         })
243 }
244
245 func TestClientTransferSmallCache(t *testing.T) {
246         testClientTransfer(t, testClientTransferParams{
247                 SetLeecherStorageCapacity: true,
248                 // Going below the piece length means it can't complete a piece so
249                 // that it can be hashed.
250                 LeecherStorageCapacity: 5,
251                 SetReadahead:           true,
252                 // Can't readahead too far or the cache will thrash and drop data we
253                 // thought we had.
254                 Readahead:          0,
255                 ExportClientStatus: true,
256         })
257 }
258
259 func TestClientTransferVarious(t *testing.T) {
260         for _, ss := range []func(string) storage.I{
261                 storage.NewFile,
262                 storage.NewMMap,
263         } {
264                 for _, responsive := range []bool{false, true} {
265                         testClientTransfer(t, testClientTransferParams{
266                                 Responsive:    responsive,
267                                 SeederStorage: ss,
268                         })
269                         for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
270                                 testClientTransfer(t, testClientTransferParams{
271                                         SeederStorage: ss,
272                                         Responsive:    responsive,
273                                         SetReadahead:  true,
274                                         Readahead:     readahead,
275                                 })
276                         }
277                 }
278         }
279 }
280
281 type testClientTransferParams struct {
282         Responsive                bool
283         Readahead                 int64
284         SetReadahead              bool
285         ExportClientStatus        bool
286         SetLeecherStorageCapacity bool
287         LeecherStorageCapacity    int64
288         SeederStorage             func(string) storage.I
289 }
290
291 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
292         greetingTempDir, mi := testutil.GreetingTestTorrent()
293         defer os.RemoveAll(greetingTempDir)
294         cfg := TestingConfig
295         cfg.Seed = true
296         if ps.SeederStorage != nil {
297                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
298         } else {
299                 cfg.DataDir = greetingTempDir
300         }
301         seeder, err := NewClient(&cfg)
302         require.NoError(t, err)
303         defer seeder.Close()
304         if ps.ExportClientStatus {
305                 testutil.ExportStatusWriter(seeder, "s")
306         }
307         _, new, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
308         require.NoError(t, err)
309         assert.True(t, new)
310         leecherDataDir, err := ioutil.TempDir("", "")
311         require.NoError(t, err)
312         defer os.RemoveAll(leecherDataDir)
313         fc, err := filecache.NewCache(leecherDataDir)
314         require.NoError(t, err)
315         if ps.SetLeecherStorageCapacity {
316                 fc.SetCapacity(ps.LeecherStorageCapacity)
317         }
318         cfg.DefaultStorage = storage.NewPieceFileStorage(fc.AsFileStore())
319         leecher, err := NewClient(&cfg)
320         require.NoError(t, err)
321         defer leecher.Close()
322         if ps.ExportClientStatus {
323                 testutil.ExportStatusWriter(leecher, "l")
324         }
325         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
326                 ret = TorrentSpecFromMetaInfo(mi)
327                 ret.ChunkSize = 2
328                 ret.Storage = storage.NewFile(leecherDataDir)
329                 return
330         }())
331         require.NoError(t, err)
332         assert.True(t, new)
333         leecherGreeting.AddPeers([]Peer{
334                 Peer{
335                         IP:   missinggo.AddrIP(seeder.ListenAddr()),
336                         Port: missinggo.AddrPort(seeder.ListenAddr()),
337                 },
338         })
339         r := leecherGreeting.NewReader()
340         defer r.Close()
341         if ps.Responsive {
342                 r.SetResponsive()
343         }
344         if ps.SetReadahead {
345                 r.SetReadahead(ps.Readahead)
346         }
347         for range iter.N(2) {
348                 pos, err := r.Seek(0, os.SEEK_SET)
349                 assert.NoError(t, err)
350                 assert.EqualValues(t, 0, pos)
351                 _greeting, err := ioutil.ReadAll(r)
352                 assert.NoError(t, err)
353                 assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
354         }
355 }
356
357 // Check that after completing leeching, a leecher transitions to a seeding
358 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
359 func TestSeedAfterDownloading(t *testing.T) {
360         greetingTempDir, mi := testutil.GreetingTestTorrent()
361         defer os.RemoveAll(greetingTempDir)
362         cfg := TestingConfig
363         cfg.Seed = true
364         cfg.DataDir = greetingTempDir
365         seeder, err := NewClient(&cfg)
366         require.NoError(t, err)
367         defer seeder.Close()
368         testutil.ExportStatusWriter(seeder, "s")
369         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
370         cfg.DataDir, err = ioutil.TempDir("", "")
371         require.NoError(t, err)
372         defer os.RemoveAll(cfg.DataDir)
373         leecher, err := NewClient(&cfg)
374         require.NoError(t, err)
375         defer leecher.Close()
376         testutil.ExportStatusWriter(leecher, "l")
377         cfg.Seed = false
378         // cfg.TorrentDataOpener = nil
379         cfg.DataDir, err = ioutil.TempDir("", "")
380         require.NoError(t, err)
381         defer os.RemoveAll(cfg.DataDir)
382         leecherLeecher, _ := NewClient(&cfg)
383         defer leecherLeecher.Close()
384         testutil.ExportStatusWriter(leecherLeecher, "ll")
385         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
386                 ret = TorrentSpecFromMetaInfo(mi)
387                 ret.ChunkSize = 2
388                 return
389         }())
390         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
391                 ret = TorrentSpecFromMetaInfo(mi)
392                 ret.ChunkSize = 3
393                 return
394         }())
395         // Simultaneously DownloadAll in Leecher, and read the contents
396         // consecutively in LeecherLeecher. This non-deterministically triggered a
397         // case where the leecher wouldn't unchoke the LeecherLeecher.
398         var wg sync.WaitGroup
399         wg.Add(1)
400         go func() {
401                 defer wg.Done()
402                 r := llg.NewReader()
403                 defer r.Close()
404                 b, err := ioutil.ReadAll(r)
405                 require.NoError(t, err)
406                 assert.EqualValues(t, testutil.GreetingFileContents, b)
407         }()
408         leecherGreeting.AddPeers([]Peer{
409                 Peer{
410                         IP:   missinggo.AddrIP(seeder.ListenAddr()),
411                         Port: missinggo.AddrPort(seeder.ListenAddr()),
412                 },
413                 Peer{
414                         IP:   missinggo.AddrIP(leecherLeecher.ListenAddr()),
415                         Port: missinggo.AddrPort(leecherLeecher.ListenAddr()),
416                 },
417         })
418         wg.Add(1)
419         go func() {
420                 defer wg.Done()
421                 leecherGreeting.DownloadAll()
422                 leecher.WaitAll()
423         }()
424         wg.Wait()
425 }
426
427 func TestMergingTrackersByAddingSpecs(t *testing.T) {
428         cl, err := NewClient(&TestingConfig)
429         require.NoError(t, err)
430         defer cl.Close()
431         spec := TorrentSpec{}
432         T, new, _ := cl.AddTorrentSpec(&spec)
433         if !new {
434                 t.FailNow()
435         }
436         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
437         _, new, _ = cl.AddTorrentSpec(&spec)
438         if new {
439                 t.FailNow()
440         }
441         assert.EqualValues(t, T.trackers[0][0], "http://a")
442         assert.EqualValues(t, T.trackers[1][0], "udp://b")
443 }
444
445 type badStorage struct{}
446
447 func (bs badStorage) OpenTorrent(*metainfo.InfoEx) (storage.Torrent, error) {
448         return bs, nil
449 }
450
451 func (bs badStorage) Close() error {
452         return nil
453 }
454
455 func (bs badStorage) Piece(p metainfo.Piece) storage.Piece {
456         return badStoragePiece{p}
457 }
458
459 type badStoragePiece struct {
460         p metainfo.Piece
461 }
462
463 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
464         return 0, nil
465 }
466
467 func (p badStoragePiece) GetIsComplete() bool {
468         return true
469 }
470
471 func (p badStoragePiece) MarkComplete() error {
472         return errors.New("psyyyyyyyche")
473 }
474
475 func (p badStoragePiece) randomlyTruncatedDataString() string {
476         return "hello, world\n"[:rand.Intn(14)]
477 }
478
479 func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
480         r := strings.NewReader(p.randomlyTruncatedDataString())
481         return r.ReadAt(b, off+p.p.Offset())
482 }
483
484 // We read from a piece which is marked completed, but is missing data.
485 func TestCompletedPieceWrongSize(t *testing.T) {
486         cfg := TestingConfig
487         cfg.DefaultStorage = badStorage{}
488         cl, err := NewClient(&cfg)
489         require.NoError(t, err)
490         defer cl.Close()
491         ie := metainfo.InfoEx{
492                 Info: metainfo.Info{
493                         PieceLength: 15,
494                         Pieces:      make([]byte, 20),
495                         Files: []metainfo.FileInfo{
496                                 metainfo.FileInfo{Path: []string{"greeting"}, Length: 13},
497                         },
498                 },
499         }
500         ie.UpdateBytes()
501         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
502                 Info:     &ie,
503                 InfoHash: ie.Hash(),
504         })
505         require.NoError(t, err)
506         defer tt.Drop()
507         assert.True(t, new)
508         r := tt.NewReader()
509         defer r.Close()
510         b, err := ioutil.ReadAll(r)
511         assert.Len(t, b, 13)
512         assert.NoError(t, err)
513 }
514
515 func BenchmarkAddLargeTorrent(b *testing.B) {
516         cfg := TestingConfig
517         cfg.DisableTCP = true
518         cfg.DisableUTP = true
519         cfg.ListenAddr = "redonk"
520         cl, _ := NewClient(&cfg)
521         defer cl.Close()
522         for range iter.N(b.N) {
523                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
524                 if err != nil {
525                         b.Fatal(err)
526                 }
527                 t.Drop()
528         }
529 }
530
531 func TestResponsive(t *testing.T) {
532         seederDataDir, mi := testutil.GreetingTestTorrent()
533         defer os.RemoveAll(seederDataDir)
534         cfg := TestingConfig
535         cfg.Seed = true
536         cfg.DataDir = seederDataDir
537         seeder, err := NewClient(&cfg)
538         require.Nil(t, err)
539         defer seeder.Close()
540         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
541         leecherDataDir, err := ioutil.TempDir("", "")
542         require.Nil(t, err)
543         defer os.RemoveAll(leecherDataDir)
544         cfg = TestingConfig
545         cfg.DataDir = leecherDataDir
546         leecher, err := NewClient(&cfg)
547         require.Nil(t, err)
548         defer leecher.Close()
549         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
550                 ret = TorrentSpecFromMetaInfo(mi)
551                 ret.ChunkSize = 2
552                 return
553         }())
554         leecherTorrent.AddPeers([]Peer{
555                 Peer{
556                         IP:   missinggo.AddrIP(seeder.ListenAddr()),
557                         Port: missinggo.AddrPort(seeder.ListenAddr()),
558                 },
559         })
560         reader := leecherTorrent.NewReader()
561         defer reader.Close()
562         reader.SetReadahead(0)
563         reader.SetResponsive()
564         b := make([]byte, 2)
565         _, err = reader.Seek(3, os.SEEK_SET)
566         require.NoError(t, err)
567         _, err = io.ReadFull(reader, b)
568         assert.Nil(t, err)
569         assert.EqualValues(t, "lo", string(b))
570         _, err = reader.Seek(11, os.SEEK_SET)
571         require.NoError(t, err)
572         n, err := io.ReadFull(reader, b)
573         assert.Nil(t, err)
574         assert.EqualValues(t, 2, n)
575         assert.EqualValues(t, "d\n", string(b))
576 }
577
578 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
579         seederDataDir, mi := testutil.GreetingTestTorrent()
580         defer os.RemoveAll(seederDataDir)
581         cfg := TestingConfig
582         cfg.Seed = true
583         cfg.DataDir = seederDataDir
584         seeder, err := NewClient(&cfg)
585         require.Nil(t, err)
586         defer seeder.Close()
587         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
588         leecherDataDir, err := ioutil.TempDir("", "")
589         require.Nil(t, err)
590         defer os.RemoveAll(leecherDataDir)
591         cfg = TestingConfig
592         cfg.DataDir = leecherDataDir
593         leecher, err := NewClient(&cfg)
594         require.Nil(t, err)
595         defer leecher.Close()
596         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
597                 ret = TorrentSpecFromMetaInfo(mi)
598                 ret.ChunkSize = 2
599                 return
600         }())
601         leecherTorrent.AddPeers([]Peer{
602                 Peer{
603                         IP:   missinggo.AddrIP(seeder.ListenAddr()),
604                         Port: missinggo.AddrPort(seeder.ListenAddr()),
605                 },
606         })
607         reader := leecherTorrent.NewReader()
608         defer reader.Close()
609         reader.SetReadahead(0)
610         reader.SetResponsive()
611         b := make([]byte, 2)
612         _, err = reader.Seek(3, os.SEEK_SET)
613         require.NoError(t, err)
614         _, err = io.ReadFull(reader, b)
615         assert.Nil(t, err)
616         assert.EqualValues(t, "lo", string(b))
617         go leecherTorrent.Drop()
618         _, err = reader.Seek(11, os.SEEK_SET)
619         require.NoError(t, err)
620         n, err := reader.Read(b)
621         assert.EqualError(t, err, "torrent closed")
622         assert.EqualValues(t, 0, n)
623 }
624
625 func TestDHTInheritBlocklist(t *testing.T) {
626         ipl := iplist.New(nil)
627         require.NotNil(t, ipl)
628         cfg := TestingConfig
629         cfg.IPBlocklist = ipl
630         cfg.NoDHT = false
631         cl, err := NewClient(&cfg)
632         require.NoError(t, err)
633         defer cl.Close()
634         require.Equal(t, ipl, cl.DHT().IPBlocklist())
635 }
636
637 // Check that stuff is merged in subsequent AddTorrentSpec for the same
638 // infohash.
639 func TestAddTorrentSpecMerging(t *testing.T) {
640         cl, err := NewClient(&TestingConfig)
641         require.NoError(t, err)
642         defer cl.Close()
643         dir, mi := testutil.GreetingTestTorrent()
644         defer os.RemoveAll(dir)
645         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
646                 InfoHash: mi.Info.Hash(),
647         })
648         require.NoError(t, err)
649         require.True(t, new)
650         require.Nil(t, tt.Info())
651         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
652         require.NoError(t, err)
653         require.False(t, new)
654         require.NotNil(t, tt.Info())
655 }
656
657 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
658         dir, mi := testutil.GreetingTestTorrent()
659         os.RemoveAll(dir)
660         cl, _ := NewClient(&TestingConfig)
661         defer cl.Close()
662         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
663                 InfoHash: mi.Info.Hash(),
664         })
665         tt.Drop()
666         assert.EqualValues(t, 0, len(cl.Torrents()))
667         select {
668         case <-tt.GotInfo():
669                 t.FailNow()
670         default:
671         }
672 }
673
674 func writeTorrentData(ts storage.Torrent, info *metainfo.InfoEx, b []byte) {
675         for i := range iter.N(info.NumPieces()) {
676                 n, _ := ts.Piece(info.Piece(i)).WriteAt(b, 0)
677                 b = b[n:]
678         }
679 }
680
681 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool) {
682         fileCacheDir, err := ioutil.TempDir("", "")
683         require.NoError(t, err)
684         defer os.RemoveAll(fileCacheDir)
685         fileCache, err := filecache.NewCache(fileCacheDir)
686         require.NoError(t, err)
687         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
688         defer os.RemoveAll(greetingDataTempDir)
689         filePieceStore := storage.NewPieceFileStorage(fileCache.AsFileStore())
690         greetingData, err := filePieceStore.OpenTorrent(&greetingMetainfo.Info)
691         require.NoError(t, err)
692         writeTorrentData(greetingData, &greetingMetainfo.Info, []byte(testutil.GreetingFileContents))
693         // require.Equal(t, len(testutil.GreetingFileContents), written)
694         // require.NoError(t, err)
695         for i := 0; i < greetingMetainfo.Info.NumPieces(); i++ {
696                 p := greetingMetainfo.Info.Piece(i)
697                 if alreadyCompleted {
698                         err := greetingData.Piece(p).MarkComplete()
699                         assert.NoError(t, err)
700                 }
701         }
702         cfg := TestingConfig
703         // TODO: Disable network option?
704         cfg.DisableTCP = true
705         cfg.DisableUTP = true
706         cfg.DefaultStorage = filePieceStore
707         cl, err := NewClient(&cfg)
708         require.NoError(t, err)
709         defer cl.Close()
710         tt, err := cl.AddTorrent(greetingMetainfo)
711         require.NoError(t, err)
712         psrs := tt.PieceStateRuns()
713         assert.Len(t, psrs, 1)
714         assert.EqualValues(t, 3, psrs[0].Length)
715         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
716         if alreadyCompleted {
717                 r := tt.NewReader()
718                 b, err := ioutil.ReadAll(r)
719                 assert.NoError(t, err)
720                 assert.EqualValues(t, testutil.GreetingFileContents, b)
721         }
722 }
723
724 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
725         testAddTorrentPriorPieceCompletion(t, true)
726 }
727
728 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
729         testAddTorrentPriorPieceCompletion(t, false)
730 }
731
732 func TestAddMetainfoWithNodes(t *testing.T) {
733         cfg := TestingConfig
734         cfg.NoDHT = false
735         // For now, we want to just jam the nodes into the table, without
736         // verifying them first. Also the DHT code doesn't support mixing secure
737         // and insecure nodes if security is enabled (yet).
738         cfg.DHTConfig.NoSecurity = true
739         cl, err := NewClient(&cfg)
740         require.NoError(t, err)
741         defer cl.Close()
742         assert.EqualValues(t, cl.DHT().NumNodes(), 0)
743         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
744         require.NoError(t, err)
745         assert.Len(t, tt.trackers, 5)
746         assert.EqualValues(t, 6, cl.DHT().NumNodes())
747 }
748
749 type testDownloadCancelParams struct {
750         Responsive                bool
751         Readahead                 int64
752         SetReadahead              bool
753         ExportClientStatus        bool
754         SetLeecherStorageCapacity bool
755         LeecherStorageCapacity    int64
756         Cancel                    bool
757 }
758
759 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
760         greetingTempDir, mi := testutil.GreetingTestTorrent()
761         defer os.RemoveAll(greetingTempDir)
762         cfg := TestingConfig
763         cfg.Seed = true
764         cfg.DataDir = greetingTempDir
765         seeder, err := NewClient(&cfg)
766         require.NoError(t, err)
767         defer seeder.Close()
768         if ps.ExportClientStatus {
769                 testutil.ExportStatusWriter(seeder, "s")
770         }
771         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
772         leecherDataDir, err := ioutil.TempDir("", "")
773         require.NoError(t, err)
774         defer os.RemoveAll(leecherDataDir)
775         fc, err := filecache.NewCache(leecherDataDir)
776         require.NoError(t, err)
777         if ps.SetLeecherStorageCapacity {
778                 fc.SetCapacity(ps.LeecherStorageCapacity)
779         }
780         cfg.DefaultStorage = storage.NewPieceFileStorage(fc.AsFileStore())
781         cfg.DataDir = leecherDataDir
782         leecher, _ := NewClient(&cfg)
783         defer leecher.Close()
784         if ps.ExportClientStatus {
785                 testutil.ExportStatusWriter(leecher, "l")
786         }
787         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
788                 ret = TorrentSpecFromMetaInfo(mi)
789                 ret.ChunkSize = 2
790                 return
791         }())
792         require.NoError(t, err)
793         assert.True(t, new)
794         psc := leecherGreeting.SubscribePieceStateChanges()
795         defer psc.Close()
796         leecherGreeting.DownloadAll()
797         if ps.Cancel {
798                 leecherGreeting.CancelPieces(0, leecherGreeting.NumPieces())
799         }
800         leecherGreeting.AddPeers([]Peer{
801                 Peer{
802                         IP:   missinggo.AddrIP(seeder.ListenAddr()),
803                         Port: missinggo.AddrPort(seeder.ListenAddr()),
804                 },
805         })
806         completes := make(map[int]bool, 3)
807 values:
808         for {
809                 // started := time.Now()
810                 select {
811                 case _v := <-psc.Values:
812                         // log.Print(time.Since(started))
813                         v := _v.(PieceStateChange)
814                         completes[v.Index] = v.Complete
815                 case <-time.After(100 * time.Millisecond):
816                         break values
817                 }
818         }
819         if ps.Cancel {
820                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
821         } else {
822                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
823         }
824
825 }
826
827 func TestTorrentDownloadAll(t *testing.T) {
828         testDownloadCancel(t, testDownloadCancelParams{})
829 }
830
831 func TestTorrentDownloadAllThenCancel(t *testing.T) {
832         testDownloadCancel(t, testDownloadCancelParams{
833                 Cancel: true,
834         })
835 }
836
837 // Ensure that it's an error for a peer to send an invalid have message.
838 func TestPeerInvalidHave(t *testing.T) {
839         cl, err := NewClient(&TestingConfig)
840         require.NoError(t, err)
841         defer cl.Close()
842         ie := metainfo.InfoEx{
843                 Info: metainfo.Info{
844                         PieceLength: 1,
845                         Pieces:      make([]byte, 20),
846                         Files:       []metainfo.FileInfo{{Length: 1}},
847                 },
848         }
849         ie.UpdateBytes()
850         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
851                 Info:     &ie,
852                 InfoHash: ie.Hash(),
853         })
854         require.NoError(t, err)
855         assert.True(t, _new)
856         defer tt.Drop()
857         cn := &connection{
858                 t: tt,
859         }
860         assert.NoError(t, cn.peerSentHave(0))
861         assert.Error(t, cn.peerSentHave(1))
862 }
863
864 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
865         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
866         defer os.RemoveAll(greetingTempDir)
867         cfg := TestingConfig
868         cfg.DataDir = greetingTempDir
869         seeder, err := NewClient(&TestingConfig)
870         require.NoError(t, err)
871         seeder.AddTorrentSpec(&TorrentSpec{
872                 Info: &greetingMetainfo.Info,
873         })
874 }