]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
7fb7d50161a679697efa3b6744a8d42510b93a42
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "context"
6         "crypto/rand"
7         "errors"
8         "expvar"
9         "fmt"
10         "io"
11         "log"
12         "net"
13         "net/url"
14         "strconv"
15         "strings"
16         "time"
17
18         "github.com/anacrolix/dht"
19         "github.com/anacrolix/dht/krpc"
20         "github.com/anacrolix/missinggo"
21         "github.com/anacrolix/missinggo/pproffd"
22         "github.com/anacrolix/missinggo/pubsub"
23         "github.com/anacrolix/missinggo/slices"
24         "github.com/anacrolix/sync"
25         "github.com/dustin/go-humanize"
26         "golang.org/x/time/rate"
27
28         "github.com/anacrolix/torrent/bencode"
29         "github.com/anacrolix/torrent/iplist"
30         "github.com/anacrolix/torrent/metainfo"
31         "github.com/anacrolix/torrent/mse"
32         pp "github.com/anacrolix/torrent/peer_protocol"
33         "github.com/anacrolix/torrent/storage"
34 )
35
36 // Clients contain zero or more Torrents. A Client manages a blocklist, the
37 // TCP/UDP protocol ports, and DHT as desired.
38 type Client struct {
39         mu     sync.RWMutex
40         event  sync.Cond
41         closed missinggo.Event
42
43         config Config
44
45         halfOpenLimit  int
46         peerID         PeerID
47         defaultStorage *storage.Client
48         onClose        []func()
49         tcpListener    net.Listener
50         utpSock        utpSocket
51         dHT            *dht.Server
52         ipBlockList    iplist.Ranger
53         // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
54         extensionBytes peerExtensionBytes
55         // The net.Addr.String part that should be common to all active listeners.
56         listenAddr    string
57         uploadLimit   *rate.Limiter
58         downloadLimit *rate.Limiter
59
60         // Set of addresses that have our client ID. This intentionally will
61         // include ourselves if we end up trying to connect to our own address
62         // through legitimate channels.
63         dopplegangerAddrs map[string]struct{}
64         badPeerIPs        map[string]struct{}
65         torrents          map[metainfo.Hash]*Torrent
66 }
67
68 func (cl *Client) BadPeerIPs() []string {
69         cl.mu.RLock()
70         defer cl.mu.RUnlock()
71         return cl.badPeerIPsLocked()
72 }
73
74 func (cl *Client) badPeerIPsLocked() []string {
75         return slices.FromMapKeys(cl.badPeerIPs).([]string)
76 }
77
78 func (cl *Client) IPBlockList() iplist.Ranger {
79         cl.mu.Lock()
80         defer cl.mu.Unlock()
81         return cl.ipBlockList
82 }
83
84 func (cl *Client) SetIPBlockList(list iplist.Ranger) {
85         cl.mu.Lock()
86         defer cl.mu.Unlock()
87         cl.ipBlockList = list
88         if cl.dHT != nil {
89                 cl.dHT.SetIPBlockList(list)
90         }
91 }
92
93 func (cl *Client) PeerID() PeerID {
94         return cl.peerID
95 }
96
97 type torrentAddr string
98
99 func (torrentAddr) Network() string { return "" }
100
101 func (me torrentAddr) String() string { return string(me) }
102
103 func (cl *Client) ListenAddr() net.Addr {
104         if cl.listenAddr == "" {
105                 return nil
106         }
107         return torrentAddr(cl.listenAddr)
108 }
109
110 // Writes out a human readable status of the client, such as for writing to a
111 // HTTP status page.
112 func (cl *Client) WriteStatus(_w io.Writer) {
113         cl.mu.Lock()
114         defer cl.mu.Unlock()
115         w := bufio.NewWriter(_w)
116         defer w.Flush()
117         if addr := cl.ListenAddr(); addr != nil {
118                 fmt.Fprintf(w, "Listening on %s\n", addr)
119         } else {
120                 fmt.Fprintln(w, "Not listening!")
121         }
122         fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
123         fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
124         if dht := cl.DHT(); dht != nil {
125                 dhtStats := dht.Stats()
126                 fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
127                 fmt.Fprintf(w, "DHT Server ID: %x\n", dht.ID())
128                 fmt.Fprintf(w, "DHT port: %d\n", missinggo.AddrPort(dht.Addr()))
129                 fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces)
130                 fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions)
131         }
132         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
133         fmt.Fprintln(w)
134         for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
135                 return l.InfoHash().AsString() < r.InfoHash().AsString()
136         }).([]*Torrent) {
137                 if t.name() == "" {
138                         fmt.Fprint(w, "<unknown name>")
139                 } else {
140                         fmt.Fprint(w, t.name())
141                 }
142                 fmt.Fprint(w, "\n")
143                 if t.info != nil {
144                         fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
145                 } else {
146                         w.WriteString("<missing metainfo>")
147                 }
148                 fmt.Fprint(w, "\n")
149                 t.writeStatus(w)
150                 fmt.Fprintln(w)
151         }
152 }
153
154 func listenUTP(networkSuffix, addr string) (utpSocket, error) {
155         return NewUtpSocket("udp"+networkSuffix, addr)
156 }
157
158 func listenTCP(networkSuffix, addr string) (net.Listener, error) {
159         return net.Listen("tcp"+networkSuffix, addr)
160 }
161
162 func listenBothSameDynamicPort(networkSuffix, host string) (tcpL net.Listener, utpSock utpSocket, listenedAddr string, err error) {
163         for {
164                 tcpL, err = listenTCP(networkSuffix, net.JoinHostPort(host, "0"))
165                 if err != nil {
166                         return
167                 }
168                 listenedAddr = tcpL.Addr().String()
169                 utpSock, err = listenUTP(networkSuffix, listenedAddr)
170                 if err == nil {
171                         return
172                 }
173                 tcpL.Close()
174                 if !strings.Contains(err.Error(), "address already in use") {
175                         return
176                 }
177         }
178 }
179
180 // Listen to enabled protocols, ensuring ports match.
181 func listen(tcp, utp bool, networkSuffix, addr string) (tcpL net.Listener, utpSock utpSocket, listenedAddr string, err error) {
182         if addr == "" {
183                 addr = ":50007"
184         }
185         if tcp && utp {
186                 var host string
187                 var port int
188                 host, port, err = missinggo.ParseHostPort(addr)
189                 if err != nil {
190                         return
191                 }
192                 if port == 0 {
193                         // If both protocols are active, they need to have the same port.
194                         return listenBothSameDynamicPort(networkSuffix, host)
195                 }
196         }
197         defer func() {
198                 if err != nil {
199                         listenedAddr = ""
200                 }
201         }()
202         if tcp {
203                 tcpL, err = listenTCP(networkSuffix, addr)
204                 if err != nil {
205                         return
206                 }
207                 defer func() {
208                         if err != nil {
209                                 tcpL.Close()
210                         }
211                 }()
212                 listenedAddr = tcpL.Addr().String()
213         }
214         if utp {
215                 utpSock, err = listenUTP(networkSuffix, addr)
216                 if err != nil {
217                         return
218                 }
219                 listenedAddr = utpSock.Addr().String()
220         }
221         return
222 }
223
224 // Creates a new client.
225 func NewClient(cfg *Config) (cl *Client, err error) {
226         if cfg == nil {
227                 cfg = &Config{
228                         DHTConfig: dht.ServerConfig{
229                                 StartingNodes: dht.GlobalBootstrapAddrs,
230                         },
231                 }
232         }
233         if cfg == nil {
234                 cfg = &Config{}
235         }
236         cfg.setDefaults()
237
238         defer func() {
239                 if err != nil {
240                         cl = nil
241                 }
242         }()
243         cl = &Client{
244                 halfOpenLimit:     cfg.HalfOpenConnsPerTorrent,
245                 config:            *cfg,
246                 dopplegangerAddrs: make(map[string]struct{}),
247                 torrents:          make(map[metainfo.Hash]*Torrent),
248         }
249         defer func() {
250                 if err == nil {
251                         return
252                 }
253                 cl.Close()
254         }()
255         if cfg.UploadRateLimiter == nil {
256                 cl.uploadLimit = rate.NewLimiter(rate.Inf, 0)
257         } else {
258                 cl.uploadLimit = cfg.UploadRateLimiter
259         }
260         if cfg.DownloadRateLimiter == nil {
261                 cl.downloadLimit = rate.NewLimiter(rate.Inf, 0)
262         } else {
263                 cl.downloadLimit = cfg.DownloadRateLimiter
264         }
265         missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
266         cl.event.L = &cl.mu
267         storageImpl := cfg.DefaultStorage
268         if storageImpl == nil {
269                 // We'd use mmap but HFS+ doesn't support sparse files.
270                 storageImpl = storage.NewFile(cfg.DataDir)
271                 cl.onClose = append(cl.onClose, func() {
272                         if err := storageImpl.Close(); err != nil {
273                                 log.Printf("error closing default storage: %s", err)
274                         }
275                 })
276         }
277         cl.defaultStorage = storage.NewClient(storageImpl)
278         if cfg.IPBlocklist != nil {
279                 cl.ipBlockList = cfg.IPBlocklist
280         }
281
282         if cfg.PeerID != "" {
283                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
284         } else {
285                 o := copy(cl.peerID[:], cfg.Bep20)
286                 _, err = rand.Read(cl.peerID[o:])
287                 if err != nil {
288                         panic("error generating peer id")
289                 }
290         }
291
292         cl.tcpListener, cl.utpSock, cl.listenAddr, err = listen(
293                 !cl.config.DisableTCP,
294                 !cl.config.DisableUTP,
295                 func() string {
296                         if cl.config.DisableIPv6 {
297                                 return "4"
298                         } else {
299                                 return ""
300                         }
301                 }(),
302                 cl.config.ListenAddr)
303         if err != nil {
304                 return
305         }
306         if cl.tcpListener != nil {
307                 go cl.acceptConnections(cl.tcpListener, false)
308         }
309         if cl.utpSock != nil {
310                 go cl.acceptConnections(cl.utpSock, true)
311         }
312         if !cfg.NoDHT {
313                 dhtCfg := cfg.DHTConfig
314                 if dhtCfg.IPBlocklist == nil {
315                         dhtCfg.IPBlocklist = cl.ipBlockList
316                 }
317                 if dhtCfg.Conn == nil {
318                         if cl.utpSock != nil {
319                                 dhtCfg.Conn = cl.utpSock
320                         } else {
321                                 dhtCfg.Conn, err = net.ListenPacket("udp", firstNonEmptyString(cl.listenAddr, cl.config.ListenAddr))
322                                 if err != nil {
323                                         return
324                                 }
325                         }
326                 }
327                 if dhtCfg.OnAnnouncePeer == nil {
328                         dhtCfg.OnAnnouncePeer = cl.onDHTAnnouncePeer
329                 }
330                 cl.dHT, err = dht.NewServer(&dhtCfg)
331                 if err != nil {
332                         return
333                 }
334                 go func() {
335                         if _, err := cl.dHT.Bootstrap(); err != nil {
336                                 log.Printf("error bootstrapping dht: %s", err)
337                         }
338                 }()
339         }
340
341         return
342 }
343
344 func firstNonEmptyString(ss ...string) string {
345         for _, s := range ss {
346                 if s != "" {
347                         return s
348                 }
349         }
350         return ""
351 }
352
353 // Stops the client. All connections to peers are closed and all activity will
354 // come to a halt.
355 func (cl *Client) Close() {
356         cl.mu.Lock()
357         defer cl.mu.Unlock()
358         cl.closed.Set()
359         if cl.dHT != nil {
360                 cl.dHT.Close()
361         }
362         if cl.utpSock != nil {
363                 cl.utpSock.Close()
364         }
365         if cl.tcpListener != nil {
366                 cl.tcpListener.Close()
367         }
368         for _, t := range cl.torrents {
369                 t.close()
370         }
371         for _, f := range cl.onClose {
372                 f()
373         }
374         cl.event.Broadcast()
375 }
376
377 var ipv6BlockRange = iplist.Range{Description: "non-IPv4 address"}
378
379 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
380         if cl.ipBlockList == nil {
381                 return
382         }
383         ip4 := ip.To4()
384         // If blocklists are enabled, then block non-IPv4 addresses, because
385         // blocklists do not yet support IPv6.
386         if ip4 == nil {
387                 if missinggo.CryHeard() {
388                         log.Printf("blocking non-IPv4 address: %s", ip)
389                 }
390                 r = ipv6BlockRange
391                 blocked = true
392                 return
393         }
394         return cl.ipBlockList.Lookup(ip4)
395 }
396
397 func (cl *Client) waitAccept() {
398         for {
399                 for _, t := range cl.torrents {
400                         if t.wantConns() {
401                                 return
402                         }
403                 }
404                 if cl.closed.IsSet() {
405                         return
406                 }
407                 cl.event.Wait()
408         }
409 }
410
411 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
412         cl.mu.Lock()
413         defer cl.mu.Unlock()
414         for {
415                 cl.waitAccept()
416                 cl.mu.Unlock()
417                 conn, err := l.Accept()
418                 conn = pproffd.WrapNetConn(conn)
419                 cl.mu.Lock()
420                 if cl.closed.IsSet() {
421                         if conn != nil {
422                                 conn.Close()
423                         }
424                         return
425                 }
426                 if err != nil {
427                         log.Print(err)
428                         // I think something harsher should happen here? Our accept
429                         // routine just fucked off.
430                         return
431                 }
432                 if utp {
433                         acceptUTP.Add(1)
434                 } else {
435                         acceptTCP.Add(1)
436                 }
437                 if cl.config.Debug {
438                         log.Printf("accepted connection from %s", conn.RemoteAddr())
439                 }
440                 reject := cl.badPeerIPPort(
441                         missinggo.AddrIP(conn.RemoteAddr()),
442                         missinggo.AddrPort(conn.RemoteAddr()))
443                 if reject {
444                         if cl.config.Debug {
445                                 log.Printf("rejecting connection from %s", conn.RemoteAddr())
446                         }
447                         acceptReject.Add(1)
448                         conn.Close()
449                         continue
450                 }
451                 go cl.incomingConnection(conn, utp)
452         }
453 }
454
455 func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
456         defer nc.Close()
457         if tc, ok := nc.(*net.TCPConn); ok {
458                 tc.SetLinger(0)
459         }
460         c := cl.newConnection(nc)
461         c.Discovery = peerSourceIncoming
462         c.uTP = utp
463         cl.runReceivedConn(c)
464 }
465
466 // Returns a handle to the given torrent, if it's present in the client.
467 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
468         cl.mu.Lock()
469         defer cl.mu.Unlock()
470         t, ok = cl.torrents[ih]
471         return
472 }
473
474 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
475         return cl.torrents[ih]
476 }
477
478 type dialResult struct {
479         Conn net.Conn
480         UTP  bool
481 }
482
483 func countDialResult(err error) {
484         if err == nil {
485                 successfulDials.Add(1)
486         } else {
487                 unsuccessfulDials.Add(1)
488         }
489 }
490
491 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
492         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
493         if ret < minDialTimeout {
494                 ret = minDialTimeout
495         }
496         return
497 }
498
499 // Returns whether an address is known to connect to a client with our own ID.
500 func (cl *Client) dopplegangerAddr(addr string) bool {
501         _, ok := cl.dopplegangerAddrs[addr]
502         return ok
503 }
504
505 // Start the process of connecting to the given peer for the given torrent if
506 // appropriate.
507 func (cl *Client) initiateConn(peer Peer, t *Torrent) {
508         if peer.Id == cl.peerID {
509                 return
510         }
511         if cl.badPeerIPPort(peer.IP, peer.Port) {
512                 return
513         }
514         addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
515         if t.addrActive(addr) {
516                 return
517         }
518         t.halfOpen[addr] = peer
519         go cl.outgoingConnection(t, addr, peer.Source)
520 }
521
522 func (cl *Client) dialTCP(ctx context.Context, addr string) (c net.Conn, err error) {
523         d := net.Dialer{
524         // LocalAddr: cl.tcpListener.Addr(),
525         }
526         c, err = d.DialContext(ctx, "tcp", addr)
527         countDialResult(err)
528         if err == nil {
529                 c.(*net.TCPConn).SetLinger(0)
530         }
531         c = pproffd.WrapNetConn(c)
532         return
533 }
534
535 func (cl *Client) dialUTP(ctx context.Context, addr string) (c net.Conn, err error) {
536         c, err = cl.utpSock.DialContext(ctx, addr)
537         countDialResult(err)
538         return
539 }
540
541 var (
542         dialledFirstUtp    = expvar.NewInt("dialledFirstUtp")
543         dialledFirstNotUtp = expvar.NewInt("dialledFirstNotUtp")
544 )
545
546 // Returns a connection over UTP or TCP, whichever is first to connect.
547 func (cl *Client) dialFirst(ctx context.Context, addr string) (conn net.Conn, utp bool) {
548         ctx, cancel := context.WithCancel(ctx)
549         // As soon as we return one connection, cancel the others.
550         defer cancel()
551         left := 0
552         resCh := make(chan dialResult, left)
553         if !cl.config.DisableUTP {
554                 left++
555                 go func() {
556                         c, _ := cl.dialUTP(ctx, addr)
557                         resCh <- dialResult{c, true}
558                 }()
559         }
560         if !cl.config.DisableTCP {
561                 left++
562                 go func() {
563                         c, _ := cl.dialTCP(ctx, addr)
564                         resCh <- dialResult{c, false}
565                 }()
566         }
567         var res dialResult
568         // Wait for a successful connection.
569         for ; left > 0 && res.Conn == nil; left-- {
570                 res = <-resCh
571         }
572         if left > 0 {
573                 // There are still incompleted dials.
574                 go func() {
575                         for ; left > 0; left-- {
576                                 conn := (<-resCh).Conn
577                                 if conn != nil {
578                                         conn.Close()
579                                 }
580                         }
581                 }()
582         }
583         conn = res.Conn
584         utp = res.UTP
585         if conn != nil {
586                 if utp {
587                         dialledFirstUtp.Add(1)
588                 } else {
589                         dialledFirstNotUtp.Add(1)
590                 }
591         }
592         return
593 }
594
595 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
596         if _, ok := t.halfOpen[addr]; !ok {
597                 panic("invariant broken")
598         }
599         delete(t.halfOpen, addr)
600         cl.openNewConns(t)
601 }
602
603 // Performs initiator handshakes and returns a connection. Returns nil
604 // *connection if no connection for valid reasons.
605 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader, utp bool) (c *connection, err error) {
606         c = cl.newConnection(nc)
607         c.headerEncrypted = encryptHeader
608         c.uTP = utp
609         ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
610         defer cancel()
611         dl, ok := ctx.Deadline()
612         if !ok {
613                 panic(ctx)
614         }
615         err = nc.SetDeadline(dl)
616         if err != nil {
617                 panic(err)
618         }
619         ok, err = cl.initiateHandshakes(c, t)
620         if !ok {
621                 c = nil
622         }
623         return
624 }
625
626 var (
627         initiatedConnWithPreferredHeaderEncryption = expvar.NewInt("initiatedConnWithPreferredHeaderEncryption")
628         initiatedConnWithFallbackHeaderEncryption  = expvar.NewInt("initiatedConnWithFallbackHeaderEncryption")
629 )
630
631 // Returns nil connection and nil error if no connection could be established
632 // for valid reasons.
633 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
634         ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
635         defer cancel()
636         nc, utp := cl.dialFirst(ctx, addr)
637         if nc == nil {
638                 return
639         }
640         obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
641         c, err = cl.handshakesConnection(ctx, nc, t, obfuscatedHeaderFirst, utp)
642         if err != nil {
643                 // log.Printf("error initiating connection handshakes: %s", err)
644                 nc.Close()
645                 return
646         } else if c != nil {
647                 initiatedConnWithPreferredHeaderEncryption.Add(1)
648                 return
649         }
650         nc.Close()
651         if cl.config.ForceEncryption {
652                 // We should have just tried with an obfuscated header. A plaintext
653                 // header can't result in an encrypted connection, so we're done.
654                 if !obfuscatedHeaderFirst {
655                         panic(cl.config.EncryptionPolicy)
656                 }
657                 return
658         }
659         // Try again with encryption if we didn't earlier, or without if we did,
660         // using whichever protocol type worked last time.
661         if utp {
662                 nc, err = cl.dialUTP(ctx, addr)
663         } else {
664                 nc, err = cl.dialTCP(ctx, addr)
665         }
666         if err != nil {
667                 err = fmt.Errorf("error dialing for header encryption fallback: %s", err)
668                 return
669         }
670         c, err = cl.handshakesConnection(ctx, nc, t, !obfuscatedHeaderFirst, utp)
671         if err != nil || c == nil {
672                 nc.Close()
673         }
674         if err == nil && c != nil {
675                 initiatedConnWithFallbackHeaderEncryption.Add(1)
676         }
677         return
678 }
679
680 // Called to dial out and run a connection. The addr we're given is already
681 // considered half-open.
682 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
683         c, err := cl.establishOutgoingConn(t, addr)
684         cl.mu.Lock()
685         defer cl.mu.Unlock()
686         // Don't release lock between here and addConnection, unless it's for
687         // failure.
688         cl.noLongerHalfOpen(t, addr)
689         if err != nil {
690                 if cl.config.Debug {
691                         log.Printf("error establishing outgoing connection: %s", err)
692                 }
693                 return
694         }
695         if c == nil {
696                 return
697         }
698         defer c.Close()
699         c.Discovery = ps
700         cl.runInitiatedHandshookConn(c, t)
701 }
702
703 // The port number for incoming peer connections. 0 if the client isn't
704 // listening.
705 func (cl *Client) incomingPeerPort() int {
706         if cl.listenAddr == "" {
707                 return 0
708         }
709         _, port, err := missinggo.ParseHostPort(cl.listenAddr)
710         if err != nil {
711                 panic(err)
712         }
713         return port
714 }
715
716 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
717         if c.headerEncrypted {
718                 var rw io.ReadWriter
719                 rw, err = mse.InitiateHandshake(
720                         struct {
721                                 io.Reader
722                                 io.Writer
723                         }{c.r, c.w},
724                         t.infoHash[:],
725                         nil,
726                         func() uint32 {
727                                 switch {
728                                 case cl.config.ForceEncryption:
729                                         return mse.CryptoMethodRC4
730                                 case cl.config.DisableEncryption:
731                                         return mse.CryptoMethodPlaintext
732                                 default:
733                                         return mse.AllSupportedCrypto
734                                 }
735                         }(),
736                 )
737                 c.setRW(rw)
738                 if err != nil {
739                         return
740                 }
741         }
742         ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
743         if ih != t.infoHash {
744                 ok = false
745         }
746         return
747 }
748
749 // Calls f with any secret keys.
750 func (cl *Client) forSkeys(f func([]byte) bool) {
751         cl.mu.Lock()
752         defer cl.mu.Unlock()
753         for ih := range cl.torrents {
754                 if !f(ih[:]) {
755                         break
756                 }
757         }
758 }
759
760 // Do encryption and bittorrent handshakes as receiver.
761 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
762         var rw io.ReadWriter
763         rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.EncryptionPolicy)
764         c.setRW(rw)
765         if err != nil {
766                 if err == mse.ErrNoSecretKeyMatch {
767                         err = nil
768                 }
769                 return
770         }
771         if cl.config.ForceEncryption && !c.headerEncrypted {
772                 err = errors.New("connection not encrypted")
773                 return
774         }
775         ih, ok, err := cl.connBTHandshake(c, nil)
776         if err != nil {
777                 err = fmt.Errorf("error during bt handshake: %s", err)
778                 return
779         }
780         if !ok {
781                 return
782         }
783         cl.mu.Lock()
784         t = cl.torrents[ih]
785         cl.mu.Unlock()
786         return
787 }
788
789 // Returns !ok if handshake failed for valid reasons.
790 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
791         res, ok, err := handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
792         if err != nil || !ok {
793                 return
794         }
795         ret = res.Hash
796         c.PeerExtensionBytes = res.peerExtensionBytes
797         c.PeerID = res.PeerID
798         c.completedHandshake = time.Now()
799         return
800 }
801
802 func (cl *Client) runInitiatedHandshookConn(c *connection, t *Torrent) {
803         if c.PeerID == cl.peerID {
804                 connsToSelf.Add(1)
805                 addr := c.conn.RemoteAddr().String()
806                 cl.dopplegangerAddrs[addr] = struct{}{}
807                 return
808         }
809         cl.runHandshookConn(c, t, true)
810 }
811
812 func (cl *Client) runReceivedConn(c *connection) {
813         err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
814         if err != nil {
815                 panic(err)
816         }
817         t, err := cl.receiveHandshakes(c)
818         if err != nil {
819                 if cl.config.Debug {
820                         log.Printf("error receiving handshakes: %s", err)
821                 }
822                 return
823         }
824         if t == nil {
825                 return
826         }
827         cl.mu.Lock()
828         defer cl.mu.Unlock()
829         if c.PeerID == cl.peerID {
830                 // Because the remote address is not necessarily the same as its
831                 // client's torrent listen address, we won't record the remote address
832                 // as a doppleganger. Instead, the initiator can record *us* as the
833                 // doppleganger.
834                 return
835         }
836         cl.runHandshookConn(c, t, false)
837 }
838
839 func (cl *Client) runHandshookConn(c *connection, t *Torrent, outgoing bool) {
840         c.conn.SetWriteDeadline(time.Time{})
841         c.r = deadlineReader{c.conn, c.r}
842         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
843         if !t.addConnection(c, outgoing) {
844                 return
845         }
846         defer t.dropConnection(c)
847         go c.writer(time.Minute)
848         cl.sendInitialMessages(c, t)
849         err := c.mainReadLoop()
850         if err != nil && cl.config.Debug {
851                 log.Printf("error during connection loop: %s", err)
852         }
853 }
854
855 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
856         if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
857                 conn.Post(pp.Message{
858                         Type:       pp.Extended,
859                         ExtendedID: pp.HandshakeExtendedID,
860                         ExtendedPayload: func() []byte {
861                                 d := map[string]interface{}{
862                                         "m": func() (ret map[string]int) {
863                                                 ret = make(map[string]int, 2)
864                                                 ret["ut_metadata"] = metadataExtendedId
865                                                 if !cl.config.DisablePEX {
866                                                         ret["ut_pex"] = pexExtendedId
867                                                 }
868                                                 return
869                                         }(),
870                                         "v": cl.config.ExtendedHandshakeClientVersion,
871                                         // No upload queue is implemented yet.
872                                         "reqq": 64,
873                                 }
874                                 if !cl.config.DisableEncryption {
875                                         d["e"] = 1
876                                 }
877                                 if torrent.metadataSizeKnown() {
878                                         d["metadata_size"] = torrent.metadataSize()
879                                 }
880                                 if p := cl.incomingPeerPort(); p != 0 {
881                                         d["p"] = p
882                                 }
883                                 yourip, err := addrCompactIP(conn.remoteAddr())
884                                 if err != nil {
885                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
886                                 } else {
887                                         d["yourip"] = yourip
888                                 }
889                                 // log.Printf("sending %v", d)
890                                 b, err := bencode.Marshal(d)
891                                 if err != nil {
892                                         panic(err)
893                                 }
894                                 return b
895                         }(),
896                 })
897         }
898         if torrent.haveAnyPieces() {
899                 conn.Bitfield(torrent.bitfield())
900         } else if cl.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() {
901                 conn.Post(pp.Message{
902                         Type: pp.HaveNone,
903                 })
904         }
905         if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.dHT != nil {
906                 conn.Post(pp.Message{
907                         Type: pp.Port,
908                         Port: uint16(missinggo.AddrPort(cl.dHT.Addr())),
909                 })
910         }
911 }
912
913 // Process incoming ut_metadata message.
914 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
915         var d map[string]int
916         err := bencode.Unmarshal(payload, &d)
917         if err != nil {
918                 return fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
919         }
920         msgType, ok := d["msg_type"]
921         if !ok {
922                 return errors.New("missing msg_type field")
923         }
924         piece := d["piece"]
925         switch msgType {
926         case pp.DataMetadataExtensionMsgType:
927                 if !c.requestedMetadataPiece(piece) {
928                         return fmt.Errorf("got unexpected piece %d", piece)
929                 }
930                 c.metadataRequests[piece] = false
931                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
932                 if begin < 0 || begin >= len(payload) {
933                         return fmt.Errorf("data has bad offset in payload: %d", begin)
934                 }
935                 t.saveMetadataPiece(piece, payload[begin:])
936                 c.UsefulChunksReceived++
937                 c.lastUsefulChunkReceived = time.Now()
938                 return t.maybeCompleteMetadata()
939         case pp.RequestMetadataExtensionMsgType:
940                 if !t.haveMetadataPiece(piece) {
941                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
942                         return nil
943                 }
944                 start := (1 << 14) * piece
945                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
946                 return nil
947         case pp.RejectMetadataExtensionMsgType:
948                 return nil
949         default:
950                 return errors.New("unknown msg_type value")
951         }
952 }
953
954 func (cl *Client) sendChunk(t *Torrent, c *connection, r request, msg func(pp.Message) bool) (more bool, err error) {
955         // Count the chunk being sent, even if it isn't.
956         b := make([]byte, r.Length)
957         p := t.info.Piece(int(r.Index))
958         n, err := t.readAt(b, p.Offset()+int64(r.Begin))
959         if n != len(b) {
960                 if err == nil {
961                         panic("expected error")
962                 }
963                 return
964         } else if err == io.EOF {
965                 err = nil
966         }
967         more = msg(pp.Message{
968                 Type:  pp.Piece,
969                 Index: r.Index,
970                 Begin: r.Begin,
971                 Piece: b,
972         })
973         c.chunksSent++
974         uploadChunksPosted.Add(1)
975         c.lastChunkSent = time.Now()
976         return
977 }
978
979 func (cl *Client) openNewConns(t *Torrent) {
980         defer t.updateWantPeersEvent()
981         for len(t.peers) != 0 {
982                 if !t.wantConns() {
983                         return
984                 }
985                 if len(t.halfOpen) >= cl.halfOpenLimit {
986                         return
987                 }
988                 var (
989                         k peersKey
990                         p Peer
991                 )
992                 for k, p = range t.peers {
993                         break
994                 }
995                 delete(t.peers, k)
996                 cl.initiateConn(p, t)
997         }
998 }
999
1000 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1001         if port == 0 {
1002                 return true
1003         }
1004         if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1005                 return true
1006         }
1007         if _, ok := cl.ipBlockRange(ip); ok {
1008                 return true
1009         }
1010         if _, ok := cl.badPeerIPs[ip.String()]; ok {
1011                 return true
1012         }
1013         return false
1014 }
1015
1016 // Return a Torrent ready for insertion into a Client.
1017 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1018         // use provided storage, if provided
1019         storageClient := cl.defaultStorage
1020         if specStorage != nil {
1021                 storageClient = storage.NewClient(specStorage)
1022         }
1023
1024         t = &Torrent{
1025                 cl:       cl,
1026                 infoHash: ih,
1027                 peers:    make(map[peersKey]Peer),
1028                 conns:    make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1029
1030                 halfOpen:          make(map[string]Peer),
1031                 pieceStateChanges: pubsub.NewPubSub(),
1032
1033                 storageOpener:       storageClient,
1034                 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1035
1036                 networkingEnabled: true,
1037                 requestStrategy:   2,
1038                 metadataChanged: sync.Cond{
1039                         L: &cl.mu,
1040                 },
1041         }
1042         t.setChunkSize(defaultChunkSize)
1043         return
1044 }
1045
1046 // A file-like handle to some torrent data resource.
1047 type Handle interface {
1048         io.Reader
1049         io.Seeker
1050         io.Closer
1051         io.ReaderAt
1052 }
1053
1054 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1055         return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1056 }
1057
1058 // Adds a torrent by InfoHash with a custom Storage implementation.
1059 // If the torrent already exists then this Storage is ignored and the
1060 // existing torrent returned with `new` set to `false`
1061 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1062         cl.mu.Lock()
1063         defer cl.mu.Unlock()
1064         t, ok := cl.torrents[infoHash]
1065         if ok {
1066                 return
1067         }
1068         new = true
1069         t = cl.newTorrent(infoHash, specStorage)
1070         if cl.dHT != nil {
1071                 go t.dhtAnnouncer()
1072         }
1073         cl.torrents[infoHash] = t
1074         t.updateWantPeersEvent()
1075         // Tickle Client.waitAccept, new torrent may want conns.
1076         cl.event.Broadcast()
1077         return
1078 }
1079
1080 // Add or merge a torrent spec. If the torrent is already present, the
1081 // trackers will be merged with the existing ones. If the Info isn't yet
1082 // known, it will be set. The display name is replaced if the new spec
1083 // provides one. Returns new if the torrent wasn't already in the client.
1084 // Note that any `Storage` defined on the spec will be ignored if the
1085 // torrent is already present (i.e. `new` return value is `true`)
1086 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1087         t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1088         if spec.DisplayName != "" {
1089                 t.SetDisplayName(spec.DisplayName)
1090         }
1091         if spec.InfoBytes != nil {
1092                 err = t.SetInfoBytes(spec.InfoBytes)
1093                 if err != nil {
1094                         return
1095                 }
1096         }
1097         cl.mu.Lock()
1098         defer cl.mu.Unlock()
1099         if spec.ChunkSize != 0 {
1100                 t.setChunkSize(pp.Integer(spec.ChunkSize))
1101         }
1102         t.addTrackers(spec.Trackers)
1103         t.maybeNewConns()
1104         return
1105 }
1106
1107 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1108         t, ok := cl.torrents[infoHash]
1109         if !ok {
1110                 err = fmt.Errorf("no such torrent")
1111                 return
1112         }
1113         err = t.close()
1114         if err != nil {
1115                 panic(err)
1116         }
1117         delete(cl.torrents, infoHash)
1118         return
1119 }
1120
1121 func (cl *Client) prepareTrackerAnnounceUnlocked(announceURL string) (blocked bool, urlToUse string, host string, err error) {
1122         _url, err := url.Parse(announceURL)
1123         if err != nil {
1124                 return
1125         }
1126         hmp := missinggo.SplitHostMaybePort(_url.Host)
1127         if hmp.Err != nil {
1128                 err = hmp.Err
1129                 return
1130         }
1131         addr, err := net.ResolveIPAddr("ip", hmp.Host)
1132         if err != nil {
1133                 return
1134         }
1135         cl.mu.RLock()
1136         _, blocked = cl.ipBlockRange(addr.IP)
1137         cl.mu.RUnlock()
1138         host = _url.Host
1139         hmp.Host = addr.String()
1140         _url.Host = hmp.String()
1141         urlToUse = _url.String()
1142         return
1143 }
1144
1145 func (cl *Client) allTorrentsCompleted() bool {
1146         for _, t := range cl.torrents {
1147                 if !t.haveInfo() {
1148                         return false
1149                 }
1150                 if t.numPiecesCompleted() != t.numPieces() {
1151                         return false
1152                 }
1153         }
1154         return true
1155 }
1156
1157 // Returns true when all torrents are completely downloaded and false if the
1158 // client is stopped before that.
1159 func (cl *Client) WaitAll() bool {
1160         cl.mu.Lock()
1161         defer cl.mu.Unlock()
1162         for !cl.allTorrentsCompleted() {
1163                 if cl.closed.IsSet() {
1164                         return false
1165                 }
1166                 cl.event.Wait()
1167         }
1168         return true
1169 }
1170
1171 // Returns handles to all the torrents loaded in the Client.
1172 func (cl *Client) Torrents() []*Torrent {
1173         cl.mu.Lock()
1174         defer cl.mu.Unlock()
1175         return cl.torrentsAsSlice()
1176 }
1177
1178 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1179         for _, t := range cl.torrents {
1180                 ret = append(ret, t)
1181         }
1182         return
1183 }
1184
1185 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1186         spec, err := TorrentSpecFromMagnetURI(uri)
1187         if err != nil {
1188                 return
1189         }
1190         T, _, err = cl.AddTorrentSpec(spec)
1191         return
1192 }
1193
1194 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1195         T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1196         var ss []string
1197         slices.MakeInto(&ss, mi.Nodes)
1198         cl.AddDHTNodes(ss)
1199         return
1200 }
1201
1202 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1203         mi, err := metainfo.LoadFromFile(filename)
1204         if err != nil {
1205                 return
1206         }
1207         return cl.AddTorrent(mi)
1208 }
1209
1210 func (cl *Client) DHT() *dht.Server {
1211         return cl.dHT
1212 }
1213
1214 func (cl *Client) AddDHTNodes(nodes []string) {
1215         if cl.DHT() == nil {
1216                 return
1217         }
1218         for _, n := range nodes {
1219                 hmp := missinggo.SplitHostMaybePort(n)
1220                 ip := net.ParseIP(hmp.Host)
1221                 if ip == nil {
1222                         log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1223                         continue
1224                 }
1225                 ni := krpc.NodeInfo{
1226                         Addr: &net.UDPAddr{
1227                                 IP:   ip,
1228                                 Port: hmp.Port,
1229                         },
1230                 }
1231                 cl.DHT().AddNode(ni)
1232         }
1233 }
1234
1235 func (cl *Client) banPeerIP(ip net.IP) {
1236         if cl.badPeerIPs == nil {
1237                 cl.badPeerIPs = make(map[string]struct{})
1238         }
1239         cl.badPeerIPs[ip.String()] = struct{}{}
1240 }
1241
1242 func (cl *Client) newConnection(nc net.Conn) (c *connection) {
1243         c = &connection{
1244                 conn: nc,
1245
1246                 Choked:          true,
1247                 PeerChoked:      true,
1248                 PeerMaxRequests: 250,
1249         }
1250         c.writerCond.L = &cl.mu
1251         c.setRW(connStatsReadWriter{nc, &cl.mu, c})
1252         c.r = rateLimitedReader{cl.downloadLimit, c.r}
1253         return
1254 }
1255
1256 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1257         cl.mu.Lock()
1258         defer cl.mu.Unlock()
1259         t := cl.torrent(ih)
1260         if t == nil {
1261                 return
1262         }
1263         t.addPeers([]Peer{{
1264                 IP:     p.IP,
1265                 Port:   p.Port,
1266                 Source: peerSourceDHTAnnouncePeer,
1267         }})
1268 }