]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Remove the InfoEx type, and don't generate its infohash on the fly
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "log"
10         "math/rand"
11         "net"
12         "os"
13         "strings"
14         "sync"
15         "testing"
16         "time"
17
18         _ "github.com/anacrolix/envpprof"
19         "github.com/anacrolix/missinggo"
20         "github.com/anacrolix/missinggo/filecache"
21         "github.com/anacrolix/missinggo/pubsub"
22         "github.com/anacrolix/utp"
23         "github.com/bradfitz/iter"
24         "github.com/stretchr/testify/assert"
25         "github.com/stretchr/testify/require"
26
27         "github.com/anacrolix/torrent/bencode"
28         "github.com/anacrolix/torrent/dht"
29         "github.com/anacrolix/torrent/internal/testutil"
30         "github.com/anacrolix/torrent/iplist"
31         "github.com/anacrolix/torrent/metainfo"
32         "github.com/anacrolix/torrent/storage"
33 )
34
35 func init() {
36         log.SetFlags(log.LstdFlags | log.Llongfile)
37 }
38
39 var TestingConfig = Config{
40         ListenAddr:      "localhost:0",
41         NoDHT:           true,
42         DisableTrackers: true,
43         DataDir:         "/dev/null",
44         DHTConfig: dht.ServerConfig{
45                 NoDefaultBootstrap: true,
46         },
47 }
48
49 func TestClientDefault(t *testing.T) {
50         cl, err := NewClient(&TestingConfig)
51         require.NoError(t, err)
52         cl.Close()
53 }
54
55 func TestAddDropTorrent(t *testing.T) {
56         cl, err := NewClient(&TestingConfig)
57         require.NoError(t, err)
58         defer cl.Close()
59         dir, mi := testutil.GreetingTestTorrent()
60         defer os.RemoveAll(dir)
61         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
62         require.NoError(t, err)
63         assert.True(t, new)
64         tt.SetMaxEstablishedConns(0)
65         tt.SetMaxEstablishedConns(1)
66         tt.Drop()
67 }
68
69 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
70         t.SkipNow()
71 }
72
73 func TestAddTorrentNoUsableURLs(t *testing.T) {
74         t.SkipNow()
75 }
76
77 func TestAddPeersToUnknownTorrent(t *testing.T) {
78         t.SkipNow()
79 }
80
81 func TestPieceHashSize(t *testing.T) {
82         if pieceHash.Size() != 20 {
83                 t.FailNow()
84         }
85 }
86
87 func TestTorrentInitialState(t *testing.T) {
88         dir, mi := testutil.GreetingTestTorrent()
89         defer os.RemoveAll(dir)
90         tor := &Torrent{
91                 infoHash:          mi.HashInfoBytes(),
92                 pieceStateChanges: pubsub.NewPubSub(),
93         }
94         tor.chunkSize = 2
95         tor.storageOpener = storage.NewFile("/dev/null")
96         // Needed to lock for asynchronous piece verification.
97         tor.cl = new(Client)
98         err := tor.setInfoBytes(mi.InfoBytes)
99         require.NoError(t, err)
100         require.Len(t, tor.pieces, 3)
101         tor.pendAllChunkSpecs(0)
102         tor.cl.mu.Lock()
103         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
104         tor.cl.mu.Unlock()
105         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
106 }
107
108 func TestUnmarshalPEXMsg(t *testing.T) {
109         var m peerExchangeMessage
110         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
111                 t.Fatal(err)
112         }
113         if len(m.Added) != 2 {
114                 t.FailNow()
115         }
116         if m.Added[0].Port != 0x506 {
117                 t.FailNow()
118         }
119 }
120
121 func TestReducedDialTimeout(t *testing.T) {
122         for _, _case := range []struct {
123                 Max             time.Duration
124                 HalfOpenLimit   int
125                 PendingPeers    int
126                 ExpectedReduced time.Duration
127         }{
128                 {nominalDialTimeout, 40, 0, nominalDialTimeout},
129                 {nominalDialTimeout, 40, 1, nominalDialTimeout},
130                 {nominalDialTimeout, 40, 39, nominalDialTimeout},
131                 {nominalDialTimeout, 40, 40, nominalDialTimeout / 2},
132                 {nominalDialTimeout, 40, 80, nominalDialTimeout / 3},
133                 {nominalDialTimeout, 40, 4000, nominalDialTimeout / 101},
134         } {
135                 reduced := reducedDialTimeout(_case.Max, _case.HalfOpenLimit, _case.PendingPeers)
136                 expected := _case.ExpectedReduced
137                 if expected < minDialTimeout {
138                         expected = minDialTimeout
139                 }
140                 if reduced != expected {
141                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
142                 }
143         }
144 }
145
146 func TestUTPRawConn(t *testing.T) {
147         l, err := utp.NewSocket("udp", "")
148         if err != nil {
149                 t.Fatal(err)
150         }
151         defer l.Close()
152         go func() {
153                 for {
154                         _, err := l.Accept()
155                         if err != nil {
156                                 break
157                         }
158                 }
159         }()
160         // Connect a UTP peer to see if the RawConn will still work.
161         s, _ := utp.NewSocket("udp", "")
162         defer s.Close()
163         utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
164         if err != nil {
165                 t.Fatalf("error dialing utp listener: %s", err)
166         }
167         defer utpPeer.Close()
168         peer, err := net.ListenPacket("udp", ":0")
169         if err != nil {
170                 t.Fatal(err)
171         }
172         defer peer.Close()
173
174         msgsReceived := 0
175         // How many messages to send. I've set this to double the channel buffer
176         // size in the raw packetConn.
177         const N = 200
178         readerStopped := make(chan struct{})
179         // The reader goroutine.
180         go func() {
181                 defer close(readerStopped)
182                 b := make([]byte, 500)
183                 for i := 0; i < N; i++ {
184                         n, _, err := l.ReadFrom(b)
185                         if err != nil {
186                                 t.Fatalf("error reading from raw conn: %s", err)
187                         }
188                         msgsReceived++
189                         var d int
190                         fmt.Sscan(string(b[:n]), &d)
191                         if d != i {
192                                 log.Printf("got wrong number: expected %d, got %d", i, d)
193                         }
194                 }
195         }()
196         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
197         if err != nil {
198                 t.Fatal(err)
199         }
200         for i := 0; i < N; i++ {
201                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
202                 if err != nil {
203                         t.Fatal(err)
204                 }
205                 time.Sleep(time.Microsecond)
206         }
207         select {
208         case <-readerStopped:
209         case <-time.After(time.Second):
210                 t.Fatal("reader timed out")
211         }
212         if msgsReceived != N {
213                 t.Fatalf("messages received: %d", msgsReceived)
214         }
215 }
216
217 func TestTwoClientsArbitraryPorts(t *testing.T) {
218         for i := 0; i < 2; i++ {
219                 cl, err := NewClient(&TestingConfig)
220                 if err != nil {
221                         t.Fatal(err)
222                 }
223                 defer cl.Close()
224         }
225 }
226
227 func TestAddDropManyTorrents(t *testing.T) {
228         cl, err := NewClient(&TestingConfig)
229         require.NoError(t, err)
230         defer cl.Close()
231         for i := range iter.N(1000) {
232                 var spec TorrentSpec
233                 binary.PutVarint(spec.InfoHash[:], int64(i))
234                 tt, new, err := cl.AddTorrentSpec(&spec)
235                 assert.NoError(t, err)
236                 assert.True(t, new)
237                 defer tt.Drop()
238         }
239 }
240
241 type FileCacheClientStorageFactoryParams struct {
242         Capacity    int64
243         SetCapacity bool
244         Wrapper     func(*filecache.Cache) storage.Client
245 }
246
247 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
248         return func(dataDir string) storage.Client {
249                 fc, err := filecache.NewCache(dataDir)
250                 if err != nil {
251                         panic(err)
252                 }
253                 if ps.SetCapacity {
254                         fc.SetCapacity(ps.Capacity)
255                 }
256                 return ps.Wrapper(fc)
257         }
258 }
259
260 type storageFactory func(string) storage.Client
261
262 func TestClientTransferDefault(t *testing.T) {
263         testClientTransfer(t, testClientTransferParams{
264                 ExportClientStatus: true,
265                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
266                         Wrapper: fileCachePieceResourceStorage,
267                 }),
268         })
269 }
270
271 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.Client {
272         return storage.NewResourcePieces(fc.AsResourceProvider())
273 }
274
275 func fileCachePieceFileStorage(fc *filecache.Cache) storage.Client {
276         return storage.NewFileStorePieces(fc.AsFileStore())
277 }
278
279 func TestClientTransferSmallCache(t *testing.T) {
280         testClientTransfer(t, testClientTransferParams{
281                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
282                         SetCapacity: true,
283                         // Going below the piece length means it can't complete a piece so
284                         // that it can be hashed.
285                         Capacity: 5,
286                         Wrapper:  fileCachePieceResourceStorage,
287                 }),
288                 SetReadahead: true,
289                 // Can't readahead too far or the cache will thrash and drop data we
290                 // thought we had.
291                 Readahead:          0,
292                 ExportClientStatus: true,
293         })
294 }
295
296 func TestClientTransferVarious(t *testing.T) {
297         for _, lsf := range []func(*filecache.Cache) storage.Client{
298                 fileCachePieceFileStorage,
299                 fileCachePieceResourceStorage,
300         } {
301                 for _, ss := range []func(string) storage.Client{
302                         storage.NewFile,
303                         storage.NewMMap,
304                 } {
305                         for _, responsive := range []bool{false, true} {
306                                 testClientTransfer(t, testClientTransferParams{
307                                         Responsive:    responsive,
308                                         SeederStorage: ss,
309                                         LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
310                                                 Wrapper: lsf,
311                                         }),
312                                 })
313                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
314                                         testClientTransfer(t, testClientTransferParams{
315                                                 SeederStorage: ss,
316                                                 Responsive:    responsive,
317                                                 SetReadahead:  true,
318                                                 Readahead:     readahead,
319                                                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
320                                                         Wrapper: lsf,
321                                                 }),
322                                         })
323                                 }
324                         }
325                 }
326         }
327 }
328
329 type testClientTransferParams struct {
330         Responsive         bool
331         Readahead          int64
332         SetReadahead       bool
333         ExportClientStatus bool
334         LeecherStorage     func(string) storage.Client
335         SeederStorage      func(string) storage.Client
336 }
337
338 // Creates a seeder and a leecher, and ensures the data transfers when a read
339 // is attempted on the leecher.
340 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
341         greetingTempDir, mi := testutil.GreetingTestTorrent()
342         defer os.RemoveAll(greetingTempDir)
343         // Create seeder and a Torrent.
344         cfg := TestingConfig
345         cfg.Seed = true
346         cfg.ListenAddr = "localhost:4000"
347         if ps.SeederStorage != nil {
348                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
349         } else {
350                 cfg.DataDir = greetingTempDir
351         }
352         seeder, err := NewClient(&cfg)
353         require.NoError(t, err)
354         defer seeder.Close()
355         if ps.ExportClientStatus {
356                 testutil.ExportStatusWriter(seeder, "s")
357         }
358         seederTorrent, new, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
359         require.NoError(t, err)
360         assert.True(t, new)
361         // Create leecher and a Torrent.
362         leecherDataDir, err := ioutil.TempDir("", "")
363         require.NoError(t, err)
364         defer os.RemoveAll(leecherDataDir)
365         cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
366         cfg.ListenAddr = "localhost:4001"
367         leecher, err := NewClient(&cfg)
368         require.NoError(t, err)
369         defer leecher.Close()
370         if ps.ExportClientStatus {
371                 testutil.ExportStatusWriter(leecher, "l")
372         }
373         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
374                 ret = TorrentSpecFromMetaInfo(mi)
375                 ret.ChunkSize = 2
376                 ret.Storage = storage.NewFile(leecherDataDir)
377                 return
378         }())
379         require.NoError(t, err)
380         assert.True(t, new)
381         // Now do some things with leecher and seeder.
382         addClientPeer(leecherGreeting, seeder)
383         r := leecherGreeting.NewReader()
384         defer r.Close()
385         if ps.Responsive {
386                 r.SetResponsive()
387         }
388         if ps.SetReadahead {
389                 r.SetReadahead(ps.Readahead)
390         }
391         assertReadAllGreeting(t, r)
392         // After one read through, we can assume certain torrent statistics.
393         // These are not a strict requirement. It is however interesting to
394         // follow.
395         t.Logf("%#v", seederTorrent.Stats())
396         assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
397         assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
398         assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
399         assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
400         // Read through again for the cases where the torrent data size exceeds
401         // the size of the cache.
402         assertReadAllGreeting(t, r)
403 }
404
405 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
406         pos, err := r.Seek(0, os.SEEK_SET)
407         assert.NoError(t, err)
408         assert.EqualValues(t, 0, pos)
409         _greeting, err := ioutil.ReadAll(r)
410         assert.NoError(t, err)
411         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
412 }
413
414 // Check that after completing leeching, a leecher transitions to a seeding
415 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
416 func TestSeedAfterDownloading(t *testing.T) {
417         greetingTempDir, mi := testutil.GreetingTestTorrent()
418         defer os.RemoveAll(greetingTempDir)
419         cfg := TestingConfig
420         cfg.Seed = true
421         cfg.DataDir = greetingTempDir
422         seeder, err := NewClient(&cfg)
423         require.NoError(t, err)
424         defer seeder.Close()
425         testutil.ExportStatusWriter(seeder, "s")
426         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
427         cfg.DataDir, err = ioutil.TempDir("", "")
428         require.NoError(t, err)
429         defer os.RemoveAll(cfg.DataDir)
430         leecher, err := NewClient(&cfg)
431         require.NoError(t, err)
432         defer leecher.Close()
433         testutil.ExportStatusWriter(leecher, "l")
434         cfg.Seed = false
435         // cfg.TorrentDataOpener = nil
436         cfg.DataDir, err = ioutil.TempDir("", "")
437         require.NoError(t, err)
438         defer os.RemoveAll(cfg.DataDir)
439         leecherLeecher, _ := NewClient(&cfg)
440         defer leecherLeecher.Close()
441         testutil.ExportStatusWriter(leecherLeecher, "ll")
442         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
443                 ret = TorrentSpecFromMetaInfo(mi)
444                 ret.ChunkSize = 2
445                 return
446         }())
447         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
448                 ret = TorrentSpecFromMetaInfo(mi)
449                 ret.ChunkSize = 3
450                 return
451         }())
452         // Simultaneously DownloadAll in Leecher, and read the contents
453         // consecutively in LeecherLeecher. This non-deterministically triggered a
454         // case where the leecher wouldn't unchoke the LeecherLeecher.
455         var wg sync.WaitGroup
456         wg.Add(1)
457         go func() {
458                 defer wg.Done()
459                 r := llg.NewReader()
460                 defer r.Close()
461                 b, err := ioutil.ReadAll(r)
462                 require.NoError(t, err)
463                 assert.EqualValues(t, testutil.GreetingFileContents, b)
464         }()
465         addClientPeer(leecherGreeting, seeder)
466         addClientPeer(leecherGreeting, leecherLeecher)
467         wg.Add(1)
468         go func() {
469                 defer wg.Done()
470                 leecherGreeting.DownloadAll()
471                 leecher.WaitAll()
472         }()
473         wg.Wait()
474 }
475
476 func TestMergingTrackersByAddingSpecs(t *testing.T) {
477         cl, err := NewClient(&TestingConfig)
478         require.NoError(t, err)
479         defer cl.Close()
480         spec := TorrentSpec{}
481         T, new, _ := cl.AddTorrentSpec(&spec)
482         if !new {
483                 t.FailNow()
484         }
485         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
486         _, new, _ = cl.AddTorrentSpec(&spec)
487         assert.False(t, new)
488         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
489         // Because trackers are disabled in TestingConfig.
490         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
491 }
492
493 type badStorage struct{}
494
495 func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.Torrent, error) {
496         return bs, nil
497 }
498
499 func (bs badStorage) Close() error {
500         return nil
501 }
502
503 func (bs badStorage) Piece(p metainfo.Piece) storage.Piece {
504         return badStoragePiece{p}
505 }
506
507 type badStoragePiece struct {
508         p metainfo.Piece
509 }
510
511 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
512         return 0, nil
513 }
514
515 func (p badStoragePiece) GetIsComplete() bool {
516         return true
517 }
518
519 func (p badStoragePiece) MarkComplete() error {
520         return errors.New("psyyyyyyyche")
521 }
522
523 func (p badStoragePiece) randomlyTruncatedDataString() string {
524         return "hello, world\n"[:rand.Intn(14)]
525 }
526
527 func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
528         r := strings.NewReader(p.randomlyTruncatedDataString())
529         return r.ReadAt(b, off+p.p.Offset())
530 }
531
532 // We read from a piece which is marked completed, but is missing data.
533 func TestCompletedPieceWrongSize(t *testing.T) {
534         cfg := TestingConfig
535         cfg.DefaultStorage = badStorage{}
536         cl, err := NewClient(&cfg)
537         require.NoError(t, err)
538         defer cl.Close()
539         info := metainfo.Info{
540                 PieceLength: 15,
541                 Pieces:      make([]byte, 20),
542                 Files: []metainfo.FileInfo{
543                         metainfo.FileInfo{Path: []string{"greeting"}, Length: 13},
544                 },
545         }
546         b, err := bencode.Marshal(info)
547         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
548                 InfoBytes: b,
549                 InfoHash:  metainfo.HashBytes(b),
550         })
551         require.NoError(t, err)
552         defer tt.Drop()
553         assert.True(t, new)
554         r := tt.NewReader()
555         defer r.Close()
556         b, err = ioutil.ReadAll(r)
557         assert.Len(t, b, 13)
558         assert.NoError(t, err)
559 }
560
561 func BenchmarkAddLargeTorrent(b *testing.B) {
562         cfg := TestingConfig
563         cfg.DisableTCP = true
564         cfg.DisableUTP = true
565         cfg.ListenAddr = "redonk"
566         cl, _ := NewClient(&cfg)
567         defer cl.Close()
568         for range iter.N(b.N) {
569                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
570                 if err != nil {
571                         b.Fatal(err)
572                 }
573                 t.Drop()
574         }
575 }
576
577 func TestResponsive(t *testing.T) {
578         seederDataDir, mi := testutil.GreetingTestTorrent()
579         defer os.RemoveAll(seederDataDir)
580         cfg := TestingConfig
581         cfg.Seed = true
582         cfg.DataDir = seederDataDir
583         seeder, err := NewClient(&cfg)
584         require.Nil(t, err)
585         defer seeder.Close()
586         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
587         leecherDataDir, err := ioutil.TempDir("", "")
588         require.Nil(t, err)
589         defer os.RemoveAll(leecherDataDir)
590         cfg = TestingConfig
591         cfg.DataDir = leecherDataDir
592         leecher, err := NewClient(&cfg)
593         require.Nil(t, err)
594         defer leecher.Close()
595         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
596                 ret = TorrentSpecFromMetaInfo(mi)
597                 ret.ChunkSize = 2
598                 return
599         }())
600         addClientPeer(leecherTorrent, seeder)
601         reader := leecherTorrent.NewReader()
602         defer reader.Close()
603         reader.SetReadahead(0)
604         reader.SetResponsive()
605         b := make([]byte, 2)
606         _, err = reader.Seek(3, os.SEEK_SET)
607         require.NoError(t, err)
608         _, err = io.ReadFull(reader, b)
609         assert.Nil(t, err)
610         assert.EqualValues(t, "lo", string(b))
611         _, err = reader.Seek(11, os.SEEK_SET)
612         require.NoError(t, err)
613         n, err := io.ReadFull(reader, b)
614         assert.Nil(t, err)
615         assert.EqualValues(t, 2, n)
616         assert.EqualValues(t, "d\n", string(b))
617 }
618
619 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
620         seederDataDir, mi := testutil.GreetingTestTorrent()
621         defer os.RemoveAll(seederDataDir)
622         cfg := TestingConfig
623         cfg.Seed = true
624         cfg.DataDir = seederDataDir
625         seeder, err := NewClient(&cfg)
626         require.Nil(t, err)
627         defer seeder.Close()
628         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
629         leecherDataDir, err := ioutil.TempDir("", "")
630         require.Nil(t, err)
631         defer os.RemoveAll(leecherDataDir)
632         cfg = TestingConfig
633         cfg.DataDir = leecherDataDir
634         leecher, err := NewClient(&cfg)
635         require.Nil(t, err)
636         defer leecher.Close()
637         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
638                 ret = TorrentSpecFromMetaInfo(mi)
639                 ret.ChunkSize = 2
640                 return
641         }())
642         addClientPeer(leecherTorrent, seeder)
643         reader := leecherTorrent.NewReader()
644         defer reader.Close()
645         reader.SetReadahead(0)
646         reader.SetResponsive()
647         b := make([]byte, 2)
648         _, err = reader.Seek(3, os.SEEK_SET)
649         require.NoError(t, err)
650         _, err = io.ReadFull(reader, b)
651         assert.Nil(t, err)
652         assert.EqualValues(t, "lo", string(b))
653         go leecherTorrent.Drop()
654         _, err = reader.Seek(11, os.SEEK_SET)
655         require.NoError(t, err)
656         n, err := reader.Read(b)
657         assert.EqualError(t, err, "torrent closed")
658         assert.EqualValues(t, 0, n)
659 }
660
661 func TestDHTInheritBlocklist(t *testing.T) {
662         ipl := iplist.New(nil)
663         require.NotNil(t, ipl)
664         cfg := TestingConfig
665         cfg.IPBlocklist = ipl
666         cfg.NoDHT = false
667         cl, err := NewClient(&cfg)
668         require.NoError(t, err)
669         defer cl.Close()
670         require.Equal(t, ipl, cl.DHT().IPBlocklist())
671 }
672
673 // Check that stuff is merged in subsequent AddTorrentSpec for the same
674 // infohash.
675 func TestAddTorrentSpecMerging(t *testing.T) {
676         cl, err := NewClient(&TestingConfig)
677         require.NoError(t, err)
678         defer cl.Close()
679         dir, mi := testutil.GreetingTestTorrent()
680         defer os.RemoveAll(dir)
681         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
682                 InfoHash: mi.HashInfoBytes(),
683         })
684         require.NoError(t, err)
685         require.True(t, new)
686         require.Nil(t, tt.Info())
687         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
688         require.NoError(t, err)
689         require.False(t, new)
690         require.NotNil(t, tt.Info())
691 }
692
693 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
694         dir, mi := testutil.GreetingTestTorrent()
695         os.RemoveAll(dir)
696         cl, _ := NewClient(&TestingConfig)
697         defer cl.Close()
698         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
699                 InfoHash: mi.HashInfoBytes(),
700         })
701         tt.Drop()
702         assert.EqualValues(t, 0, len(cl.Torrents()))
703         select {
704         case <-tt.GotInfo():
705                 t.FailNow()
706         default:
707         }
708 }
709
710 func writeTorrentData(ts storage.Torrent, info metainfo.Info, b []byte) {
711         for i := range iter.N(info.NumPieces()) {
712                 n, _ := ts.Piece(info.Piece(i)).WriteAt(b, 0)
713                 b = b[n:]
714         }
715 }
716
717 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.Client) {
718         fileCacheDir, err := ioutil.TempDir("", "")
719         require.NoError(t, err)
720         defer os.RemoveAll(fileCacheDir)
721         fileCache, err := filecache.NewCache(fileCacheDir)
722         require.NoError(t, err)
723         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
724         defer os.RemoveAll(greetingDataTempDir)
725         filePieceStore := csf(fileCache)
726         info := greetingMetainfo.UnmarshalInfo()
727         ih := greetingMetainfo.HashInfoBytes()
728         greetingData, err := filePieceStore.OpenTorrent(&info, ih)
729         require.NoError(t, err)
730         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
731         // require.Equal(t, len(testutil.GreetingFileContents), written)
732         // require.NoError(t, err)
733         for i := 0; i < info.NumPieces(); i++ {
734                 p := info.Piece(i)
735                 if alreadyCompleted {
736                         err := greetingData.Piece(p).MarkComplete()
737                         assert.NoError(t, err)
738                 }
739         }
740         cfg := TestingConfig
741         // TODO: Disable network option?
742         cfg.DisableTCP = true
743         cfg.DisableUTP = true
744         cfg.DefaultStorage = filePieceStore
745         cl, err := NewClient(&cfg)
746         require.NoError(t, err)
747         defer cl.Close()
748         tt, err := cl.AddTorrent(greetingMetainfo)
749         require.NoError(t, err)
750         psrs := tt.PieceStateRuns()
751         assert.Len(t, psrs, 1)
752         assert.EqualValues(t, 3, psrs[0].Length)
753         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
754         if alreadyCompleted {
755                 r := tt.NewReader()
756                 b, err := ioutil.ReadAll(r)
757                 assert.NoError(t, err)
758                 assert.EqualValues(t, testutil.GreetingFileContents, b)
759         }
760 }
761
762 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
763         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceFileStorage)
764         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
765 }
766
767 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
768         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceFileStorage)
769         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
770 }
771
772 func TestAddMetainfoWithNodes(t *testing.T) {
773         cfg := TestingConfig
774         cfg.NoDHT = false
775         // For now, we want to just jam the nodes into the table, without
776         // verifying them first. Also the DHT code doesn't support mixing secure
777         // and insecure nodes if security is enabled (yet).
778         cfg.DHTConfig.NoSecurity = true
779         cl, err := NewClient(&cfg)
780         require.NoError(t, err)
781         defer cl.Close()
782         assert.EqualValues(t, cl.DHT().NumNodes(), 0)
783         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
784         require.NoError(t, err)
785         assert.Len(t, tt.metainfo.AnnounceList, 5)
786         assert.EqualValues(t, 6, cl.DHT().NumNodes())
787 }
788
789 type testDownloadCancelParams struct {
790         ExportClientStatus        bool
791         SetLeecherStorageCapacity bool
792         LeecherStorageCapacity    int64
793         Cancel                    bool
794 }
795
796 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
797         greetingTempDir, mi := testutil.GreetingTestTorrent()
798         defer os.RemoveAll(greetingTempDir)
799         cfg := TestingConfig
800         cfg.Seed = true
801         cfg.DataDir = greetingTempDir
802         seeder, err := NewClient(&cfg)
803         require.NoError(t, err)
804         defer seeder.Close()
805         if ps.ExportClientStatus {
806                 testutil.ExportStatusWriter(seeder, "s")
807         }
808         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
809         leecherDataDir, err := ioutil.TempDir("", "")
810         require.NoError(t, err)
811         defer os.RemoveAll(leecherDataDir)
812         fc, err := filecache.NewCache(leecherDataDir)
813         require.NoError(t, err)
814         if ps.SetLeecherStorageCapacity {
815                 fc.SetCapacity(ps.LeecherStorageCapacity)
816         }
817         cfg.DefaultStorage = storage.NewFileStorePieces(fc.AsFileStore())
818         cfg.DataDir = leecherDataDir
819         leecher, _ := NewClient(&cfg)
820         defer leecher.Close()
821         if ps.ExportClientStatus {
822                 testutil.ExportStatusWriter(leecher, "l")
823         }
824         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
825                 ret = TorrentSpecFromMetaInfo(mi)
826                 ret.ChunkSize = 2
827                 return
828         }())
829         require.NoError(t, err)
830         assert.True(t, new)
831         psc := leecherGreeting.SubscribePieceStateChanges()
832         defer psc.Close()
833         leecherGreeting.DownloadAll()
834         if ps.Cancel {
835                 leecherGreeting.CancelPieces(0, leecherGreeting.NumPieces())
836         }
837         addClientPeer(leecherGreeting, seeder)
838         completes := make(map[int]bool, 3)
839 values:
840         for {
841                 // started := time.Now()
842                 select {
843                 case _v := <-psc.Values:
844                         // log.Print(time.Since(started))
845                         v := _v.(PieceStateChange)
846                         completes[v.Index] = v.Complete
847                 case <-time.After(100 * time.Millisecond):
848                         break values
849                 }
850         }
851         if ps.Cancel {
852                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
853         } else {
854                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
855         }
856
857 }
858
859 func TestTorrentDownloadAll(t *testing.T) {
860         testDownloadCancel(t, testDownloadCancelParams{})
861 }
862
863 func TestTorrentDownloadAllThenCancel(t *testing.T) {
864         testDownloadCancel(t, testDownloadCancelParams{
865                 Cancel: true,
866         })
867 }
868
869 // Ensure that it's an error for a peer to send an invalid have message.
870 func TestPeerInvalidHave(t *testing.T) {
871         cl, err := NewClient(&TestingConfig)
872         require.NoError(t, err)
873         defer cl.Close()
874         info := metainfo.Info{
875                 PieceLength: 1,
876                 Pieces:      make([]byte, 20),
877                 Files:       []metainfo.FileInfo{{Length: 1}},
878         }
879         infoBytes, err := bencode.Marshal(info)
880         require.NoError(t, err)
881         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
882                 InfoBytes: infoBytes,
883                 InfoHash:  metainfo.HashBytes(infoBytes),
884         })
885         require.NoError(t, err)
886         assert.True(t, _new)
887         defer tt.Drop()
888         cn := &connection{
889                 t: tt,
890         }
891         assert.NoError(t, cn.peerSentHave(0))
892         assert.Error(t, cn.peerSentHave(1))
893 }
894
895 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
896         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
897         defer os.RemoveAll(greetingTempDir)
898         cfg := TestingConfig
899         cfg.DataDir = greetingTempDir
900         seeder, err := NewClient(&TestingConfig)
901         require.NoError(t, err)
902         seeder.AddTorrentSpec(&TorrentSpec{
903                 InfoBytes: greetingMetainfo.InfoBytes,
904         })
905 }
906
907 func TestPrepareTrackerAnnounce(t *testing.T) {
908         cl := &Client{}
909         blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
910         require.NoError(t, err)
911         assert.False(t, blocked)
912         assert.EqualValues(t, "localhost:1234", host)
913         assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
914 }
915
916 // Check that when the listen port is 0, all the protocols listened on have
917 // the same port, and it isn't zero.
918 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
919         cl, err := NewClient(&TestingConfig)
920         require.NoError(t, err)
921         defer cl.Close()
922         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
923         assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
924 }
925
926 func TestClientDynamicListenTCPOnly(t *testing.T) {
927         cfg := TestingConfig
928         cfg.DisableUTP = true
929         cl, err := NewClient(&cfg)
930         require.NoError(t, err)
931         defer cl.Close()
932         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
933         assert.Nil(t, cl.utpSock)
934 }
935
936 func TestClientDynamicListenUTPOnly(t *testing.T) {
937         cfg := TestingConfig
938         cfg.DisableTCP = true
939         cl, err := NewClient(&cfg)
940         require.NoError(t, err)
941         defer cl.Close()
942         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
943         assert.Nil(t, cl.tcpListener)
944 }
945
946 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
947         cfg := TestingConfig
948         cfg.DisableTCP = true
949         cfg.DisableUTP = true
950         cl, err := NewClient(&cfg)
951         require.NoError(t, err)
952         defer cl.Close()
953         assert.Nil(t, cl.ListenAddr())
954 }
955
956 func addClientPeer(t *Torrent, cl *Client) {
957         t.AddPeers([]Peer{
958                 Peer{
959                         IP:   missinggo.AddrIP(cl.ListenAddr()),
960                         Port: missinggo.AddrPort(cl.ListenAddr()),
961                 },
962         })
963 }
964
965 func printConnPeerCounts(t *Torrent) {
966         t.cl.mu.Lock()
967         log.Println(len(t.conns), len(t.peers))
968         t.cl.mu.Unlock()
969 }
970
971 func totalConns(tts []*Torrent) (ret int) {
972         for _, tt := range tts {
973                 tt.cl.mu.Lock()
974                 ret += len(tt.conns)
975                 tt.cl.mu.Unlock()
976         }
977         return
978 }
979
980 func TestSetMaxEstablishedConn(t *testing.T) {
981         var tts []*Torrent
982         ih := testutil.GreetingMetaInfo().HashInfoBytes()
983         cfg := TestingConfig
984         for i := range iter.N(3) {
985                 cl, err := NewClient(&cfg)
986                 require.NoError(t, err)
987                 defer cl.Close()
988                 tt, _ := cl.AddTorrentInfoHash(ih)
989                 tt.SetMaxEstablishedConns(2)
990                 testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
991                 tts = append(tts, tt)
992         }
993         addPeers := func() {
994                 for i, tt := range tts {
995                         for _, _tt := range tts[:i] {
996                                 addClientPeer(tt, _tt.cl)
997                         }
998                 }
999         }
1000         waitTotalConns := func(num int) {
1001                 for totalConns(tts) != num {
1002                         time.Sleep(time.Millisecond)
1003                 }
1004         }
1005         addPeers()
1006         waitTotalConns(6)
1007         tts[0].SetMaxEstablishedConns(1)
1008         waitTotalConns(4)
1009         tts[0].SetMaxEstablishedConns(0)
1010         waitTotalConns(2)
1011         tts[0].SetMaxEstablishedConns(1)
1012         addPeers()
1013         waitTotalConns(4)
1014         tts[0].SetMaxEstablishedConns(2)
1015         addPeers()
1016         waitTotalConns(6)
1017 }