]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
491424314c910742c795ac449cb080f5aca1f3eb
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "context"
7         "crypto/rand"
8         "encoding/binary"
9         "errors"
10         "fmt"
11         "io"
12         "net"
13         "strconv"
14         "strings"
15         "time"
16
17         "github.com/anacrolix/missinggo/perf"
18
19         "github.com/anacrolix/dht"
20         "github.com/anacrolix/dht/krpc"
21         "github.com/anacrolix/log"
22         "github.com/anacrolix/missinggo"
23         "github.com/anacrolix/missinggo/pproffd"
24         "github.com/anacrolix/missinggo/pubsub"
25         "github.com/anacrolix/missinggo/slices"
26         "github.com/anacrolix/sync"
27         "github.com/davecgh/go-spew/spew"
28         "github.com/dustin/go-humanize"
29         "github.com/google/btree"
30         "golang.org/x/time/rate"
31
32         "github.com/anacrolix/torrent/bencode"
33         "github.com/anacrolix/torrent/iplist"
34         "github.com/anacrolix/torrent/metainfo"
35         "github.com/anacrolix/torrent/mse"
36         pp "github.com/anacrolix/torrent/peer_protocol"
37         "github.com/anacrolix/torrent/storage"
38 )
39
40 // Clients contain zero or more Torrents. A Client manages a blocklist, the
41 // TCP/UDP protocol ports, and DHT as desired.
42 type Client struct {
43         mu     sync.RWMutex
44         event  sync.Cond
45         closed missinggo.Event
46
47         config Config
48         logger *log.Logger
49
50         halfOpenLimit  int
51         peerID         PeerID
52         defaultStorage *storage.Client
53         onClose        []func()
54         conns          []socket
55         dhtServers     []*dht.Server
56         ipBlockList    iplist.Ranger
57         // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
58         extensionBytes peerExtensionBytes
59         uploadLimit    *rate.Limiter
60         downloadLimit  *rate.Limiter
61
62         // Set of addresses that have our client ID. This intentionally will
63         // include ourselves if we end up trying to connect to our own address
64         // through legitimate channels.
65         dopplegangerAddrs map[string]struct{}
66         badPeerIPs        map[string]struct{}
67         torrents          map[metainfo.Hash]*Torrent
68         // An aggregate of stats over all connections.
69         stats ConnStats
70
71         acceptLimiter map[ipStr]int
72 }
73
74 type ipStr string
75
76 func (cl *Client) BadPeerIPs() []string {
77         cl.mu.RLock()
78         defer cl.mu.RUnlock()
79         return cl.badPeerIPsLocked()
80 }
81
82 func (cl *Client) badPeerIPsLocked() []string {
83         return slices.FromMapKeys(cl.badPeerIPs).([]string)
84 }
85
86 func (cl *Client) PeerID() PeerID {
87         return cl.peerID
88 }
89
90 type torrentAddr string
91
92 func (torrentAddr) Network() string { return "" }
93
94 func (me torrentAddr) String() string { return string(me) }
95
96 func (cl *Client) LocalPort() (port int) {
97         cl.eachListener(func(l socket) bool {
98                 _port := missinggo.AddrPort(l.Addr())
99                 if _port == 0 {
100                         panic(l)
101                 }
102                 if port == 0 {
103                         port = _port
104                 } else if port != _port {
105                         panic("mismatched ports")
106                 }
107                 return true
108         })
109         return
110 }
111
112 func writeDhtServerStatus(w io.Writer, s *dht.Server) {
113         dhtStats := s.Stats()
114         fmt.Fprintf(w, "\t# Nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
115         fmt.Fprintf(w, "\tServer ID: %x\n", s.ID())
116         fmt.Fprintf(w, "\tAnnounces: %d\n", dhtStats.SuccessfulOutboundAnnouncePeerQueries)
117         fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
118 }
119
120 // Writes out a human readable status of the client, such as for writing to a
121 // HTTP status page.
122 func (cl *Client) WriteStatus(_w io.Writer) {
123         cl.mu.Lock()
124         defer cl.mu.Unlock()
125         w := bufio.NewWriter(_w)
126         defer w.Flush()
127         fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
128         fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
129         fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
130         fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
131         cl.eachDhtServer(func(s *dht.Server) {
132                 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
133                 writeDhtServerStatus(w, s)
134         })
135         spew.Fdump(w, cl.stats)
136         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
137         fmt.Fprintln(w)
138         for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
139                 return l.InfoHash().AsString() < r.InfoHash().AsString()
140         }).([]*Torrent) {
141                 if t.name() == "" {
142                         fmt.Fprint(w, "<unknown name>")
143                 } else {
144                         fmt.Fprint(w, t.name())
145                 }
146                 fmt.Fprint(w, "\n")
147                 if t.info != nil {
148                         fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
149                 } else {
150                         w.WriteString("<missing metainfo>")
151                 }
152                 fmt.Fprint(w, "\n")
153                 t.writeStatus(w)
154                 fmt.Fprintln(w)
155         }
156 }
157
158 const debugLogValue = "debug"
159
160 func (cl *Client) debugLogFilter(m *log.Msg) bool {
161         if !cl.config.Debug {
162                 _, ok := m.Values()[debugLogValue]
163                 return !ok
164         }
165         return true
166 }
167
168 func (cl *Client) initLogger() {
169         cl.logger = log.Default.Clone().AddValue(cl).AddFilter(log.NewFilter(cl.debugLogFilter))
170 }
171
172 func (cl *Client) announceKey() int32 {
173         return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
174 }
175
176 func NewClient(cfg *Config) (cl *Client, err error) {
177         if cfg == nil {
178                 cfg = &Config{}
179         }
180         cfg.setDefaults()
181
182         defer func() {
183                 if err != nil {
184                         cl = nil
185                 }
186         }()
187         cl = &Client{
188                 halfOpenLimit:     cfg.HalfOpenConnsPerTorrent,
189                 config:            *cfg,
190                 dopplegangerAddrs: make(map[string]struct{}),
191                 torrents:          make(map[metainfo.Hash]*Torrent),
192         }
193         go cl.acceptLimitClearer()
194         cl.initLogger()
195         defer func() {
196                 if err == nil {
197                         return
198                 }
199                 cl.Close()
200         }()
201         if cfg.UploadRateLimiter == nil {
202                 cl.uploadLimit = unlimited
203         } else {
204                 cl.uploadLimit = cfg.UploadRateLimiter
205         }
206         if cfg.DownloadRateLimiter == nil {
207                 cl.downloadLimit = unlimited
208         } else {
209                 cl.downloadLimit = cfg.DownloadRateLimiter
210         }
211         cl.extensionBytes = defaultPeerExtensionBytes()
212         cl.event.L = &cl.mu
213         storageImpl := cfg.DefaultStorage
214         if storageImpl == nil {
215                 // We'd use mmap but HFS+ doesn't support sparse files.
216                 storageImpl = storage.NewFile(cfg.DataDir)
217                 cl.onClose = append(cl.onClose, func() {
218                         if err := storageImpl.Close(); err != nil {
219                                 log.Printf("error closing default storage: %s", err)
220                         }
221                 })
222         }
223         cl.defaultStorage = storage.NewClient(storageImpl)
224         if cfg.IPBlocklist != nil {
225                 cl.ipBlockList = cfg.IPBlocklist
226         }
227
228         if cfg.PeerID != "" {
229                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
230         } else {
231                 o := copy(cl.peerID[:], cfg.Bep20)
232                 _, err = rand.Read(cl.peerID[o:])
233                 if err != nil {
234                         panic("error generating peer id")
235                 }
236         }
237
238         cl.conns, err = listenAll(cl.enabledPeerNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.config.ProxyURL)
239         if err != nil {
240                 return
241         }
242         // Check for panics.
243         cl.LocalPort()
244
245         for _, s := range cl.conns {
246                 if peerNetworkEnabled(s.Addr().Network(), cl.config) {
247                         go cl.acceptConnections(s)
248                 }
249         }
250
251         go cl.forwardPort()
252         if !cfg.NoDHT {
253                 for _, s := range cl.conns {
254                         if pc, ok := s.(net.PacketConn); ok {
255                                 ds, err := cl.newDhtServer(pc)
256                                 if err != nil {
257                                         panic(err)
258                                 }
259                                 cl.dhtServers = append(cl.dhtServers, ds)
260                         }
261                 }
262         }
263
264         return
265 }
266
267 func (cl *Client) enabledPeerNetworks() (ns []string) {
268         for _, n := range allPeerNetworks {
269                 if peerNetworkEnabled(n, cl.config) {
270                         ns = append(ns, n)
271                 }
272         }
273         return
274 }
275
276 func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
277         cfg := dht.ServerConfig{
278                 IPBlocklist:    cl.ipBlockList,
279                 Conn:           conn,
280                 OnAnnouncePeer: cl.onDHTAnnouncePeer,
281                 PublicIP: func() net.IP {
282                         if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
283                                 return cl.config.PublicIp6
284                         }
285                         return cl.config.PublicIp4
286                 }(),
287                 StartingNodes: cl.config.DhtStartingNodes,
288         }
289         s, err = dht.NewServer(&cfg)
290         if err == nil {
291                 go func() {
292                         if _, err := s.Bootstrap(); err != nil {
293                                 log.Printf("error bootstrapping dht: %s", err)
294                         }
295                 }()
296         }
297         return
298 }
299
300 func firstNonEmptyString(ss ...string) string {
301         for _, s := range ss {
302                 if s != "" {
303                         return s
304                 }
305         }
306         return ""
307 }
308
309 func (cl *Client) Closed() <-chan struct{} {
310         cl.mu.Lock()
311         defer cl.mu.Unlock()
312         return cl.closed.C()
313 }
314
315 func (cl *Client) eachDhtServer(f func(*dht.Server)) {
316         for _, ds := range cl.dhtServers {
317                 f(ds)
318         }
319 }
320
321 func (cl *Client) closeSockets() {
322         cl.eachListener(func(l socket) bool {
323                 l.Close()
324                 return true
325         })
326         cl.conns = nil
327 }
328
329 // Stops the client. All connections to peers are closed and all activity will
330 // come to a halt.
331 func (cl *Client) Close() {
332         cl.mu.Lock()
333         defer cl.mu.Unlock()
334         cl.closed.Set()
335         cl.eachDhtServer(func(s *dht.Server) { s.Close() })
336         cl.closeSockets()
337         for _, t := range cl.torrents {
338                 t.close()
339         }
340         for _, f := range cl.onClose {
341                 f()
342         }
343         cl.event.Broadcast()
344 }
345
346 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
347         if cl.ipBlockList == nil {
348                 return
349         }
350         return cl.ipBlockList.Lookup(ip)
351 }
352
353 func (cl *Client) ipIsBlocked(ip net.IP) bool {
354         _, blocked := cl.ipBlockRange(ip)
355         return blocked
356 }
357
358 func (cl *Client) waitAccept() {
359         for {
360                 for _, t := range cl.torrents {
361                         if t.wantConns() {
362                                 return
363                         }
364                 }
365                 if cl.closed.IsSet() {
366                         return
367                 }
368                 cl.event.Wait()
369         }
370 }
371
372 func (cl *Client) rejectAccepted(conn net.Conn) bool {
373         ra := conn.RemoteAddr()
374         rip := missinggo.AddrIP(ra)
375         if cl.config.DisableIPv4Peers && rip.To4() != nil {
376                 return true
377         }
378         if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
379                 return true
380         }
381         if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
382                 return true
383         }
384         if cl.rateLimitAccept(rip) {
385                 return true
386         }
387         return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
388 }
389
390 func (cl *Client) acceptConnections(l net.Listener) {
391         for {
392                 conn, err := l.Accept()
393                 conn = pproffd.WrapNetConn(conn)
394                 cl.mu.RLock()
395                 closed := cl.closed.IsSet()
396                 reject := false
397                 if conn != nil {
398                         reject = cl.rejectAccepted(conn)
399                 }
400                 cl.mu.RUnlock()
401                 if closed {
402                         if conn != nil {
403                                 conn.Close()
404                         }
405                         return
406                 }
407                 if err != nil {
408                         log.Print(err)
409                         // I think something harsher should happen here? Our accept
410                         // routine just fucked off.
411                         return
412                 }
413                 go func() {
414                         if reject {
415                                 torrent.Add("rejected accepted connections", 1)
416                                 conn.Close()
417                         } else {
418                                 go cl.incomingConnection(conn)
419                         }
420                         log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
421                         torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
422                         torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
423                         torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
424                 }()
425         }
426 }
427
428 func (cl *Client) incomingConnection(nc net.Conn) {
429         defer nc.Close()
430         if tc, ok := nc.(*net.TCPConn); ok {
431                 tc.SetLinger(0)
432         }
433         c := cl.newConnection(nc, false)
434         c.Discovery = peerSourceIncoming
435         cl.runReceivedConn(c)
436 }
437
438 // Returns a handle to the given torrent, if it's present in the client.
439 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
440         cl.mu.Lock()
441         defer cl.mu.Unlock()
442         t, ok = cl.torrents[ih]
443         return
444 }
445
446 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
447         return cl.torrents[ih]
448 }
449
450 type dialResult struct {
451         Conn net.Conn
452 }
453
454 func countDialResult(err error) {
455         if err == nil {
456                 torrent.Add("successful dials", 1)
457         } else {
458                 torrent.Add("unsuccessful dials", 1)
459         }
460 }
461
462 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
463         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
464         if ret < minDialTimeout {
465                 ret = minDialTimeout
466         }
467         return
468 }
469
470 // Returns whether an address is known to connect to a client with our own ID.
471 func (cl *Client) dopplegangerAddr(addr string) bool {
472         _, ok := cl.dopplegangerAddrs[addr]
473         return ok
474 }
475
476 func ipNetworkSuffix(allowIpv4, allowIpv6 bool) string {
477         switch {
478         case allowIpv4 && allowIpv6:
479                 return ""
480         case allowIpv4 && !allowIpv6:
481                 return "4"
482         case !allowIpv4 && allowIpv6:
483                 return "6"
484         default:
485                 panic("unhandled ip network combination")
486         }
487 }
488
489 func dialUTP(ctx context.Context, addr string, sock utpSocket) (c net.Conn, err error) {
490         return sock.DialContext(ctx, "", addr)
491 }
492
493 var allPeerNetworks = []string{"tcp4", "tcp6", "udp4", "udp6"}
494
495 func peerNetworkEnabled(network string, cfg Config) bool {
496         c := func(s string) bool {
497                 return strings.Contains(network, s)
498         }
499         if cfg.DisableUTP {
500                 if c("udp") || c("utp") {
501                         return false
502                 }
503         }
504         if cfg.DisableTCP && c("tcp") {
505                 return false
506         }
507         if cfg.DisableIPv6 && c("6") {
508                 return false
509         }
510         return true
511 }
512
513 // Returns a connection over UTP or TCP, whichever is first to connect.
514 func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn {
515         ctx, cancel := context.WithCancel(ctx)
516         // As soon as we return one connection, cancel the others.
517         defer cancel()
518         left := 0
519         resCh := make(chan dialResult, left)
520         dial := func(f func(_ context.Context, addr string) (net.Conn, error)) {
521                 left++
522                 go func() {
523                         c, err := f(ctx, addr)
524                         // This is a bit optimistic, but it looks non-trivial to thread
525                         // this through the proxy code. Set it now in case we close the
526                         // connection forthwith.
527                         if tc, ok := c.(*net.TCPConn); ok {
528                                 tc.SetLinger(0)
529                         }
530                         countDialResult(err)
531                         resCh <- dialResult{c}
532                 }()
533         }
534         func() {
535                 cl.mu.Lock()
536                 defer cl.mu.Unlock()
537                 cl.eachListener(func(s socket) bool {
538                         if peerNetworkEnabled(s.Addr().Network(), cl.config) {
539                                 dial(s.dial)
540                         }
541                         return true
542                 })
543         }()
544         var res dialResult
545         // Wait for a successful connection.
546         func() {
547                 defer perf.ScopeTimer()()
548                 for ; left > 0 && res.Conn == nil; left-- {
549                         res = <-resCh
550                 }
551         }()
552         // There are still incompleted dials.
553         go func() {
554                 for ; left > 0; left-- {
555                         conn := (<-resCh).Conn
556                         if conn != nil {
557                                 conn.Close()
558                         }
559                 }
560         }()
561         if res.Conn != nil {
562                 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
563         }
564         return res.Conn
565 }
566
567 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
568         if _, ok := t.halfOpen[addr]; !ok {
569                 panic("invariant broken")
570         }
571         delete(t.halfOpen, addr)
572         t.openNewConns()
573 }
574
575 // Performs initiator handshakes and returns a connection. Returns nil
576 // *connection if no connection for valid reasons.
577 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) {
578         c = cl.newConnection(nc, true)
579         c.headerEncrypted = encryptHeader
580         ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
581         defer cancel()
582         dl, ok := ctx.Deadline()
583         if !ok {
584                 panic(ctx)
585         }
586         err = nc.SetDeadline(dl)
587         if err != nil {
588                 panic(err)
589         }
590         ok, err = cl.initiateHandshakes(c, t)
591         if !ok {
592                 c = nil
593         }
594         return
595 }
596
597 // Returns nil connection and nil error if no connection could be established
598 // for valid reasons.
599 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) {
600         nc := cl.dialFirst(ctx, addr)
601         if nc == nil {
602                 return
603         }
604         defer func() {
605                 if c == nil || err != nil {
606                         nc.Close()
607                 }
608         }()
609         return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader)
610 }
611
612 // Returns nil connection and nil error if no connection could be established
613 // for valid reasons.
614 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
615         ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
616         defer cancel()
617         obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
618         c, err = cl.establishOutgoingConnEx(t, addr, ctx, obfuscatedHeaderFirst)
619         if err != nil {
620                 return
621         }
622         if c != nil {
623                 torrent.Add("initiated conn with preferred header obfuscation", 1)
624                 return
625         }
626         if cl.config.ForceEncryption {
627                 // We should have just tried with an obfuscated header. A plaintext
628                 // header can't result in an encrypted connection, so we're done.
629                 if !obfuscatedHeaderFirst {
630                         panic(cl.config.EncryptionPolicy)
631                 }
632                 return
633         }
634         // Try again with encryption if we didn't earlier, or without if we did.
635         c, err = cl.establishOutgoingConnEx(t, addr, ctx, !obfuscatedHeaderFirst)
636         if c != nil {
637                 torrent.Add("initiated conn with fallback header obfuscation", 1)
638         }
639         return
640 }
641
642 // Called to dial out and run a connection. The addr we're given is already
643 // considered half-open.
644 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
645         c, err := cl.establishOutgoingConn(t, addr)
646         cl.mu.Lock()
647         defer cl.mu.Unlock()
648         // Don't release lock between here and addConnection, unless it's for
649         // failure.
650         cl.noLongerHalfOpen(t, addr)
651         if err != nil {
652                 if cl.config.Debug {
653                         log.Printf("error establishing outgoing connection: %s", err)
654                 }
655                 return
656         }
657         if c == nil {
658                 return
659         }
660         defer c.Close()
661         c.Discovery = ps
662         cl.runHandshookConn(c, t)
663 }
664
665 // The port number for incoming peer connections. 0 if the client isn't
666 // listening.
667 func (cl *Client) incomingPeerPort() int {
668         return cl.LocalPort()
669 }
670
671 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
672         if c.headerEncrypted {
673                 var rw io.ReadWriter
674                 rw, c.cryptoMethod, err = mse.InitiateHandshake(
675                         struct {
676                                 io.Reader
677                                 io.Writer
678                         }{c.r, c.w},
679                         t.infoHash[:],
680                         nil,
681                         func() mse.CryptoMethod {
682                                 switch {
683                                 case cl.config.ForceEncryption:
684                                         return mse.CryptoMethodRC4
685                                 case cl.config.DisableEncryption:
686                                         return mse.CryptoMethodPlaintext
687                                 default:
688                                         return mse.AllSupportedCrypto
689                                 }
690                         }(),
691                 )
692                 c.setRW(rw)
693                 if err != nil {
694                         return
695                 }
696         }
697         ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
698         if ih != t.infoHash {
699                 ok = false
700         }
701         return
702 }
703
704 // Calls f with any secret keys.
705 func (cl *Client) forSkeys(f func([]byte) bool) {
706         cl.mu.Lock()
707         defer cl.mu.Unlock()
708         for ih := range cl.torrents {
709                 if !f(ih[:]) {
710                         break
711                 }
712         }
713 }
714
715 // Do encryption and bittorrent handshakes as receiver.
716 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
717         defer perf.ScopeTimerErr(&err)()
718         var rw io.ReadWriter
719         rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.EncryptionPolicy)
720         c.setRW(rw)
721         if err != nil {
722                 if err == mse.ErrNoSecretKeyMatch {
723                         err = nil
724                 }
725                 return
726         }
727         if cl.config.ForceEncryption && !c.headerEncrypted {
728                 err = errors.New("connection not encrypted")
729                 return
730         }
731         ih, ok, err := cl.connBTHandshake(c, nil)
732         if err != nil {
733                 err = fmt.Errorf("error during bt handshake: %s", err)
734                 return
735         }
736         if !ok {
737                 return
738         }
739         cl.mu.Lock()
740         t = cl.torrents[ih]
741         cl.mu.Unlock()
742         return
743 }
744
745 // Returns !ok if handshake failed for valid reasons.
746 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
747         res, ok, err := handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
748         if err != nil || !ok {
749                 return
750         }
751         ret = res.Hash
752         c.PeerExtensionBytes = res.peerExtensionBytes
753         c.PeerID = res.PeerID
754         c.completedHandshake = time.Now()
755         return
756 }
757
758 func (cl *Client) runReceivedConn(c *connection) {
759         err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
760         if err != nil {
761                 panic(err)
762         }
763         t, err := cl.receiveHandshakes(c)
764         if err != nil {
765                 log.Fmsg(
766                         "error receiving handshakes: %s", err,
767                 ).AddValue(
768                         debugLogValue,
769                 ).Add(
770                         "network", c.remoteAddr().Network(),
771                 ).Log(cl.logger)
772                 torrent.Add("error receiving handshake", 1)
773                 cl.mu.Lock()
774                 cl.onBadAccept(c.remoteAddr())
775                 cl.mu.Unlock()
776                 return
777         }
778         if t == nil {
779                 torrent.Add("received handshake for unloaded torrent", 1)
780                 cl.mu.Lock()
781                 cl.onBadAccept(c.remoteAddr())
782                 cl.mu.Unlock()
783                 return
784         }
785         torrent.Add("received handshake for loaded torrent", 1)
786         cl.mu.Lock()
787         defer cl.mu.Unlock()
788         cl.runHandshookConn(c, t)
789 }
790
791 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
792         c.setTorrent(t)
793         if c.PeerID == cl.peerID {
794                 if c.outgoing {
795                         connsToSelf.Add(1)
796                         addr := c.conn.RemoteAddr().String()
797                         cl.dopplegangerAddrs[addr] = struct{}{}
798                 } else {
799                         // Because the remote address is not necessarily the same as its
800                         // client's torrent listen address, we won't record the remote address
801                         // as a doppleganger. Instead, the initiator can record *us* as the
802                         // doppleganger.
803                 }
804                 return
805         }
806         c.conn.SetWriteDeadline(time.Time{})
807         c.r = deadlineReader{c.conn, c.r}
808         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
809         if connIsIpv6(c.conn) {
810                 torrent.Add("completed handshake over ipv6", 1)
811         }
812         if err := t.addConnection(c); err != nil {
813                 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
814                 return
815         }
816         defer t.dropConnection(c)
817         go c.writer(time.Minute)
818         cl.sendInitialMessages(c, t)
819         err := c.mainReadLoop()
820         if err != nil && cl.config.Debug {
821                 log.Printf("error during connection main read loop: %s", err)
822         }
823 }
824
825 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
826         func() {
827                 if conn.fastEnabled() {
828                         if torrent.haveAllPieces() {
829                                 conn.Post(pp.Message{Type: pp.HaveAll})
830                                 conn.sentHaves.AddRange(0, conn.t.NumPieces())
831                                 return
832                         } else if !torrent.haveAnyPieces() {
833                                 conn.Post(pp.Message{Type: pp.HaveNone})
834                                 conn.sentHaves.Clear()
835                                 return
836                         }
837                 }
838                 conn.PostBitfield()
839         }()
840         if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
841                 conn.Post(pp.Message{
842                         Type:       pp.Extended,
843                         ExtendedID: pp.HandshakeExtendedID,
844                         ExtendedPayload: func() []byte {
845                                 d := map[string]interface{}{
846                                         "m": func() (ret map[string]int) {
847                                                 ret = make(map[string]int, 2)
848                                                 ret["ut_metadata"] = metadataExtendedId
849                                                 if !cl.config.DisablePEX {
850                                                         ret["ut_pex"] = pexExtendedId
851                                                 }
852                                                 return
853                                         }(),
854                                         "v": cl.config.ExtendedHandshakeClientVersion,
855                                         // No upload queue is implemented yet.
856                                         "reqq": 64,
857                                 }
858                                 if !cl.config.DisableEncryption {
859                                         d["e"] = 1
860                                 }
861                                 if torrent.metadataSizeKnown() {
862                                         d["metadata_size"] = torrent.metadataSize()
863                                 }
864                                 if p := cl.incomingPeerPort(); p != 0 {
865                                         d["p"] = p
866                                 }
867                                 yourip, err := addrCompactIP(conn.remoteAddr())
868                                 if err != nil {
869                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
870                                 } else {
871                                         d["yourip"] = yourip
872                                 }
873                                 // log.Printf("sending %v", d)
874                                 b, err := bencode.Marshal(d)
875                                 if err != nil {
876                                         panic(err)
877                                 }
878                                 return b
879                         }(),
880                 })
881         }
882         if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
883                 conn.Post(pp.Message{
884                         Type: pp.Port,
885                         Port: cl.dhtPort(),
886                 })
887         }
888 }
889
890 func (cl *Client) dhtPort() (ret uint16) {
891         cl.eachDhtServer(func(s *dht.Server) {
892                 ret = uint16(missinggo.AddrPort(s.Addr()))
893         })
894         return
895 }
896
897 func (cl *Client) haveDhtServer() (ret bool) {
898         cl.eachDhtServer(func(_ *dht.Server) {
899                 ret = true
900         })
901         return
902 }
903
904 // Process incoming ut_metadata message.
905 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
906         var d map[string]int
907         err := bencode.Unmarshal(payload, &d)
908         if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
909         } else if err != nil {
910                 return fmt.Errorf("error unmarshalling bencode: %s", err)
911         }
912         msgType, ok := d["msg_type"]
913         if !ok {
914                 return errors.New("missing msg_type field")
915         }
916         piece := d["piece"]
917         switch msgType {
918         case pp.DataMetadataExtensionMsgType:
919                 if !c.requestedMetadataPiece(piece) {
920                         return fmt.Errorf("got unexpected piece %d", piece)
921                 }
922                 c.metadataRequests[piece] = false
923                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
924                 if begin < 0 || begin >= len(payload) {
925                         return fmt.Errorf("data has bad offset in payload: %d", begin)
926                 }
927                 t.saveMetadataPiece(piece, payload[begin:])
928                 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
929                 c.lastUsefulChunkReceived = time.Now()
930                 return t.maybeCompleteMetadata()
931         case pp.RequestMetadataExtensionMsgType:
932                 if !t.haveMetadataPiece(piece) {
933                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
934                         return nil
935                 }
936                 start := (1 << 14) * piece
937                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
938                 return nil
939         case pp.RejectMetadataExtensionMsgType:
940                 return nil
941         default:
942                 return errors.New("unknown msg_type value")
943         }
944 }
945
946 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
947         if port == 0 {
948                 return true
949         }
950         if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
951                 return true
952         }
953         if _, ok := cl.ipBlockRange(ip); ok {
954                 return true
955         }
956         if _, ok := cl.badPeerIPs[ip.String()]; ok {
957                 return true
958         }
959         return false
960 }
961
962 // Return a Torrent ready for insertion into a Client.
963 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
964         // use provided storage, if provided
965         storageClient := cl.defaultStorage
966         if specStorage != nil {
967                 storageClient = storage.NewClient(specStorage)
968         }
969
970         t = &Torrent{
971                 cl:       cl,
972                 infoHash: ih,
973                 peers: prioritizedPeers{
974                         om: btree.New(32),
975                         getPrio: func(p Peer) peerPriority {
976                                 return bep40PriorityIgnoreError(cl.publicAddr(p.IP), p.addr())
977                         },
978                 },
979                 conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
980
981                 halfOpen:          make(map[string]Peer),
982                 pieceStateChanges: pubsub.NewPubSub(),
983
984                 storageOpener:       storageClient,
985                 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
986
987                 networkingEnabled: true,
988                 requestStrategy:   2,
989                 metadataChanged: sync.Cond{
990                         L: &cl.mu,
991                 },
992         }
993         t.logger = cl.logger.Clone().AddValue(t)
994         t.setChunkSize(defaultChunkSize)
995         return
996 }
997
998 // A file-like handle to some torrent data resource.
999 type Handle interface {
1000         io.Reader
1001         io.Seeker
1002         io.Closer
1003         io.ReaderAt
1004 }
1005
1006 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1007         return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1008 }
1009
1010 // Adds a torrent by InfoHash with a custom Storage implementation.
1011 // If the torrent already exists then this Storage is ignored and the
1012 // existing torrent returned with `new` set to `false`
1013 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1014         cl.mu.Lock()
1015         defer cl.mu.Unlock()
1016         t, ok := cl.torrents[infoHash]
1017         if ok {
1018                 return
1019         }
1020         new = true
1021         t = cl.newTorrent(infoHash, specStorage)
1022         cl.eachDhtServer(func(s *dht.Server) {
1023                 go t.dhtAnnouncer(s)
1024         })
1025         cl.torrents[infoHash] = t
1026         t.updateWantPeersEvent()
1027         // Tickle Client.waitAccept, new torrent may want conns.
1028         cl.event.Broadcast()
1029         return
1030 }
1031
1032 // Add or merge a torrent spec. If the torrent is already present, the
1033 // trackers will be merged with the existing ones. If the Info isn't yet
1034 // known, it will be set. The display name is replaced if the new spec
1035 // provides one. Returns new if the torrent wasn't already in the client.
1036 // Note that any `Storage` defined on the spec will be ignored if the
1037 // torrent is already present (i.e. `new` return value is `true`)
1038 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1039         t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1040         if spec.DisplayName != "" {
1041                 t.SetDisplayName(spec.DisplayName)
1042         }
1043         if spec.InfoBytes != nil {
1044                 err = t.SetInfoBytes(spec.InfoBytes)
1045                 if err != nil {
1046                         return
1047                 }
1048         }
1049         cl.mu.Lock()
1050         defer cl.mu.Unlock()
1051         if spec.ChunkSize != 0 {
1052                 t.setChunkSize(pp.Integer(spec.ChunkSize))
1053         }
1054         t.addTrackers(spec.Trackers)
1055         t.maybeNewConns()
1056         return
1057 }
1058
1059 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1060         t, ok := cl.torrents[infoHash]
1061         if !ok {
1062                 err = fmt.Errorf("no such torrent")
1063                 return
1064         }
1065         err = t.close()
1066         if err != nil {
1067                 panic(err)
1068         }
1069         delete(cl.torrents, infoHash)
1070         return
1071 }
1072
1073 func (cl *Client) allTorrentsCompleted() bool {
1074         for _, t := range cl.torrents {
1075                 if !t.haveInfo() {
1076                         return false
1077                 }
1078                 if !t.haveAllPieces() {
1079                         return false
1080                 }
1081         }
1082         return true
1083 }
1084
1085 // Returns true when all torrents are completely downloaded and false if the
1086 // client is stopped before that.
1087 func (cl *Client) WaitAll() bool {
1088         cl.mu.Lock()
1089         defer cl.mu.Unlock()
1090         for !cl.allTorrentsCompleted() {
1091                 if cl.closed.IsSet() {
1092                         return false
1093                 }
1094                 cl.event.Wait()
1095         }
1096         return true
1097 }
1098
1099 // Returns handles to all the torrents loaded in the Client.
1100 func (cl *Client) Torrents() []*Torrent {
1101         cl.mu.Lock()
1102         defer cl.mu.Unlock()
1103         return cl.torrentsAsSlice()
1104 }
1105
1106 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1107         for _, t := range cl.torrents {
1108                 ret = append(ret, t)
1109         }
1110         return
1111 }
1112
1113 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1114         spec, err := TorrentSpecFromMagnetURI(uri)
1115         if err != nil {
1116                 return
1117         }
1118         T, _, err = cl.AddTorrentSpec(spec)
1119         return
1120 }
1121
1122 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1123         T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1124         var ss []string
1125         slices.MakeInto(&ss, mi.Nodes)
1126         cl.AddDHTNodes(ss)
1127         return
1128 }
1129
1130 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1131         mi, err := metainfo.LoadFromFile(filename)
1132         if err != nil {
1133                 return
1134         }
1135         return cl.AddTorrent(mi)
1136 }
1137
1138 func (cl *Client) DhtServers() []*dht.Server {
1139         return cl.dhtServers
1140 }
1141
1142 func (cl *Client) AddDHTNodes(nodes []string) {
1143         for _, n := range nodes {
1144                 hmp := missinggo.SplitHostMaybePort(n)
1145                 ip := net.ParseIP(hmp.Host)
1146                 if ip == nil {
1147                         log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1148                         continue
1149                 }
1150                 ni := krpc.NodeInfo{
1151                         Addr: krpc.NodeAddr{
1152                                 IP:   ip,
1153                                 Port: hmp.Port,
1154                         },
1155                 }
1156                 cl.eachDhtServer(func(s *dht.Server) {
1157                         s.AddNode(ni)
1158                 })
1159         }
1160 }
1161
1162 func (cl *Client) banPeerIP(ip net.IP) {
1163         if cl.badPeerIPs == nil {
1164                 cl.badPeerIPs = make(map[string]struct{})
1165         }
1166         cl.badPeerIPs[ip.String()] = struct{}{}
1167 }
1168
1169 func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
1170         c = &connection{
1171                 conn:            nc,
1172                 outgoing:        outgoing,
1173                 Choked:          true,
1174                 PeerChoked:      true,
1175                 PeerMaxRequests: 250,
1176                 writeBuffer:     new(bytes.Buffer),
1177         }
1178         c.writerCond.L = &cl.mu
1179         c.setRW(connStatsReadWriter{nc, c})
1180         c.r = &rateLimitedReader{
1181                 l: cl.downloadLimit,
1182                 r: c.r,
1183         }
1184         return
1185 }
1186
1187 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1188         cl.mu.Lock()
1189         defer cl.mu.Unlock()
1190         t := cl.torrent(ih)
1191         if t == nil {
1192                 return
1193         }
1194         t.addPeers([]Peer{{
1195                 IP:     p.IP,
1196                 Port:   p.Port,
1197                 Source: peerSourceDHTAnnouncePeer,
1198         }})
1199 }
1200
1201 func firstNotNil(ips ...net.IP) net.IP {
1202         for _, ip := range ips {
1203                 if ip != nil {
1204                         return ip
1205                 }
1206         }
1207         return nil
1208 }
1209
1210 func (cl *Client) eachListener(f func(socket) bool) {
1211         for _, s := range cl.conns {
1212                 if !f(s) {
1213                         break
1214                 }
1215         }
1216 }
1217
1218 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1219         cl.eachListener(func(l socket) bool {
1220                 ret = l
1221                 return !f(l)
1222         })
1223         return
1224 }
1225
1226 func (cl *Client) publicIp(peer net.IP) net.IP {
1227         // TODO: Use BEP 10 to determine how peers are seeing us.
1228         if peer.To4() != nil {
1229                 return firstNotNil(
1230                         cl.config.PublicIp4,
1231                         cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1232                 )
1233         } else {
1234                 return firstNotNil(
1235                         cl.config.PublicIp6,
1236                         cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1237                 )
1238         }
1239 }
1240
1241 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1242         return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
1243                 return f(missinggo.AddrIP(l.Addr()))
1244         }).Addr())
1245 }
1246
1247 // Our IP as a peer should see it.
1248 func (cl *Client) publicAddr(peer net.IP) ipPort {
1249         return ipPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
1250 }
1251
1252 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1253         cl.mu.Lock()
1254         defer cl.mu.Unlock()
1255         cl.eachListener(func(l socket) bool {
1256                 ret = append(ret, l.Addr())
1257                 return true
1258         })
1259         return
1260 }
1261
1262 func (cl *Client) onBadAccept(addr net.Addr) {
1263         ip := maskIpForAcceptLimiting(missinggo.AddrIP(addr))
1264         if cl.acceptLimiter == nil {
1265                 cl.acceptLimiter = make(map[ipStr]int)
1266         }
1267         cl.acceptLimiter[ipStr(ip.String())]++
1268 }
1269
1270 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1271         if ip4 := ip.To4(); ip4 != nil {
1272                 return ip4.Mask(net.CIDRMask(24, 32))
1273         }
1274         return ip
1275 }
1276
1277 func (cl *Client) acceptLimitClearer() {
1278         for {
1279                 select {
1280                 case <-cl.closed.LockedChan(&cl.mu):
1281                         return
1282                 case <-time.After(15 * time.Minute):
1283                         cl.mu.Lock()
1284                         cl.acceptLimiter = nil
1285                         cl.mu.Unlock()
1286                 }
1287         }
1288 }
1289
1290 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1291         // return true
1292         return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] >= 10
1293 }