]> Sergey Matveev's repositories - btrtrc.git/blob - client_test.go
Add missing error return check in benchmark
[btrtrc.git] / client_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "log"
10         "math/rand"
11         "net"
12         "os"
13         "strings"
14         "sync"
15         "testing"
16         "time"
17
18         _ "github.com/anacrolix/envpprof"
19         "github.com/anacrolix/missinggo"
20         "github.com/anacrolix/missinggo/filecache"
21         "github.com/anacrolix/missinggo/pubsub"
22         "github.com/anacrolix/utp"
23         "github.com/bradfitz/iter"
24         "github.com/stretchr/testify/assert"
25         "github.com/stretchr/testify/require"
26
27         "github.com/anacrolix/torrent/bencode"
28         "github.com/anacrolix/torrent/dht"
29         "github.com/anacrolix/torrent/internal/testutil"
30         "github.com/anacrolix/torrent/iplist"
31         "github.com/anacrolix/torrent/metainfo"
32         "github.com/anacrolix/torrent/storage"
33 )
34
35 func init() {
36         log.SetFlags(log.LstdFlags | log.Llongfile)
37 }
38
39 var TestingConfig = Config{
40         ListenAddr:      "localhost:0",
41         NoDHT:           true,
42         DisableTrackers: true,
43         DataDir:         "/dev/null",
44         DHTConfig: dht.ServerConfig{
45                 NoDefaultBootstrap: true,
46         },
47 }
48
49 func TestClientDefault(t *testing.T) {
50         cl, err := NewClient(&TestingConfig)
51         require.NoError(t, err)
52         cl.Close()
53 }
54
55 func TestAddDropTorrent(t *testing.T) {
56         cl, err := NewClient(&TestingConfig)
57         require.NoError(t, err)
58         defer cl.Close()
59         dir, mi := testutil.GreetingTestTorrent()
60         defer os.RemoveAll(dir)
61         tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
62         require.NoError(t, err)
63         assert.True(t, new)
64         tt.SetMaxEstablishedConns(0)
65         tt.SetMaxEstablishedConns(1)
66         tt.Drop()
67 }
68
69 func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
70         t.SkipNow()
71 }
72
73 func TestAddTorrentNoUsableURLs(t *testing.T) {
74         t.SkipNow()
75 }
76
77 func TestAddPeersToUnknownTorrent(t *testing.T) {
78         t.SkipNow()
79 }
80
81 func TestPieceHashSize(t *testing.T) {
82         if pieceHash.Size() != 20 {
83                 t.FailNow()
84         }
85 }
86
87 func TestTorrentInitialState(t *testing.T) {
88         dir, mi := testutil.GreetingTestTorrent()
89         defer os.RemoveAll(dir)
90         tor := &Torrent{
91                 infoHash:          mi.HashInfoBytes(),
92                 pieceStateChanges: pubsub.NewPubSub(),
93         }
94         tor.chunkSize = 2
95         tor.storageOpener = storage.NewFile("/dev/null")
96         // Needed to lock for asynchronous piece verification.
97         tor.cl = new(Client)
98         err := tor.setInfoBytes(mi.InfoBytes)
99         require.NoError(t, err)
100         require.Len(t, tor.pieces, 3)
101         tor.pendAllChunkSpecs(0)
102         tor.cl.mu.Lock()
103         assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
104         tor.cl.mu.Unlock()
105         assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
106 }
107
108 func TestUnmarshalPEXMsg(t *testing.T) {
109         var m peerExchangeMessage
110         if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil {
111                 t.Fatal(err)
112         }
113         if len(m.Added) != 2 {
114                 t.FailNow()
115         }
116         if m.Added[0].Port != 0x506 {
117                 t.FailNow()
118         }
119 }
120
121 func TestReducedDialTimeout(t *testing.T) {
122         for _, _case := range []struct {
123                 Max             time.Duration
124                 HalfOpenLimit   int
125                 PendingPeers    int
126                 ExpectedReduced time.Duration
127         }{
128                 {nominalDialTimeout, 40, 0, nominalDialTimeout},
129                 {nominalDialTimeout, 40, 1, nominalDialTimeout},
130                 {nominalDialTimeout, 40, 39, nominalDialTimeout},
131                 {nominalDialTimeout, 40, 40, nominalDialTimeout / 2},
132                 {nominalDialTimeout, 40, 80, nominalDialTimeout / 3},
133                 {nominalDialTimeout, 40, 4000, nominalDialTimeout / 101},
134         } {
135                 reduced := reducedDialTimeout(_case.Max, _case.HalfOpenLimit, _case.PendingPeers)
136                 expected := _case.ExpectedReduced
137                 if expected < minDialTimeout {
138                         expected = minDialTimeout
139                 }
140                 if reduced != expected {
141                         t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
142                 }
143         }
144 }
145
146 func TestUTPRawConn(t *testing.T) {
147         l, err := utp.NewSocket("udp", "")
148         if err != nil {
149                 t.Fatal(err)
150         }
151         defer l.Close()
152         go func() {
153                 for {
154                         _, err := l.Accept()
155                         if err != nil {
156                                 break
157                         }
158                 }
159         }()
160         // Connect a UTP peer to see if the RawConn will still work.
161         s, _ := utp.NewSocket("udp", "")
162         defer s.Close()
163         utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
164         if err != nil {
165                 t.Fatalf("error dialing utp listener: %s", err)
166         }
167         defer utpPeer.Close()
168         peer, err := net.ListenPacket("udp", ":0")
169         if err != nil {
170                 t.Fatal(err)
171         }
172         defer peer.Close()
173
174         msgsReceived := 0
175         // How many messages to send. I've set this to double the channel buffer
176         // size in the raw packetConn.
177         const N = 200
178         readerStopped := make(chan struct{})
179         // The reader goroutine.
180         go func() {
181                 defer close(readerStopped)
182                 b := make([]byte, 500)
183                 for i := 0; i < N; i++ {
184                         n, _, err := l.ReadFrom(b)
185                         if err != nil {
186                                 t.Fatalf("error reading from raw conn: %s", err)
187                         }
188                         msgsReceived++
189                         var d int
190                         fmt.Sscan(string(b[:n]), &d)
191                         if d != i {
192                                 log.Printf("got wrong number: expected %d, got %d", i, d)
193                         }
194                 }
195         }()
196         udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
197         if err != nil {
198                 t.Fatal(err)
199         }
200         for i := 0; i < N; i++ {
201                 _, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
202                 if err != nil {
203                         t.Fatal(err)
204                 }
205                 time.Sleep(time.Microsecond)
206         }
207         select {
208         case <-readerStopped:
209         case <-time.After(time.Second):
210                 t.Fatal("reader timed out")
211         }
212         if msgsReceived != N {
213                 t.Fatalf("messages received: %d", msgsReceived)
214         }
215 }
216
217 func TestTwoClientsArbitraryPorts(t *testing.T) {
218         for i := 0; i < 2; i++ {
219                 cl, err := NewClient(&TestingConfig)
220                 if err != nil {
221                         t.Fatal(err)
222                 }
223                 defer cl.Close()
224         }
225 }
226
227 func TestAddDropManyTorrents(t *testing.T) {
228         cl, err := NewClient(&TestingConfig)
229         require.NoError(t, err)
230         defer cl.Close()
231         for i := range iter.N(1000) {
232                 var spec TorrentSpec
233                 binary.PutVarint(spec.InfoHash[:], int64(i))
234                 tt, new, err := cl.AddTorrentSpec(&spec)
235                 assert.NoError(t, err)
236                 assert.True(t, new)
237                 defer tt.Drop()
238         }
239 }
240
241 type FileCacheClientStorageFactoryParams struct {
242         Capacity    int64
243         SetCapacity bool
244         Wrapper     func(*filecache.Cache) storage.Client
245 }
246
247 func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
248         return func(dataDir string) storage.Client {
249                 fc, err := filecache.NewCache(dataDir)
250                 if err != nil {
251                         panic(err)
252                 }
253                 if ps.SetCapacity {
254                         fc.SetCapacity(ps.Capacity)
255                 }
256                 return ps.Wrapper(fc)
257         }
258 }
259
260 type storageFactory func(string) storage.Client
261
262 func TestClientTransferDefault(t *testing.T) {
263         testClientTransfer(t, testClientTransferParams{
264                 ExportClientStatus: true,
265                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
266                         Wrapper: fileCachePieceResourceStorage,
267                 }),
268         })
269 }
270
271 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.Client {
272         return storage.NewResourcePieces(fc.AsResourceProvider())
273 }
274
275 func fileCachePieceFileStorage(fc *filecache.Cache) storage.Client {
276         return storage.NewFileStorePieces(fc.AsFileStore())
277 }
278
279 func TestClientTransferSmallCache(t *testing.T) {
280         testClientTransfer(t, testClientTransferParams{
281                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
282                         SetCapacity: true,
283                         // Going below the piece length means it can't complete a piece so
284                         // that it can be hashed.
285                         Capacity: 5,
286                         Wrapper:  fileCachePieceResourceStorage,
287                 }),
288                 SetReadahead: true,
289                 // Can't readahead too far or the cache will thrash and drop data we
290                 // thought we had.
291                 Readahead:          0,
292                 ExportClientStatus: true,
293         })
294 }
295
296 func TestClientTransferVarious(t *testing.T) {
297         for _, lsf := range []func(*filecache.Cache) storage.Client{
298                 fileCachePieceFileStorage,
299                 fileCachePieceResourceStorage,
300         } {
301                 for _, ss := range []func(string) storage.Client{
302                         storage.NewFile,
303                         storage.NewMMap,
304                 } {
305                         for _, responsive := range []bool{false, true} {
306                                 testClientTransfer(t, testClientTransferParams{
307                                         Responsive:    responsive,
308                                         SeederStorage: ss,
309                                         LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
310                                                 Wrapper: lsf,
311                                         }),
312                                 })
313                                 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
314                                         testClientTransfer(t, testClientTransferParams{
315                                                 SeederStorage: ss,
316                                                 Responsive:    responsive,
317                                                 SetReadahead:  true,
318                                                 Readahead:     readahead,
319                                                 LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
320                                                         Wrapper: lsf,
321                                                 }),
322                                         })
323                                 }
324                         }
325                 }
326         }
327 }
328
329 type testClientTransferParams struct {
330         Responsive         bool
331         Readahead          int64
332         SetReadahead       bool
333         ExportClientStatus bool
334         LeecherStorage     func(string) storage.Client
335         SeederStorage      func(string) storage.Client
336 }
337
338 // Creates a seeder and a leecher, and ensures the data transfers when a read
339 // is attempted on the leecher.
340 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
341         greetingTempDir, mi := testutil.GreetingTestTorrent()
342         defer os.RemoveAll(greetingTempDir)
343         // Create seeder and a Torrent.
344         cfg := TestingConfig
345         cfg.Seed = true
346         cfg.ListenAddr = "localhost:4000"
347         if ps.SeederStorage != nil {
348                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
349         } else {
350                 cfg.DataDir = greetingTempDir
351         }
352         seeder, err := NewClient(&cfg)
353         require.NoError(t, err)
354         defer seeder.Close()
355         if ps.ExportClientStatus {
356                 testutil.ExportStatusWriter(seeder, "s")
357         }
358         seederTorrent, new, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
359         require.NoError(t, err)
360         assert.True(t, new)
361         // Create leecher and a Torrent.
362         leecherDataDir, err := ioutil.TempDir("", "")
363         require.NoError(t, err)
364         defer os.RemoveAll(leecherDataDir)
365         cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
366         cfg.ListenAddr = "localhost:4001"
367         leecher, err := NewClient(&cfg)
368         require.NoError(t, err)
369         defer leecher.Close()
370         if ps.ExportClientStatus {
371                 testutil.ExportStatusWriter(leecher, "l")
372         }
373         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
374                 ret = TorrentSpecFromMetaInfo(mi)
375                 ret.ChunkSize = 2
376                 ret.Storage = storage.NewFile(leecherDataDir)
377                 return
378         }())
379         require.NoError(t, err)
380         assert.True(t, new)
381         // Now do some things with leecher and seeder.
382         addClientPeer(leecherGreeting, seeder)
383         r := leecherGreeting.NewReader()
384         defer r.Close()
385         if ps.Responsive {
386                 r.SetResponsive()
387         }
388         if ps.SetReadahead {
389                 r.SetReadahead(ps.Readahead)
390         }
391         assertReadAllGreeting(t, r)
392         // After one read through, we can assume certain torrent statistics.
393         // These are not a strict requirement. It is however interesting to
394         // follow.
395         t.Logf("%#v", seederTorrent.Stats())
396         assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
397         assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
398         assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
399         assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
400         // Read through again for the cases where the torrent data size exceeds
401         // the size of the cache.
402         assertReadAllGreeting(t, r)
403 }
404
405 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
406         pos, err := r.Seek(0, os.SEEK_SET)
407         assert.NoError(t, err)
408         assert.EqualValues(t, 0, pos)
409         _greeting, err := ioutil.ReadAll(r)
410         assert.NoError(t, err)
411         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
412 }
413
414 // Check that after completing leeching, a leecher transitions to a seeding
415 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
416 func TestSeedAfterDownloading(t *testing.T) {
417         greetingTempDir, mi := testutil.GreetingTestTorrent()
418         defer os.RemoveAll(greetingTempDir)
419         cfg := TestingConfig
420         cfg.Seed = true
421         cfg.DataDir = greetingTempDir
422         seeder, err := NewClient(&cfg)
423         require.NoError(t, err)
424         defer seeder.Close()
425         testutil.ExportStatusWriter(seeder, "s")
426         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
427         cfg.DataDir, err = ioutil.TempDir("", "")
428         require.NoError(t, err)
429         defer os.RemoveAll(cfg.DataDir)
430         leecher, err := NewClient(&cfg)
431         require.NoError(t, err)
432         defer leecher.Close()
433         testutil.ExportStatusWriter(leecher, "l")
434         cfg.Seed = false
435         // cfg.TorrentDataOpener = nil
436         cfg.DataDir, err = ioutil.TempDir("", "")
437         require.NoError(t, err)
438         defer os.RemoveAll(cfg.DataDir)
439         leecherLeecher, _ := NewClient(&cfg)
440         defer leecherLeecher.Close()
441         testutil.ExportStatusWriter(leecherLeecher, "ll")
442         leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
443                 ret = TorrentSpecFromMetaInfo(mi)
444                 ret.ChunkSize = 2
445                 return
446         }())
447         llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
448                 ret = TorrentSpecFromMetaInfo(mi)
449                 ret.ChunkSize = 3
450                 return
451         }())
452         // Simultaneously DownloadAll in Leecher, and read the contents
453         // consecutively in LeecherLeecher. This non-deterministically triggered a
454         // case where the leecher wouldn't unchoke the LeecherLeecher.
455         var wg sync.WaitGroup
456         wg.Add(1)
457         go func() {
458                 defer wg.Done()
459                 r := llg.NewReader()
460                 defer r.Close()
461                 b, err := ioutil.ReadAll(r)
462                 require.NoError(t, err)
463                 assert.EqualValues(t, testutil.GreetingFileContents, b)
464         }()
465         addClientPeer(leecherGreeting, seeder)
466         addClientPeer(leecherGreeting, leecherLeecher)
467         wg.Add(1)
468         go func() {
469                 defer wg.Done()
470                 leecherGreeting.DownloadAll()
471                 leecher.WaitAll()
472         }()
473         wg.Wait()
474 }
475
476 func TestMergingTrackersByAddingSpecs(t *testing.T) {
477         cl, err := NewClient(&TestingConfig)
478         require.NoError(t, err)
479         defer cl.Close()
480         spec := TorrentSpec{}
481         T, new, _ := cl.AddTorrentSpec(&spec)
482         if !new {
483                 t.FailNow()
484         }
485         spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
486         _, new, _ = cl.AddTorrentSpec(&spec)
487         assert.False(t, new)
488         assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
489         // Because trackers are disabled in TestingConfig.
490         assert.EqualValues(t, 0, len(T.trackerAnnouncers))
491 }
492
493 type badStorage struct{}
494
495 func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.Torrent, error) {
496         return bs, nil
497 }
498
499 func (bs badStorage) Close() error {
500         return nil
501 }
502
503 func (bs badStorage) Piece(p metainfo.Piece) storage.Piece {
504         return badStoragePiece{p}
505 }
506
507 type badStoragePiece struct {
508         p metainfo.Piece
509 }
510
511 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
512         return 0, nil
513 }
514
515 func (p badStoragePiece) GetIsComplete() bool {
516         return true
517 }
518
519 func (p badStoragePiece) MarkComplete() error {
520         return errors.New("psyyyyyyyche")
521 }
522
523 func (p badStoragePiece) randomlyTruncatedDataString() string {
524         return "hello, world\n"[:rand.Intn(14)]
525 }
526
527 func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
528         r := strings.NewReader(p.randomlyTruncatedDataString())
529         return r.ReadAt(b, off+p.p.Offset())
530 }
531
532 // We read from a piece which is marked completed, but is missing data.
533 func TestCompletedPieceWrongSize(t *testing.T) {
534         cfg := TestingConfig
535         cfg.DefaultStorage = badStorage{}
536         cl, err := NewClient(&cfg)
537         require.NoError(t, err)
538         defer cl.Close()
539         info := metainfo.Info{
540                 PieceLength: 15,
541                 Pieces:      make([]byte, 20),
542                 Files: []metainfo.FileInfo{
543                         metainfo.FileInfo{Path: []string{"greeting"}, Length: 13},
544                 },
545         }
546         b, err := bencode.Marshal(info)
547         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
548                 InfoBytes: b,
549                 InfoHash:  metainfo.HashBytes(b),
550         })
551         require.NoError(t, err)
552         defer tt.Drop()
553         assert.True(t, new)
554         r := tt.NewReader()
555         defer r.Close()
556         b, err = ioutil.ReadAll(r)
557         assert.Len(t, b, 13)
558         assert.NoError(t, err)
559 }
560
561 func BenchmarkAddLargeTorrent(b *testing.B) {
562         cfg := TestingConfig
563         cfg.DisableTCP = true
564         cfg.DisableUTP = true
565         cfg.ListenAddr = "redonk"
566         cl, err := NewClient(&cfg)
567         require.NoError(b, err)
568         defer cl.Close()
569         for range iter.N(b.N) {
570                 t, err := cl.AddTorrentFromFile("testdata/bootstrap.dat.torrent")
571                 if err != nil {
572                         b.Fatal(err)
573                 }
574                 t.Drop()
575         }
576 }
577
578 func TestResponsive(t *testing.T) {
579         seederDataDir, mi := testutil.GreetingTestTorrent()
580         defer os.RemoveAll(seederDataDir)
581         cfg := TestingConfig
582         cfg.Seed = true
583         cfg.DataDir = seederDataDir
584         seeder, err := NewClient(&cfg)
585         require.Nil(t, err)
586         defer seeder.Close()
587         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
588         leecherDataDir, err := ioutil.TempDir("", "")
589         require.Nil(t, err)
590         defer os.RemoveAll(leecherDataDir)
591         cfg = TestingConfig
592         cfg.DataDir = leecherDataDir
593         leecher, err := NewClient(&cfg)
594         require.Nil(t, err)
595         defer leecher.Close()
596         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
597                 ret = TorrentSpecFromMetaInfo(mi)
598                 ret.ChunkSize = 2
599                 return
600         }())
601         addClientPeer(leecherTorrent, seeder)
602         reader := leecherTorrent.NewReader()
603         defer reader.Close()
604         reader.SetReadahead(0)
605         reader.SetResponsive()
606         b := make([]byte, 2)
607         _, err = reader.Seek(3, os.SEEK_SET)
608         require.NoError(t, err)
609         _, err = io.ReadFull(reader, b)
610         assert.Nil(t, err)
611         assert.EqualValues(t, "lo", string(b))
612         _, err = reader.Seek(11, os.SEEK_SET)
613         require.NoError(t, err)
614         n, err := io.ReadFull(reader, b)
615         assert.Nil(t, err)
616         assert.EqualValues(t, 2, n)
617         assert.EqualValues(t, "d\n", string(b))
618 }
619
620 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
621         seederDataDir, mi := testutil.GreetingTestTorrent()
622         defer os.RemoveAll(seederDataDir)
623         cfg := TestingConfig
624         cfg.Seed = true
625         cfg.DataDir = seederDataDir
626         seeder, err := NewClient(&cfg)
627         require.Nil(t, err)
628         defer seeder.Close()
629         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
630         leecherDataDir, err := ioutil.TempDir("", "")
631         require.Nil(t, err)
632         defer os.RemoveAll(leecherDataDir)
633         cfg = TestingConfig
634         cfg.DataDir = leecherDataDir
635         leecher, err := NewClient(&cfg)
636         require.Nil(t, err)
637         defer leecher.Close()
638         leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
639                 ret = TorrentSpecFromMetaInfo(mi)
640                 ret.ChunkSize = 2
641                 return
642         }())
643         addClientPeer(leecherTorrent, seeder)
644         reader := leecherTorrent.NewReader()
645         defer reader.Close()
646         reader.SetReadahead(0)
647         reader.SetResponsive()
648         b := make([]byte, 2)
649         _, err = reader.Seek(3, os.SEEK_SET)
650         require.NoError(t, err)
651         _, err = io.ReadFull(reader, b)
652         assert.Nil(t, err)
653         assert.EqualValues(t, "lo", string(b))
654         go leecherTorrent.Drop()
655         _, err = reader.Seek(11, os.SEEK_SET)
656         require.NoError(t, err)
657         n, err := reader.Read(b)
658         assert.EqualError(t, err, "torrent closed")
659         assert.EqualValues(t, 0, n)
660 }
661
662 func TestDHTInheritBlocklist(t *testing.T) {
663         ipl := iplist.New(nil)
664         require.NotNil(t, ipl)
665         cfg := TestingConfig
666         cfg.IPBlocklist = ipl
667         cfg.NoDHT = false
668         cl, err := NewClient(&cfg)
669         require.NoError(t, err)
670         defer cl.Close()
671         require.Equal(t, ipl, cl.DHT().IPBlocklist())
672 }
673
674 // Check that stuff is merged in subsequent AddTorrentSpec for the same
675 // infohash.
676 func TestAddTorrentSpecMerging(t *testing.T) {
677         cl, err := NewClient(&TestingConfig)
678         require.NoError(t, err)
679         defer cl.Close()
680         dir, mi := testutil.GreetingTestTorrent()
681         defer os.RemoveAll(dir)
682         tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
683                 InfoHash: mi.HashInfoBytes(),
684         })
685         require.NoError(t, err)
686         require.True(t, new)
687         require.Nil(t, tt.Info())
688         _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
689         require.NoError(t, err)
690         require.False(t, new)
691         require.NotNil(t, tt.Info())
692 }
693
694 func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
695         dir, mi := testutil.GreetingTestTorrent()
696         os.RemoveAll(dir)
697         cl, _ := NewClient(&TestingConfig)
698         defer cl.Close()
699         tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
700                 InfoHash: mi.HashInfoBytes(),
701         })
702         tt.Drop()
703         assert.EqualValues(t, 0, len(cl.Torrents()))
704         select {
705         case <-tt.GotInfo():
706                 t.FailNow()
707         default:
708         }
709 }
710
711 func writeTorrentData(ts storage.Torrent, info metainfo.Info, b []byte) {
712         for i := range iter.N(info.NumPieces()) {
713                 n, _ := ts.Piece(info.Piece(i)).WriteAt(b, 0)
714                 b = b[n:]
715         }
716 }
717
718 func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.Client) {
719         fileCacheDir, err := ioutil.TempDir("", "")
720         require.NoError(t, err)
721         defer os.RemoveAll(fileCacheDir)
722         fileCache, err := filecache.NewCache(fileCacheDir)
723         require.NoError(t, err)
724         greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
725         defer os.RemoveAll(greetingDataTempDir)
726         filePieceStore := csf(fileCache)
727         info := greetingMetainfo.UnmarshalInfo()
728         ih := greetingMetainfo.HashInfoBytes()
729         greetingData, err := filePieceStore.OpenTorrent(&info, ih)
730         require.NoError(t, err)
731         writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
732         // require.Equal(t, len(testutil.GreetingFileContents), written)
733         // require.NoError(t, err)
734         for i := 0; i < info.NumPieces(); i++ {
735                 p := info.Piece(i)
736                 if alreadyCompleted {
737                         err := greetingData.Piece(p).MarkComplete()
738                         assert.NoError(t, err)
739                 }
740         }
741         cfg := TestingConfig
742         // TODO: Disable network option?
743         cfg.DisableTCP = true
744         cfg.DisableUTP = true
745         cfg.DefaultStorage = filePieceStore
746         cl, err := NewClient(&cfg)
747         require.NoError(t, err)
748         defer cl.Close()
749         tt, err := cl.AddTorrent(greetingMetainfo)
750         require.NoError(t, err)
751         psrs := tt.PieceStateRuns()
752         assert.Len(t, psrs, 1)
753         assert.EqualValues(t, 3, psrs[0].Length)
754         assert.Equal(t, alreadyCompleted, psrs[0].Complete)
755         if alreadyCompleted {
756                 r := tt.NewReader()
757                 b, err := ioutil.ReadAll(r)
758                 assert.NoError(t, err)
759                 assert.EqualValues(t, testutil.GreetingFileContents, b)
760         }
761 }
762
763 func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
764         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceFileStorage)
765         testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
766 }
767
768 func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
769         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceFileStorage)
770         testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
771 }
772
773 func TestAddMetainfoWithNodes(t *testing.T) {
774         cfg := TestingConfig
775         cfg.NoDHT = false
776         // For now, we want to just jam the nodes into the table, without
777         // verifying them first. Also the DHT code doesn't support mixing secure
778         // and insecure nodes if security is enabled (yet).
779         cfg.DHTConfig.NoSecurity = true
780         cl, err := NewClient(&cfg)
781         require.NoError(t, err)
782         defer cl.Close()
783         assert.EqualValues(t, cl.DHT().NumNodes(), 0)
784         tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
785         require.NoError(t, err)
786         assert.Len(t, tt.metainfo.AnnounceList, 5)
787         assert.EqualValues(t, 6, cl.DHT().NumNodes())
788 }
789
790 type testDownloadCancelParams struct {
791         ExportClientStatus        bool
792         SetLeecherStorageCapacity bool
793         LeecherStorageCapacity    int64
794         Cancel                    bool
795 }
796
797 func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
798         greetingTempDir, mi := testutil.GreetingTestTorrent()
799         defer os.RemoveAll(greetingTempDir)
800         cfg := TestingConfig
801         cfg.Seed = true
802         cfg.DataDir = greetingTempDir
803         seeder, err := NewClient(&cfg)
804         require.NoError(t, err)
805         defer seeder.Close()
806         if ps.ExportClientStatus {
807                 testutil.ExportStatusWriter(seeder, "s")
808         }
809         seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
810         leecherDataDir, err := ioutil.TempDir("", "")
811         require.NoError(t, err)
812         defer os.RemoveAll(leecherDataDir)
813         fc, err := filecache.NewCache(leecherDataDir)
814         require.NoError(t, err)
815         if ps.SetLeecherStorageCapacity {
816                 fc.SetCapacity(ps.LeecherStorageCapacity)
817         }
818         cfg.DefaultStorage = storage.NewFileStorePieces(fc.AsFileStore())
819         cfg.DataDir = leecherDataDir
820         leecher, _ := NewClient(&cfg)
821         defer leecher.Close()
822         if ps.ExportClientStatus {
823                 testutil.ExportStatusWriter(leecher, "l")
824         }
825         leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
826                 ret = TorrentSpecFromMetaInfo(mi)
827                 ret.ChunkSize = 2
828                 return
829         }())
830         require.NoError(t, err)
831         assert.True(t, new)
832         psc := leecherGreeting.SubscribePieceStateChanges()
833         defer psc.Close()
834         leecherGreeting.DownloadAll()
835         if ps.Cancel {
836                 leecherGreeting.CancelPieces(0, leecherGreeting.NumPieces())
837         }
838         addClientPeer(leecherGreeting, seeder)
839         completes := make(map[int]bool, 3)
840 values:
841         for {
842                 // started := time.Now()
843                 select {
844                 case _v := <-psc.Values:
845                         // log.Print(time.Since(started))
846                         v := _v.(PieceStateChange)
847                         completes[v.Index] = v.Complete
848                 case <-time.After(100 * time.Millisecond):
849                         break values
850                 }
851         }
852         if ps.Cancel {
853                 assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
854         } else {
855                 assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
856         }
857
858 }
859
860 func TestTorrentDownloadAll(t *testing.T) {
861         testDownloadCancel(t, testDownloadCancelParams{})
862 }
863
864 func TestTorrentDownloadAllThenCancel(t *testing.T) {
865         testDownloadCancel(t, testDownloadCancelParams{
866                 Cancel: true,
867         })
868 }
869
870 // Ensure that it's an error for a peer to send an invalid have message.
871 func TestPeerInvalidHave(t *testing.T) {
872         cl, err := NewClient(&TestingConfig)
873         require.NoError(t, err)
874         defer cl.Close()
875         info := metainfo.Info{
876                 PieceLength: 1,
877                 Pieces:      make([]byte, 20),
878                 Files:       []metainfo.FileInfo{{Length: 1}},
879         }
880         infoBytes, err := bencode.Marshal(info)
881         require.NoError(t, err)
882         tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
883                 InfoBytes: infoBytes,
884                 InfoHash:  metainfo.HashBytes(infoBytes),
885         })
886         require.NoError(t, err)
887         assert.True(t, _new)
888         defer tt.Drop()
889         cn := &connection{
890                 t: tt,
891         }
892         assert.NoError(t, cn.peerSentHave(0))
893         assert.Error(t, cn.peerSentHave(1))
894 }
895
896 func TestPieceCompletedInStorageButNotClient(t *testing.T) {
897         greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
898         defer os.RemoveAll(greetingTempDir)
899         cfg := TestingConfig
900         cfg.DataDir = greetingTempDir
901         seeder, err := NewClient(&TestingConfig)
902         require.NoError(t, err)
903         seeder.AddTorrentSpec(&TorrentSpec{
904                 InfoBytes: greetingMetainfo.InfoBytes,
905         })
906 }
907
908 func TestPrepareTrackerAnnounce(t *testing.T) {
909         cl := &Client{}
910         blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
911         require.NoError(t, err)
912         assert.False(t, blocked)
913         assert.EqualValues(t, "localhost:1234", host)
914         assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
915 }
916
917 // Check that when the listen port is 0, all the protocols listened on have
918 // the same port, and it isn't zero.
919 func TestClientDynamicListenPortAllProtocols(t *testing.T) {
920         cl, err := NewClient(&TestingConfig)
921         require.NoError(t, err)
922         defer cl.Close()
923         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
924         assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
925 }
926
927 func TestClientDynamicListenTCPOnly(t *testing.T) {
928         cfg := TestingConfig
929         cfg.DisableUTP = true
930         cl, err := NewClient(&cfg)
931         require.NoError(t, err)
932         defer cl.Close()
933         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
934         assert.Nil(t, cl.utpSock)
935 }
936
937 func TestClientDynamicListenUTPOnly(t *testing.T) {
938         cfg := TestingConfig
939         cfg.DisableTCP = true
940         cl, err := NewClient(&cfg)
941         require.NoError(t, err)
942         defer cl.Close()
943         assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
944         assert.Nil(t, cl.tcpListener)
945 }
946
947 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
948         cfg := TestingConfig
949         cfg.DisableTCP = true
950         cfg.DisableUTP = true
951         cl, err := NewClient(&cfg)
952         require.NoError(t, err)
953         defer cl.Close()
954         assert.Nil(t, cl.ListenAddr())
955 }
956
957 func addClientPeer(t *Torrent, cl *Client) {
958         t.AddPeers([]Peer{
959                 Peer{
960                         IP:   missinggo.AddrIP(cl.ListenAddr()),
961                         Port: missinggo.AddrPort(cl.ListenAddr()),
962                 },
963         })
964 }
965
966 func printConnPeerCounts(t *Torrent) {
967         t.cl.mu.Lock()
968         log.Println(len(t.conns), len(t.peers))
969         t.cl.mu.Unlock()
970 }
971
972 func totalConns(tts []*Torrent) (ret int) {
973         for _, tt := range tts {
974                 tt.cl.mu.Lock()
975                 ret += len(tt.conns)
976                 tt.cl.mu.Unlock()
977         }
978         return
979 }
980
981 func TestSetMaxEstablishedConn(t *testing.T) {
982         var tts []*Torrent
983         ih := testutil.GreetingMetaInfo().HashInfoBytes()
984         cfg := TestingConfig
985         for i := range iter.N(3) {
986                 cl, err := NewClient(&cfg)
987                 require.NoError(t, err)
988                 defer cl.Close()
989                 tt, _ := cl.AddTorrentInfoHash(ih)
990                 tt.SetMaxEstablishedConns(2)
991                 testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
992                 tts = append(tts, tt)
993         }
994         addPeers := func() {
995                 for i, tt := range tts {
996                         for _, _tt := range tts[:i] {
997                                 addClientPeer(tt, _tt.cl)
998                         }
999                 }
1000         }
1001         waitTotalConns := func(num int) {
1002                 for totalConns(tts) != num {
1003                         time.Sleep(time.Millisecond)
1004                 }
1005         }
1006         addPeers()
1007         waitTotalConns(6)
1008         tts[0].SetMaxEstablishedConns(1)
1009         waitTotalConns(4)
1010         tts[0].SetMaxEstablishedConns(0)
1011         waitTotalConns(2)
1012         tts[0].SetMaxEstablishedConns(1)
1013         addPeers()
1014         waitTotalConns(4)
1015         tts[0].SetMaxEstablishedConns(2)
1016         addPeers()
1017         waitTotalConns(6)
1018 }