]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
Track ConnStats with atomics
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "context"
7         "crypto/rand"
8         "encoding/binary"
9         "errors"
10         "fmt"
11         "io"
12         "net"
13         "strconv"
14         "strings"
15         "time"
16
17         "github.com/anacrolix/dht"
18         "github.com/anacrolix/dht/krpc"
19         "github.com/anacrolix/log"
20         "github.com/anacrolix/missinggo"
21         "github.com/anacrolix/missinggo/pproffd"
22         "github.com/anacrolix/missinggo/pubsub"
23         "github.com/anacrolix/missinggo/slices"
24         "github.com/anacrolix/sync"
25         "github.com/davecgh/go-spew/spew"
26         "github.com/dustin/go-humanize"
27         "github.com/google/btree"
28         "golang.org/x/time/rate"
29
30         "github.com/anacrolix/torrent/bencode"
31         "github.com/anacrolix/torrent/iplist"
32         "github.com/anacrolix/torrent/metainfo"
33         "github.com/anacrolix/torrent/mse"
34         pp "github.com/anacrolix/torrent/peer_protocol"
35         "github.com/anacrolix/torrent/storage"
36 )
37
38 // Clients contain zero or more Torrents. A Client manages a blocklist, the
39 // TCP/UDP protocol ports, and DHT as desired.
40 type Client struct {
41         mu     sync.RWMutex
42         event  sync.Cond
43         closed missinggo.Event
44
45         config Config
46         logger *log.Logger
47
48         halfOpenLimit  int
49         peerID         PeerID
50         defaultStorage *storage.Client
51         onClose        []func()
52         conns          []socket
53         dhtServers     []*dht.Server
54         ipBlockList    iplist.Ranger
55         // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
56         extensionBytes peerExtensionBytes
57         uploadLimit    *rate.Limiter
58         downloadLimit  *rate.Limiter
59
60         // Set of addresses that have our client ID. This intentionally will
61         // include ourselves if we end up trying to connect to our own address
62         // through legitimate channels.
63         dopplegangerAddrs map[string]struct{}
64         badPeerIPs        map[string]struct{}
65         torrents          map[metainfo.Hash]*Torrent
66         // An aggregate of stats over all connections.
67         stats ConnStats
68 }
69
70 func (cl *Client) BadPeerIPs() []string {
71         cl.mu.RLock()
72         defer cl.mu.RUnlock()
73         return cl.badPeerIPsLocked()
74 }
75
76 func (cl *Client) badPeerIPsLocked() []string {
77         return slices.FromMapKeys(cl.badPeerIPs).([]string)
78 }
79
80 func (cl *Client) PeerID() PeerID {
81         return cl.peerID
82 }
83
84 type torrentAddr string
85
86 func (torrentAddr) Network() string { return "" }
87
88 func (me torrentAddr) String() string { return string(me) }
89
90 func (cl *Client) LocalPort() (port int) {
91         cl.eachListener(func(l socket) bool {
92                 _port := missinggo.AddrPort(l.Addr())
93                 if _port == 0 {
94                         panic(l)
95                 }
96                 if port == 0 {
97                         port = _port
98                 } else if port != _port {
99                         panic("mismatched ports")
100                 }
101                 return true
102         })
103         return
104 }
105
106 func writeDhtServerStatus(w io.Writer, s *dht.Server) {
107         dhtStats := s.Stats()
108         fmt.Fprintf(w, "\t# Nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
109         fmt.Fprintf(w, "\tServer ID: %x\n", s.ID())
110         fmt.Fprintf(w, "\tAnnounces: %d\n", dhtStats.SuccessfulOutboundAnnouncePeerQueries)
111         fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
112 }
113
114 // Writes out a human readable status of the client, such as for writing to a
115 // HTTP status page.
116 func (cl *Client) WriteStatus(_w io.Writer) {
117         cl.mu.Lock()
118         defer cl.mu.Unlock()
119         w := bufio.NewWriter(_w)
120         defer w.Flush()
121         fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
122         fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
123         fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
124         fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
125         cl.eachDhtServer(func(s *dht.Server) {
126                 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
127                 writeDhtServerStatus(w, s)
128         })
129         spew.Fdump(w, cl.stats)
130         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
131         fmt.Fprintln(w)
132         for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
133                 return l.InfoHash().AsString() < r.InfoHash().AsString()
134         }).([]*Torrent) {
135                 if t.name() == "" {
136                         fmt.Fprint(w, "<unknown name>")
137                 } else {
138                         fmt.Fprint(w, t.name())
139                 }
140                 fmt.Fprint(w, "\n")
141                 if t.info != nil {
142                         fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
143                 } else {
144                         w.WriteString("<missing metainfo>")
145                 }
146                 fmt.Fprint(w, "\n")
147                 t.writeStatus(w)
148                 fmt.Fprintln(w)
149         }
150 }
151
152 const debugLogValue = "debug"
153
154 func (cl *Client) debugLogFilter(m *log.Msg) bool {
155         if !cl.config.Debug {
156                 _, ok := m.Values()[debugLogValue]
157                 return !ok
158         }
159         return true
160 }
161
162 func (cl *Client) initLogger() {
163         cl.logger = log.Default.Clone().AddValue(cl).AddFilter(log.NewFilter(cl.debugLogFilter))
164 }
165
166 func (cl *Client) announceKey() int32 {
167         return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
168 }
169
170 func NewClient(cfg *Config) (cl *Client, err error) {
171         if cfg == nil {
172                 cfg = &Config{}
173         }
174         cfg.setDefaults()
175
176         defer func() {
177                 if err != nil {
178                         cl = nil
179                 }
180         }()
181         cl = &Client{
182                 halfOpenLimit:     cfg.HalfOpenConnsPerTorrent,
183                 config:            *cfg,
184                 dopplegangerAddrs: make(map[string]struct{}),
185                 torrents:          make(map[metainfo.Hash]*Torrent),
186         }
187         cl.initLogger()
188         defer func() {
189                 if err == nil {
190                         return
191                 }
192                 cl.Close()
193         }()
194         if cfg.UploadRateLimiter == nil {
195                 cl.uploadLimit = unlimited
196         } else {
197                 cl.uploadLimit = cfg.UploadRateLimiter
198         }
199         if cfg.DownloadRateLimiter == nil {
200                 cl.downloadLimit = unlimited
201         } else {
202                 cl.downloadLimit = cfg.DownloadRateLimiter
203         }
204         cl.extensionBytes = defaultPeerExtensionBytes()
205         cl.event.L = &cl.mu
206         storageImpl := cfg.DefaultStorage
207         if storageImpl == nil {
208                 // We'd use mmap but HFS+ doesn't support sparse files.
209                 storageImpl = storage.NewFile(cfg.DataDir)
210                 cl.onClose = append(cl.onClose, func() {
211                         if err := storageImpl.Close(); err != nil {
212                                 log.Printf("error closing default storage: %s", err)
213                         }
214                 })
215         }
216         cl.defaultStorage = storage.NewClient(storageImpl)
217         if cfg.IPBlocklist != nil {
218                 cl.ipBlockList = cfg.IPBlocklist
219         }
220
221         if cfg.PeerID != "" {
222                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
223         } else {
224                 o := copy(cl.peerID[:], cfg.Bep20)
225                 _, err = rand.Read(cl.peerID[o:])
226                 if err != nil {
227                         panic("error generating peer id")
228                 }
229         }
230
231         cl.conns, err = listenAll(cl.enabledPeerNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.config.ProxyURL)
232         if err != nil {
233                 return
234         }
235         // Check for panics.
236         cl.LocalPort()
237
238         for _, s := range cl.conns {
239                 if peerNetworkEnabled(s.Addr().Network(), cl.config) {
240                         go cl.acceptConnections(s)
241                 }
242         }
243
244         go cl.forwardPort()
245         if !cfg.NoDHT {
246                 for _, s := range cl.conns {
247                         if pc, ok := s.(net.PacketConn); ok {
248                                 ds, err := cl.newDhtServer(pc)
249                                 if err != nil {
250                                         panic(err)
251                                 }
252                                 cl.dhtServers = append(cl.dhtServers, ds)
253                         }
254                 }
255         }
256
257         return
258 }
259
260 func (cl *Client) enabledPeerNetworks() (ns []string) {
261         for _, n := range allPeerNetworks {
262                 if peerNetworkEnabled(n, cl.config) {
263                         ns = append(ns, n)
264                 }
265         }
266         return
267 }
268
269 func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
270         cfg := dht.ServerConfig{
271                 IPBlocklist:    cl.ipBlockList,
272                 Conn:           conn,
273                 OnAnnouncePeer: cl.onDHTAnnouncePeer,
274                 PublicIP: func() net.IP {
275                         if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
276                                 return cl.config.PublicIp6
277                         }
278                         return cl.config.PublicIp4
279                 }(),
280                 StartingNodes: cl.config.DhtStartingNodes,
281         }
282         s, err = dht.NewServer(&cfg)
283         if err == nil {
284                 go func() {
285                         if _, err := s.Bootstrap(); err != nil {
286                                 log.Printf("error bootstrapping dht: %s", err)
287                         }
288                 }()
289         }
290         return
291 }
292
293 func firstNonEmptyString(ss ...string) string {
294         for _, s := range ss {
295                 if s != "" {
296                         return s
297                 }
298         }
299         return ""
300 }
301
302 func (cl *Client) Closed() <-chan struct{} {
303         cl.mu.Lock()
304         defer cl.mu.Unlock()
305         return cl.closed.C()
306 }
307
308 func (cl *Client) eachDhtServer(f func(*dht.Server)) {
309         for _, ds := range cl.dhtServers {
310                 f(ds)
311         }
312 }
313
314 func (cl *Client) closeSockets() {
315         cl.eachListener(func(l socket) bool {
316                 l.Close()
317                 return true
318         })
319         cl.conns = nil
320 }
321
322 // Stops the client. All connections to peers are closed and all activity will
323 // come to a halt.
324 func (cl *Client) Close() {
325         cl.mu.Lock()
326         defer cl.mu.Unlock()
327         cl.closed.Set()
328         cl.eachDhtServer(func(s *dht.Server) { s.Close() })
329         cl.closeSockets()
330         for _, t := range cl.torrents {
331                 t.close()
332         }
333         for _, f := range cl.onClose {
334                 f()
335         }
336         cl.event.Broadcast()
337 }
338
339 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
340         if cl.ipBlockList == nil {
341                 return
342         }
343         return cl.ipBlockList.Lookup(ip)
344 }
345
346 func (cl *Client) ipIsBlocked(ip net.IP) bool {
347         _, blocked := cl.ipBlockRange(ip)
348         return blocked
349 }
350
351 func (cl *Client) waitAccept() {
352         for {
353                 for _, t := range cl.torrents {
354                         if t.wantConns() {
355                                 return
356                         }
357                 }
358                 if cl.closed.IsSet() {
359                         return
360                 }
361                 cl.event.Wait()
362         }
363 }
364
365 func (cl *Client) rejectAccepted(conn net.Conn) bool {
366         ra := conn.RemoteAddr()
367         rip := missinggo.AddrIP(ra)
368         if cl.config.DisableIPv4Peers && rip.To4() != nil {
369                 return true
370         }
371         if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
372                 return true
373         }
374         if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
375                 return true
376         }
377         return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
378 }
379
380 func (cl *Client) acceptConnections(l net.Listener) {
381         for {
382                 conn, err := l.Accept()
383                 conn = pproffd.WrapNetConn(conn)
384                 cl.mu.RLock()
385                 closed := cl.closed.IsSet()
386                 reject := false
387                 if conn != nil {
388                         reject = cl.rejectAccepted(conn)
389                 }
390                 cl.mu.RUnlock()
391                 if closed {
392                         if conn != nil {
393                                 conn.Close()
394                         }
395                         return
396                 }
397                 if err != nil {
398                         log.Print(err)
399                         // I think something harsher should happen here? Our accept
400                         // routine just fucked off.
401                         return
402                 }
403                 go func() {
404                         if reject {
405                                 torrent.Add("rejected accepted connections", 1)
406                                 conn.Close()
407                         } else {
408                                 go cl.incomingConnection(conn)
409                         }
410                         log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
411                         torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
412                         torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
413                         torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
414                 }()
415         }
416 }
417
418 func (cl *Client) incomingConnection(nc net.Conn) {
419         defer nc.Close()
420         if tc, ok := nc.(*net.TCPConn); ok {
421                 tc.SetLinger(0)
422         }
423         c := cl.newConnection(nc, false)
424         c.Discovery = peerSourceIncoming
425         cl.runReceivedConn(c)
426 }
427
428 // Returns a handle to the given torrent, if it's present in the client.
429 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
430         cl.mu.Lock()
431         defer cl.mu.Unlock()
432         t, ok = cl.torrents[ih]
433         return
434 }
435
436 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
437         return cl.torrents[ih]
438 }
439
440 type dialResult struct {
441         Conn net.Conn
442 }
443
444 func countDialResult(err error) {
445         if err == nil {
446                 successfulDials.Add(1)
447         } else {
448                 unsuccessfulDials.Add(1)
449         }
450 }
451
452 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
453         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
454         if ret < minDialTimeout {
455                 ret = minDialTimeout
456         }
457         return
458 }
459
460 // Returns whether an address is known to connect to a client with our own ID.
461 func (cl *Client) dopplegangerAddr(addr string) bool {
462         _, ok := cl.dopplegangerAddrs[addr]
463         return ok
464 }
465
466 func (cl *Client) dialTCP(ctx context.Context, addr string) (c net.Conn, err error) {
467         d := net.Dialer{
468                 // Can't bind to the listen address, even though we intend to create an
469                 // endpoint pair that is distinct. Oh well.
470
471                 // LocalAddr: cl.tcpListener.Addr(),
472         }
473         c, err = d.DialContext(ctx, "tcp"+ipNetworkSuffix(!cl.config.DisableIPv4 && !cl.config.DisableIPv4Peers, !cl.config.DisableIPv6), addr)
474         countDialResult(err)
475         if err == nil {
476                 c.(*net.TCPConn).SetLinger(0)
477         }
478         c = pproffd.WrapNetConn(c)
479         return
480 }
481
482 func ipNetworkSuffix(allowIpv4, allowIpv6 bool) string {
483         switch {
484         case allowIpv4 && allowIpv6:
485                 return ""
486         case allowIpv4 && !allowIpv6:
487                 return "4"
488         case !allowIpv4 && allowIpv6:
489                 return "6"
490         default:
491                 panic("unhandled ip network combination")
492         }
493 }
494
495 func dialUTP(ctx context.Context, addr string, sock utpSocket) (c net.Conn, err error) {
496         return sock.DialContext(ctx, "", addr)
497 }
498
499 var allPeerNetworks = []string{"tcp4", "tcp6", "udp4", "udp6"}
500
501 func peerNetworkEnabled(network string, cfg Config) bool {
502         c := func(s string) bool {
503                 return strings.Contains(network, s)
504         }
505         if cfg.DisableUTP {
506                 if c("udp") || c("utp") {
507                         return false
508                 }
509         }
510         if cfg.DisableTCP && c("tcp") {
511                 return false
512         }
513         if cfg.DisableIPv6 && c("6") {
514                 return false
515         }
516         return true
517 }
518
519 // Returns a connection over UTP or TCP, whichever is first to connect.
520 func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn {
521         ctx, cancel := context.WithCancel(ctx)
522         // As soon as we return one connection, cancel the others.
523         defer cancel()
524         left := 0
525         resCh := make(chan dialResult, left)
526         dial := func(f func(_ context.Context, addr string) (net.Conn, error)) {
527                 left++
528                 go func() {
529                         c, err := f(ctx, addr)
530                         countDialResult(err)
531                         resCh <- dialResult{c}
532                 }()
533         }
534         func() {
535                 cl.mu.Lock()
536                 defer cl.mu.Unlock()
537                 cl.eachListener(func(s socket) bool {
538                         if peerNetworkEnabled(s.Addr().Network(), cl.config) {
539                                 dial(s.dial)
540                         }
541                         return true
542                 })
543         }()
544         var res dialResult
545         // Wait for a successful connection.
546         for ; left > 0 && res.Conn == nil; left-- {
547                 res = <-resCh
548         }
549         // There are still incompleted dials.
550         go func() {
551                 for ; left > 0; left-- {
552                         conn := (<-resCh).Conn
553                         if conn != nil {
554                                 conn.Close()
555                         }
556                 }
557         }()
558         if res.Conn != nil {
559                 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
560         }
561         return res.Conn
562 }
563
564 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
565         if _, ok := t.halfOpen[addr]; !ok {
566                 panic("invariant broken")
567         }
568         delete(t.halfOpen, addr)
569         t.openNewConns()
570 }
571
572 // Performs initiator handshakes and returns a connection. Returns nil
573 // *connection if no connection for valid reasons.
574 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) {
575         c = cl.newConnection(nc, true)
576         c.headerEncrypted = encryptHeader
577         ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
578         defer cancel()
579         dl, ok := ctx.Deadline()
580         if !ok {
581                 panic(ctx)
582         }
583         err = nc.SetDeadline(dl)
584         if err != nil {
585                 panic(err)
586         }
587         ok, err = cl.initiateHandshakes(c, t)
588         if !ok {
589                 c = nil
590         }
591         return
592 }
593
594 // Returns nil connection and nil error if no connection could be established
595 // for valid reasons.
596 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) {
597         nc := cl.dialFirst(ctx, addr)
598         if nc == nil {
599                 return
600         }
601         defer func() {
602                 if c == nil || err != nil {
603                         nc.Close()
604                 }
605         }()
606         return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader)
607 }
608
609 // Returns nil connection and nil error if no connection could be established
610 // for valid reasons.
611 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
612         ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
613         defer cancel()
614         obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
615         c, err = cl.establishOutgoingConnEx(t, addr, ctx, obfuscatedHeaderFirst)
616         if err != nil {
617                 return
618         }
619         if c != nil {
620                 torrent.Add("initiated conn with preferred header obfuscation", 1)
621                 return
622         }
623         if cl.config.ForceEncryption {
624                 // We should have just tried with an obfuscated header. A plaintext
625                 // header can't result in an encrypted connection, so we're done.
626                 if !obfuscatedHeaderFirst {
627                         panic(cl.config.EncryptionPolicy)
628                 }
629                 return
630         }
631         // Try again with encryption if we didn't earlier, or without if we did.
632         c, err = cl.establishOutgoingConnEx(t, addr, ctx, !obfuscatedHeaderFirst)
633         if c != nil {
634                 torrent.Add("initiated conn with fallback header obfuscation", 1)
635         }
636         return
637 }
638
639 // Called to dial out and run a connection. The addr we're given is already
640 // considered half-open.
641 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
642         c, err := cl.establishOutgoingConn(t, addr)
643         cl.mu.Lock()
644         defer cl.mu.Unlock()
645         // Don't release lock between here and addConnection, unless it's for
646         // failure.
647         cl.noLongerHalfOpen(t, addr)
648         if err != nil {
649                 if cl.config.Debug {
650                         log.Printf("error establishing outgoing connection: %s", err)
651                 }
652                 return
653         }
654         if c == nil {
655                 return
656         }
657         defer c.Close()
658         c.Discovery = ps
659         cl.runHandshookConn(c, t)
660 }
661
662 // The port number for incoming peer connections. 0 if the client isn't
663 // listening.
664 func (cl *Client) incomingPeerPort() int {
665         return cl.LocalPort()
666 }
667
668 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
669         if c.headerEncrypted {
670                 var rw io.ReadWriter
671                 rw, c.cryptoMethod, err = mse.InitiateHandshake(
672                         struct {
673                                 io.Reader
674                                 io.Writer
675                         }{c.r, c.w},
676                         t.infoHash[:],
677                         nil,
678                         func() mse.CryptoMethod {
679                                 switch {
680                                 case cl.config.ForceEncryption:
681                                         return mse.CryptoMethodRC4
682                                 case cl.config.DisableEncryption:
683                                         return mse.CryptoMethodPlaintext
684                                 default:
685                                         return mse.AllSupportedCrypto
686                                 }
687                         }(),
688                 )
689                 c.setRW(rw)
690                 if err != nil {
691                         return
692                 }
693         }
694         ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
695         if ih != t.infoHash {
696                 ok = false
697         }
698         return
699 }
700
701 // Calls f with any secret keys.
702 func (cl *Client) forSkeys(f func([]byte) bool) {
703         cl.mu.Lock()
704         defer cl.mu.Unlock()
705         for ih := range cl.torrents {
706                 if !f(ih[:]) {
707                         break
708                 }
709         }
710 }
711
712 // Do encryption and bittorrent handshakes as receiver.
713 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
714         var rw io.ReadWriter
715         rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.EncryptionPolicy)
716         c.setRW(rw)
717         if err != nil {
718                 if err == mse.ErrNoSecretKeyMatch {
719                         err = nil
720                 }
721                 return
722         }
723         if cl.config.ForceEncryption && !c.headerEncrypted {
724                 err = errors.New("connection not encrypted")
725                 return
726         }
727         ih, ok, err := cl.connBTHandshake(c, nil)
728         if err != nil {
729                 err = fmt.Errorf("error during bt handshake: %s", err)
730                 return
731         }
732         if !ok {
733                 return
734         }
735         cl.mu.Lock()
736         t = cl.torrents[ih]
737         cl.mu.Unlock()
738         return
739 }
740
741 // Returns !ok if handshake failed for valid reasons.
742 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
743         res, ok, err := handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
744         if err != nil || !ok {
745                 return
746         }
747         ret = res.Hash
748         c.PeerExtensionBytes = res.peerExtensionBytes
749         c.PeerID = res.PeerID
750         c.completedHandshake = time.Now()
751         return
752 }
753
754 func (cl *Client) runReceivedConn(c *connection) {
755         err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
756         if err != nil {
757                 panic(err)
758         }
759         t, err := cl.receiveHandshakes(c)
760         if err != nil {
761                 if cl.config.Debug {
762                         log.Printf("error receiving handshakes: %s", err)
763                 }
764                 return
765         }
766         if t == nil {
767                 return
768         }
769         cl.mu.Lock()
770         defer cl.mu.Unlock()
771         cl.runHandshookConn(c, t)
772 }
773
774 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
775         c.setTorrent(t)
776         if c.PeerID == cl.peerID {
777                 if c.outgoing {
778                         connsToSelf.Add(1)
779                         addr := c.conn.RemoteAddr().String()
780                         cl.dopplegangerAddrs[addr] = struct{}{}
781                 } else {
782                         // Because the remote address is not necessarily the same as its
783                         // client's torrent listen address, we won't record the remote address
784                         // as a doppleganger. Instead, the initiator can record *us* as the
785                         // doppleganger.
786                 }
787                 return
788         }
789         c.conn.SetWriteDeadline(time.Time{})
790         c.r = deadlineReader{c.conn, c.r}
791         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
792         if connIsIpv6(c.conn) {
793                 torrent.Add("completed handshake over ipv6", 1)
794         }
795         if err := t.addConnection(c); err != nil {
796                 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
797                 return
798         }
799         defer t.dropConnection(c)
800         go c.writer(time.Minute)
801         cl.sendInitialMessages(c, t)
802         err := c.mainReadLoop()
803         if err != nil && cl.config.Debug {
804                 log.Printf("error during connection main read loop: %s", err)
805         }
806 }
807
808 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
809         func() {
810                 if conn.fastEnabled() {
811                         if torrent.haveAllPieces() {
812                                 conn.Post(pp.Message{Type: pp.HaveAll})
813                                 conn.sentHaves.AddRange(0, conn.t.NumPieces())
814                                 return
815                         } else if !torrent.haveAnyPieces() {
816                                 conn.Post(pp.Message{Type: pp.HaveNone})
817                                 conn.sentHaves.Clear()
818                                 return
819                         }
820                 }
821                 conn.PostBitfield()
822         }()
823         if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
824                 conn.Post(pp.Message{
825                         Type:       pp.Extended,
826                         ExtendedID: pp.HandshakeExtendedID,
827                         ExtendedPayload: func() []byte {
828                                 d := map[string]interface{}{
829                                         "m": func() (ret map[string]int) {
830                                                 ret = make(map[string]int, 2)
831                                                 ret["ut_metadata"] = metadataExtendedId
832                                                 if !cl.config.DisablePEX {
833                                                         ret["ut_pex"] = pexExtendedId
834                                                 }
835                                                 return
836                                         }(),
837                                         "v": cl.config.ExtendedHandshakeClientVersion,
838                                         // No upload queue is implemented yet.
839                                         "reqq": 64,
840                                 }
841                                 if !cl.config.DisableEncryption {
842                                         d["e"] = 1
843                                 }
844                                 if torrent.metadataSizeKnown() {
845                                         d["metadata_size"] = torrent.metadataSize()
846                                 }
847                                 if p := cl.incomingPeerPort(); p != 0 {
848                                         d["p"] = p
849                                 }
850                                 yourip, err := addrCompactIP(conn.remoteAddr())
851                                 if err != nil {
852                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
853                                 } else {
854                                         d["yourip"] = yourip
855                                 }
856                                 // log.Printf("sending %v", d)
857                                 b, err := bencode.Marshal(d)
858                                 if err != nil {
859                                         panic(err)
860                                 }
861                                 return b
862                         }(),
863                 })
864         }
865         if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
866                 conn.Post(pp.Message{
867                         Type: pp.Port,
868                         Port: cl.dhtPort(),
869                 })
870         }
871 }
872
873 func (cl *Client) dhtPort() (ret uint16) {
874         cl.eachDhtServer(func(s *dht.Server) {
875                 ret = uint16(missinggo.AddrPort(s.Addr()))
876         })
877         return
878 }
879
880 func (cl *Client) haveDhtServer() (ret bool) {
881         cl.eachDhtServer(func(_ *dht.Server) {
882                 ret = true
883         })
884         return
885 }
886
887 // Process incoming ut_metadata message.
888 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
889         var d map[string]int
890         err := bencode.Unmarshal(payload, &d)
891         if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
892         } else if err != nil {
893                 return fmt.Errorf("error unmarshalling bencode: %s", err)
894         }
895         msgType, ok := d["msg_type"]
896         if !ok {
897                 return errors.New("missing msg_type field")
898         }
899         piece := d["piece"]
900         switch msgType {
901         case pp.DataMetadataExtensionMsgType:
902                 if !c.requestedMetadataPiece(piece) {
903                         return fmt.Errorf("got unexpected piece %d", piece)
904                 }
905                 c.metadataRequests[piece] = false
906                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
907                 if begin < 0 || begin >= len(payload) {
908                         return fmt.Errorf("data has bad offset in payload: %d", begin)
909                 }
910                 t.saveMetadataPiece(piece, payload[begin:])
911                 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
912                 c.lastUsefulChunkReceived = time.Now()
913                 return t.maybeCompleteMetadata()
914         case pp.RequestMetadataExtensionMsgType:
915                 if !t.haveMetadataPiece(piece) {
916                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
917                         return nil
918                 }
919                 start := (1 << 14) * piece
920                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
921                 return nil
922         case pp.RejectMetadataExtensionMsgType:
923                 return nil
924         default:
925                 return errors.New("unknown msg_type value")
926         }
927 }
928
929 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
930         if port == 0 {
931                 return true
932         }
933         if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
934                 return true
935         }
936         if _, ok := cl.ipBlockRange(ip); ok {
937                 return true
938         }
939         if _, ok := cl.badPeerIPs[ip.String()]; ok {
940                 return true
941         }
942         return false
943 }
944
945 // Return a Torrent ready for insertion into a Client.
946 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
947         // use provided storage, if provided
948         storageClient := cl.defaultStorage
949         if specStorage != nil {
950                 storageClient = storage.NewClient(specStorage)
951         }
952
953         t = &Torrent{
954                 cl:       cl,
955                 infoHash: ih,
956                 peers: prioritizedPeers{
957                         om: btree.New(32),
958                         getPrio: func(p Peer) peerPriority {
959                                 return bep40PriorityIgnoreError(cl.publicAddr(p.IP), p.addr())
960                         },
961                 },
962                 conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
963
964                 halfOpen:          make(map[string]Peer),
965                 pieceStateChanges: pubsub.NewPubSub(),
966
967                 storageOpener:       storageClient,
968                 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
969
970                 networkingEnabled: true,
971                 requestStrategy:   2,
972                 metadataChanged: sync.Cond{
973                         L: &cl.mu,
974                 },
975         }
976         t.logger = cl.logger.Clone().AddValue(t)
977         t.setChunkSize(defaultChunkSize)
978         return
979 }
980
981 // A file-like handle to some torrent data resource.
982 type Handle interface {
983         io.Reader
984         io.Seeker
985         io.Closer
986         io.ReaderAt
987 }
988
989 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
990         return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
991 }
992
993 // Adds a torrent by InfoHash with a custom Storage implementation.
994 // If the torrent already exists then this Storage is ignored and the
995 // existing torrent returned with `new` set to `false`
996 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
997         cl.mu.Lock()
998         defer cl.mu.Unlock()
999         t, ok := cl.torrents[infoHash]
1000         if ok {
1001                 return
1002         }
1003         new = true
1004         t = cl.newTorrent(infoHash, specStorage)
1005         cl.eachDhtServer(func(s *dht.Server) {
1006                 go t.dhtAnnouncer(s)
1007         })
1008         cl.torrents[infoHash] = t
1009         t.updateWantPeersEvent()
1010         // Tickle Client.waitAccept, new torrent may want conns.
1011         cl.event.Broadcast()
1012         return
1013 }
1014
1015 // Add or merge a torrent spec. If the torrent is already present, the
1016 // trackers will be merged with the existing ones. If the Info isn't yet
1017 // known, it will be set. The display name is replaced if the new spec
1018 // provides one. Returns new if the torrent wasn't already in the client.
1019 // Note that any `Storage` defined on the spec will be ignored if the
1020 // torrent is already present (i.e. `new` return value is `true`)
1021 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1022         t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1023         if spec.DisplayName != "" {
1024                 t.SetDisplayName(spec.DisplayName)
1025         }
1026         if spec.InfoBytes != nil {
1027                 err = t.SetInfoBytes(spec.InfoBytes)
1028                 if err != nil {
1029                         return
1030                 }
1031         }
1032         cl.mu.Lock()
1033         defer cl.mu.Unlock()
1034         if spec.ChunkSize != 0 {
1035                 t.setChunkSize(pp.Integer(spec.ChunkSize))
1036         }
1037         t.addTrackers(spec.Trackers)
1038         t.maybeNewConns()
1039         return
1040 }
1041
1042 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1043         t, ok := cl.torrents[infoHash]
1044         if !ok {
1045                 err = fmt.Errorf("no such torrent")
1046                 return
1047         }
1048         err = t.close()
1049         if err != nil {
1050                 panic(err)
1051         }
1052         delete(cl.torrents, infoHash)
1053         return
1054 }
1055
1056 func (cl *Client) allTorrentsCompleted() bool {
1057         for _, t := range cl.torrents {
1058                 if !t.haveInfo() {
1059                         return false
1060                 }
1061                 if !t.haveAllPieces() {
1062                         return false
1063                 }
1064         }
1065         return true
1066 }
1067
1068 // Returns true when all torrents are completely downloaded and false if the
1069 // client is stopped before that.
1070 func (cl *Client) WaitAll() bool {
1071         cl.mu.Lock()
1072         defer cl.mu.Unlock()
1073         for !cl.allTorrentsCompleted() {
1074                 if cl.closed.IsSet() {
1075                         return false
1076                 }
1077                 cl.event.Wait()
1078         }
1079         return true
1080 }
1081
1082 // Returns handles to all the torrents loaded in the Client.
1083 func (cl *Client) Torrents() []*Torrent {
1084         cl.mu.Lock()
1085         defer cl.mu.Unlock()
1086         return cl.torrentsAsSlice()
1087 }
1088
1089 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1090         for _, t := range cl.torrents {
1091                 ret = append(ret, t)
1092         }
1093         return
1094 }
1095
1096 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1097         spec, err := TorrentSpecFromMagnetURI(uri)
1098         if err != nil {
1099                 return
1100         }
1101         T, _, err = cl.AddTorrentSpec(spec)
1102         return
1103 }
1104
1105 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1106         T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1107         var ss []string
1108         slices.MakeInto(&ss, mi.Nodes)
1109         cl.AddDHTNodes(ss)
1110         return
1111 }
1112
1113 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1114         mi, err := metainfo.LoadFromFile(filename)
1115         if err != nil {
1116                 return
1117         }
1118         return cl.AddTorrent(mi)
1119 }
1120
1121 func (cl *Client) DhtServers() []*dht.Server {
1122         return cl.dhtServers
1123 }
1124
1125 func (cl *Client) AddDHTNodes(nodes []string) {
1126         for _, n := range nodes {
1127                 hmp := missinggo.SplitHostMaybePort(n)
1128                 ip := net.ParseIP(hmp.Host)
1129                 if ip == nil {
1130                         log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1131                         continue
1132                 }
1133                 ni := krpc.NodeInfo{
1134                         Addr: krpc.NodeAddr{
1135                                 IP:   ip,
1136                                 Port: hmp.Port,
1137                         },
1138                 }
1139                 cl.eachDhtServer(func(s *dht.Server) {
1140                         s.AddNode(ni)
1141                 })
1142         }
1143 }
1144
1145 func (cl *Client) banPeerIP(ip net.IP) {
1146         if cl.badPeerIPs == nil {
1147                 cl.badPeerIPs = make(map[string]struct{})
1148         }
1149         cl.badPeerIPs[ip.String()] = struct{}{}
1150 }
1151
1152 func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
1153         c = &connection{
1154                 conn:            nc,
1155                 outgoing:        outgoing,
1156                 Choked:          true,
1157                 PeerChoked:      true,
1158                 PeerMaxRequests: 250,
1159                 writeBuffer:     new(bytes.Buffer),
1160         }
1161         c.writerCond.L = &cl.mu
1162         c.setRW(connStatsReadWriter{nc, c})
1163         c.r = &rateLimitedReader{
1164                 l: cl.downloadLimit,
1165                 r: c.r,
1166         }
1167         return
1168 }
1169
1170 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1171         cl.mu.Lock()
1172         defer cl.mu.Unlock()
1173         t := cl.torrent(ih)
1174         if t == nil {
1175                 return
1176         }
1177         t.addPeers([]Peer{{
1178                 IP:     p.IP,
1179                 Port:   p.Port,
1180                 Source: peerSourceDHTAnnouncePeer,
1181         }})
1182 }
1183
1184 func firstNotNil(ips ...net.IP) net.IP {
1185         for _, ip := range ips {
1186                 if ip != nil {
1187                         return ip
1188                 }
1189         }
1190         return nil
1191 }
1192
1193 func (cl *Client) eachListener(f func(socket) bool) {
1194         for _, s := range cl.conns {
1195                 if !f(s) {
1196                         break
1197                 }
1198         }
1199 }
1200
1201 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1202         cl.eachListener(func(l socket) bool {
1203                 ret = l
1204                 return !f(l)
1205         })
1206         return
1207 }
1208
1209 func (cl *Client) publicIp(peer net.IP) net.IP {
1210         // TODO: Use BEP 10 to determine how peers are seeing us.
1211         if peer.To4() != nil {
1212                 return firstNotNil(
1213                         cl.config.PublicIp4,
1214                         cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1215                 )
1216         } else {
1217                 return firstNotNil(
1218                         cl.config.PublicIp6,
1219                         cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1220                 )
1221         }
1222 }
1223
1224 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1225         return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
1226                 return f(missinggo.AddrIP(l.Addr()))
1227         }).Addr())
1228 }
1229
1230 // Our IP as a peer should see it.
1231 func (cl *Client) publicAddr(peer net.IP) ipPort {
1232         return ipPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
1233 }
1234
1235 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1236         cl.eachListener(func(l socket) bool {
1237                 ret = append(ret, l.Addr())
1238                 return true
1239         })
1240         return
1241 }