]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
8b8f9b4f86bd90db09df81f46bba4a42fb9c5f02
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "context"
7         "crypto/rand"
8         "encoding/binary"
9         "errors"
10         "fmt"
11         "io"
12         "net"
13         "strconv"
14         "strings"
15         "time"
16
17         "github.com/anacrolix/dht"
18         "github.com/anacrolix/dht/krpc"
19         "github.com/anacrolix/log"
20         "github.com/anacrolix/missinggo"
21         "github.com/anacrolix/missinggo/pproffd"
22         "github.com/anacrolix/missinggo/pubsub"
23         "github.com/anacrolix/missinggo/slices"
24         "github.com/anacrolix/sync"
25         "github.com/davecgh/go-spew/spew"
26         "github.com/dustin/go-humanize"
27         "github.com/google/btree"
28         "golang.org/x/time/rate"
29
30         "github.com/anacrolix/torrent/bencode"
31         "github.com/anacrolix/torrent/iplist"
32         "github.com/anacrolix/torrent/metainfo"
33         "github.com/anacrolix/torrent/mse"
34         pp "github.com/anacrolix/torrent/peer_protocol"
35         "github.com/anacrolix/torrent/storage"
36 )
37
38 // Clients contain zero or more Torrents. A Client manages a blocklist, the
39 // TCP/UDP protocol ports, and DHT as desired.
40 type Client struct {
41         mu     sync.RWMutex
42         event  sync.Cond
43         closed missinggo.Event
44
45         config Config
46         logger *log.Logger
47
48         halfOpenLimit  int
49         peerID         PeerID
50         defaultStorage *storage.Client
51         onClose        []func()
52         conns          []socket
53         dhtServers     []*dht.Server
54         ipBlockList    iplist.Ranger
55         // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
56         extensionBytes peerExtensionBytes
57         uploadLimit    *rate.Limiter
58         downloadLimit  *rate.Limiter
59
60         // Set of addresses that have our client ID. This intentionally will
61         // include ourselves if we end up trying to connect to our own address
62         // through legitimate channels.
63         dopplegangerAddrs map[string]struct{}
64         badPeerIPs        map[string]struct{}
65         torrents          map[metainfo.Hash]*Torrent
66         // An aggregate of stats over all connections.
67         stats ConnStats
68 }
69
70 func (cl *Client) BadPeerIPs() []string {
71         cl.mu.RLock()
72         defer cl.mu.RUnlock()
73         return cl.badPeerIPsLocked()
74 }
75
76 func (cl *Client) badPeerIPsLocked() []string {
77         return slices.FromMapKeys(cl.badPeerIPs).([]string)
78 }
79
80 func (cl *Client) PeerID() PeerID {
81         return cl.peerID
82 }
83
84 type torrentAddr string
85
86 func (torrentAddr) Network() string { return "" }
87
88 func (me torrentAddr) String() string { return string(me) }
89
90 func (cl *Client) LocalPort() (port int) {
91         cl.eachListener(func(l socket) bool {
92                 _port := missinggo.AddrPort(l.Addr())
93                 if _port == 0 {
94                         panic(l)
95                 }
96                 if port == 0 {
97                         port = _port
98                 } else if port != _port {
99                         panic("mismatched ports")
100                 }
101                 return true
102         })
103         return
104 }
105
106 func writeDhtServerStatus(w io.Writer, s *dht.Server) {
107         dhtStats := s.Stats()
108         fmt.Fprintf(w, "\t# Nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
109         fmt.Fprintf(w, "\tServer ID: %x\n", s.ID())
110         fmt.Fprintf(w, "\tAnnounces: %d\n", dhtStats.SuccessfulOutboundAnnouncePeerQueries)
111         fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
112 }
113
114 // Writes out a human readable status of the client, such as for writing to a
115 // HTTP status page.
116 func (cl *Client) WriteStatus(_w io.Writer) {
117         cl.mu.Lock()
118         defer cl.mu.Unlock()
119         w := bufio.NewWriter(_w)
120         defer w.Flush()
121         fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
122         fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
123         fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
124         fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
125         cl.eachDhtServer(func(s *dht.Server) {
126                 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
127                 writeDhtServerStatus(w, s)
128         })
129         spew.Fdump(w, cl.stats)
130         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
131         fmt.Fprintln(w)
132         for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
133                 return l.InfoHash().AsString() < r.InfoHash().AsString()
134         }).([]*Torrent) {
135                 if t.name() == "" {
136                         fmt.Fprint(w, "<unknown name>")
137                 } else {
138                         fmt.Fprint(w, t.name())
139                 }
140                 fmt.Fprint(w, "\n")
141                 if t.info != nil {
142                         fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
143                 } else {
144                         w.WriteString("<missing metainfo>")
145                 }
146                 fmt.Fprint(w, "\n")
147                 t.writeStatus(w)
148                 fmt.Fprintln(w)
149         }
150 }
151
152 const debugLogValue = "debug"
153
154 func (cl *Client) debugLogFilter(m *log.Msg) bool {
155         if !cl.config.Debug {
156                 _, ok := m.Values()[debugLogValue]
157                 return !ok
158         }
159         return true
160 }
161
162 func (cl *Client) initLogger() {
163         cl.logger = log.Default.Clone().AddValue(cl).AddFilter(log.NewFilter(cl.debugLogFilter))
164 }
165
166 func (cl *Client) announceKey() int32 {
167         return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
168 }
169
170 func NewClient(cfg *Config) (cl *Client, err error) {
171         if cfg == nil {
172                 cfg = &Config{}
173         }
174         cfg.setDefaults()
175
176         defer func() {
177                 if err != nil {
178                         cl = nil
179                 }
180         }()
181         cl = &Client{
182                 halfOpenLimit:     cfg.HalfOpenConnsPerTorrent,
183                 config:            *cfg,
184                 dopplegangerAddrs: make(map[string]struct{}),
185                 torrents:          make(map[metainfo.Hash]*Torrent),
186         }
187         cl.initLogger()
188         defer func() {
189                 if err == nil {
190                         return
191                 }
192                 cl.Close()
193         }()
194         if cfg.UploadRateLimiter == nil {
195                 cl.uploadLimit = rate.NewLimiter(rate.Inf, 0)
196         } else {
197                 cl.uploadLimit = cfg.UploadRateLimiter
198         }
199         if cfg.DownloadRateLimiter == nil {
200                 cl.downloadLimit = rate.NewLimiter(rate.Inf, 0)
201         } else {
202                 cl.downloadLimit = cfg.DownloadRateLimiter
203         }
204         cl.extensionBytes = defaultPeerExtensionBytes()
205         cl.event.L = &cl.mu
206         storageImpl := cfg.DefaultStorage
207         if storageImpl == nil {
208                 // We'd use mmap but HFS+ doesn't support sparse files.
209                 storageImpl = storage.NewFile(cfg.DataDir)
210                 cl.onClose = append(cl.onClose, func() {
211                         if err := storageImpl.Close(); err != nil {
212                                 log.Printf("error closing default storage: %s", err)
213                         }
214                 })
215         }
216         cl.defaultStorage = storage.NewClient(storageImpl)
217         if cfg.IPBlocklist != nil {
218                 cl.ipBlockList = cfg.IPBlocklist
219         }
220
221         if cfg.PeerID != "" {
222                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
223         } else {
224                 o := copy(cl.peerID[:], cfg.Bep20)
225                 _, err = rand.Read(cl.peerID[o:])
226                 if err != nil {
227                         panic("error generating peer id")
228                 }
229         }
230
231         cl.conns, err = listenAll(cl.enabledPeerNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.config.ProxyURL)
232         if err != nil {
233                 return
234         }
235         // Check for panics.
236         cl.LocalPort()
237
238         for _, s := range cl.conns {
239                 if peerNetworkEnabled(s.Addr().Network(), cl.config) {
240                         go cl.acceptConnections(s)
241                 }
242         }
243
244         go cl.forwardPort()
245         if !cfg.NoDHT {
246                 for _, s := range cl.conns {
247                         if pc, ok := s.(net.PacketConn); ok {
248                                 ds, err := cl.newDhtServer(pc)
249                                 if err != nil {
250                                         panic(err)
251                                 }
252                                 cl.dhtServers = append(cl.dhtServers, ds)
253                         }
254                 }
255         }
256
257         return
258 }
259
260 func (cl *Client) enabledPeerNetworks() (ns []string) {
261         for _, n := range allPeerNetworks {
262                 if peerNetworkEnabled(n, cl.config) {
263                         ns = append(ns, n)
264                 }
265         }
266         return
267 }
268
269 func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
270         cfg := dht.ServerConfig{
271                 IPBlocklist:    cl.ipBlockList,
272                 Conn:           conn,
273                 OnAnnouncePeer: cl.onDHTAnnouncePeer,
274                 PublicIP: func() net.IP {
275                         if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
276                                 return cl.config.PublicIp6
277                         }
278                         return cl.config.PublicIp4
279                 }(),
280                 StartingNodes: cl.config.DhtStartingNodes,
281         }
282         s, err = dht.NewServer(&cfg)
283         if err == nil {
284                 go func() {
285                         if _, err := s.Bootstrap(); err != nil {
286                                 log.Printf("error bootstrapping dht: %s", err)
287                         }
288                 }()
289         }
290         return
291 }
292
293 func firstNonEmptyString(ss ...string) string {
294         for _, s := range ss {
295                 if s != "" {
296                         return s
297                 }
298         }
299         return ""
300 }
301
302 func (cl *Client) Closed() <-chan struct{} {
303         cl.mu.Lock()
304         defer cl.mu.Unlock()
305         return cl.closed.C()
306 }
307
308 func (cl *Client) eachDhtServer(f func(*dht.Server)) {
309         for _, ds := range cl.dhtServers {
310                 f(ds)
311         }
312 }
313
314 func (cl *Client) closeSockets() {
315         cl.eachListener(func(l socket) bool {
316                 l.Close()
317                 return true
318         })
319         cl.conns = nil
320 }
321
322 // Stops the client. All connections to peers are closed and all activity will
323 // come to a halt.
324 func (cl *Client) Close() {
325         cl.mu.Lock()
326         defer cl.mu.Unlock()
327         cl.closed.Set()
328         cl.eachDhtServer(func(s *dht.Server) { s.Close() })
329         cl.closeSockets()
330         for _, t := range cl.torrents {
331                 t.close()
332         }
333         for _, f := range cl.onClose {
334                 f()
335         }
336         cl.event.Broadcast()
337 }
338
339 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
340         if cl.ipBlockList == nil {
341                 return
342         }
343         return cl.ipBlockList.Lookup(ip)
344 }
345
346 func (cl *Client) ipIsBlocked(ip net.IP) bool {
347         _, blocked := cl.ipBlockRange(ip)
348         return blocked
349 }
350
351 func (cl *Client) waitAccept() {
352         for {
353                 for _, t := range cl.torrents {
354                         if t.wantConns() {
355                                 return
356                         }
357                 }
358                 if cl.closed.IsSet() {
359                         return
360                 }
361                 cl.event.Wait()
362         }
363 }
364
365 func (cl *Client) rejectAccepted(conn net.Conn) bool {
366         ra := conn.RemoteAddr()
367         rip := missinggo.AddrIP(ra)
368         if cl.config.DisableIPv4Peers && rip.To4() != nil {
369                 return true
370         }
371         if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
372                 return true
373         }
374         if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
375                 return true
376         }
377         return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
378 }
379
380 func (cl *Client) acceptConnections(l net.Listener) {
381         for {
382                 conn, err := l.Accept()
383                 conn = pproffd.WrapNetConn(conn)
384                 cl.mu.RLock()
385                 closed := cl.closed.IsSet()
386                 reject := false
387                 if conn != nil {
388                         reject = cl.rejectAccepted(conn)
389                 }
390                 cl.mu.RUnlock()
391                 if closed {
392                         if conn != nil {
393                                 conn.Close()
394                         }
395                         return
396                 }
397                 if err != nil {
398                         log.Print(err)
399                         // I think something harsher should happen here? Our accept
400                         // routine just fucked off.
401                         return
402                 }
403                 go func() {
404                         if reject {
405                                 torrent.Add("rejected accepted connections", 1)
406                                 conn.Close()
407                         } else {
408                                 go cl.incomingConnection(conn)
409                         }
410                         log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
411                         torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
412                         torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
413                         torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
414                 }()
415         }
416 }
417
418 func (cl *Client) incomingConnection(nc net.Conn) {
419         defer nc.Close()
420         if tc, ok := nc.(*net.TCPConn); ok {
421                 tc.SetLinger(0)
422         }
423         c := cl.newConnection(nc)
424         c.Discovery = peerSourceIncoming
425         cl.runReceivedConn(c)
426 }
427
428 // Returns a handle to the given torrent, if it's present in the client.
429 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
430         cl.mu.Lock()
431         defer cl.mu.Unlock()
432         t, ok = cl.torrents[ih]
433         return
434 }
435
436 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
437         return cl.torrents[ih]
438 }
439
440 type dialResult struct {
441         Conn net.Conn
442 }
443
444 func countDialResult(err error) {
445         if err == nil {
446                 successfulDials.Add(1)
447         } else {
448                 unsuccessfulDials.Add(1)
449         }
450 }
451
452 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
453         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
454         if ret < minDialTimeout {
455                 ret = minDialTimeout
456         }
457         return
458 }
459
460 // Returns whether an address is known to connect to a client with our own ID.
461 func (cl *Client) dopplegangerAddr(addr string) bool {
462         _, ok := cl.dopplegangerAddrs[addr]
463         return ok
464 }
465
466 func (cl *Client) dialTCP(ctx context.Context, addr string) (c net.Conn, err error) {
467         d := net.Dialer{
468                 // Can't bind to the listen address, even though we intend to create an
469                 // endpoint pair that is distinct. Oh well.
470
471                 // LocalAddr: cl.tcpListener.Addr(),
472         }
473         c, err = d.DialContext(ctx, "tcp"+ipNetworkSuffix(!cl.config.DisableIPv4 && !cl.config.DisableIPv4Peers, !cl.config.DisableIPv6), addr)
474         countDialResult(err)
475         if err == nil {
476                 c.(*net.TCPConn).SetLinger(0)
477         }
478         c = pproffd.WrapNetConn(c)
479         return
480 }
481
482 func ipNetworkSuffix(allowIpv4, allowIpv6 bool) string {
483         switch {
484         case allowIpv4 && allowIpv6:
485                 return ""
486         case allowIpv4 && !allowIpv6:
487                 return "4"
488         case !allowIpv4 && allowIpv6:
489                 return "6"
490         default:
491                 panic("unhandled ip network combination")
492         }
493 }
494
495 func dialUTP(ctx context.Context, addr string, sock utpSocket) (c net.Conn, err error) {
496         return sock.DialContext(ctx, "", addr)
497 }
498
499 var allPeerNetworks = []string{"tcp4", "tcp6", "udp4", "udp6"}
500
501 func peerNetworkEnabled(network string, cfg Config) bool {
502         c := func(s string) bool {
503                 return strings.Contains(network, s)
504         }
505         if cfg.DisableUTP {
506                 if c("udp") || c("utp") {
507                         return false
508                 }
509         }
510         if cfg.DisableTCP && c("tcp") {
511                 return false
512         }
513         if cfg.DisableIPv6 && c("6") {
514                 return false
515         }
516         return true
517 }
518
519 // Returns a connection over UTP or TCP, whichever is first to connect.
520 func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn {
521         ctx, cancel := context.WithCancel(ctx)
522         // As soon as we return one connection, cancel the others.
523         defer cancel()
524         left := 0
525         resCh := make(chan dialResult, left)
526         dial := func(f func(_ context.Context, addr string) (net.Conn, error)) {
527                 left++
528                 go func() {
529                         c, err := f(ctx, addr)
530                         countDialResult(err)
531                         resCh <- dialResult{c}
532                 }()
533         }
534         func() {
535                 cl.mu.Lock()
536                 defer cl.mu.Unlock()
537                 cl.eachListener(func(s socket) bool {
538                         if peerNetworkEnabled(s.Addr().Network(), cl.config) {
539                                 dial(s.dial)
540                         }
541                         return true
542                 })
543         }()
544         var res dialResult
545         // Wait for a successful connection.
546         for ; left > 0 && res.Conn == nil; left-- {
547                 res = <-resCh
548         }
549         // There are still incompleted dials.
550         go func() {
551                 for ; left > 0; left-- {
552                         conn := (<-resCh).Conn
553                         if conn != nil {
554                                 conn.Close()
555                         }
556                 }
557         }()
558         if res.Conn != nil {
559                 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
560         }
561         return res.Conn
562 }
563
564 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
565         if _, ok := t.halfOpen[addr]; !ok {
566                 panic("invariant broken")
567         }
568         delete(t.halfOpen, addr)
569         t.openNewConns()
570 }
571
572 // Performs initiator handshakes and returns a connection. Returns nil
573 // *connection if no connection for valid reasons.
574 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) {
575         c = cl.newConnection(nc)
576         c.headerEncrypted = encryptHeader
577         ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
578         defer cancel()
579         dl, ok := ctx.Deadline()
580         if !ok {
581                 panic(ctx)
582         }
583         err = nc.SetDeadline(dl)
584         if err != nil {
585                 panic(err)
586         }
587         ok, err = cl.initiateHandshakes(c, t)
588         if !ok {
589                 c = nil
590         }
591         return
592 }
593
594 // Returns nil connection and nil error if no connection could be established
595 // for valid reasons.
596 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) {
597         nc := cl.dialFirst(ctx, addr)
598         if nc == nil {
599                 return
600         }
601         defer func() {
602                 if c == nil || err != nil {
603                         nc.Close()
604                 }
605         }()
606         return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader)
607 }
608
609 // Returns nil connection and nil error if no connection could be established
610 // for valid reasons.
611 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
612         ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
613         defer cancel()
614         obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
615         c, err = cl.establishOutgoingConnEx(t, addr, ctx, obfuscatedHeaderFirst)
616         if err != nil {
617                 return
618         }
619         if c != nil {
620                 go torrent.Add("initiated conn with preferred header obfuscation", 1)
621                 return
622         }
623         if cl.config.ForceEncryption {
624                 // We should have just tried with an obfuscated header. A plaintext
625                 // header can't result in an encrypted connection, so we're done.
626                 if !obfuscatedHeaderFirst {
627                         panic(cl.config.EncryptionPolicy)
628                 }
629                 return
630         }
631         // Try again with encryption if we didn't earlier, or without if we did.
632         c, err = cl.establishOutgoingConnEx(t, addr, ctx, !obfuscatedHeaderFirst)
633         if c != nil {
634                 go torrent.Add("initiated conn with fallback header obfuscation", 1)
635         }
636         return
637 }
638
639 // Called to dial out and run a connection. The addr we're given is already
640 // considered half-open.
641 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
642         c, err := cl.establishOutgoingConn(t, addr)
643         cl.mu.Lock()
644         defer cl.mu.Unlock()
645         // Don't release lock between here and addConnection, unless it's for
646         // failure.
647         cl.noLongerHalfOpen(t, addr)
648         if err != nil {
649                 if cl.config.Debug {
650                         log.Printf("error establishing outgoing connection: %s", err)
651                 }
652                 return
653         }
654         if c == nil {
655                 return
656         }
657         defer c.Close()
658         c.Discovery = ps
659         cl.runHandshookConn(c, t, true)
660 }
661
662 // The port number for incoming peer connections. 0 if the client isn't
663 // listening.
664 func (cl *Client) incomingPeerPort() int {
665         return cl.LocalPort()
666 }
667
668 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
669         if c.headerEncrypted {
670                 var rw io.ReadWriter
671                 rw, c.cryptoMethod, err = mse.InitiateHandshake(
672                         struct {
673                                 io.Reader
674                                 io.Writer
675                         }{c.r, c.w},
676                         t.infoHash[:],
677                         nil,
678                         func() mse.CryptoMethod {
679                                 switch {
680                                 case cl.config.ForceEncryption:
681                                         return mse.CryptoMethodRC4
682                                 case cl.config.DisableEncryption:
683                                         return mse.CryptoMethodPlaintext
684                                 default:
685                                         return mse.AllSupportedCrypto
686                                 }
687                         }(),
688                 )
689                 c.setRW(rw)
690                 if err != nil {
691                         return
692                 }
693         }
694         ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
695         if ih != t.infoHash {
696                 ok = false
697         }
698         return
699 }
700
701 // Calls f with any secret keys.
702 func (cl *Client) forSkeys(f func([]byte) bool) {
703         cl.mu.Lock()
704         defer cl.mu.Unlock()
705         for ih := range cl.torrents {
706                 if !f(ih[:]) {
707                         break
708                 }
709         }
710 }
711
712 // Do encryption and bittorrent handshakes as receiver.
713 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
714         var rw io.ReadWriter
715         rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.EncryptionPolicy)
716         c.setRW(rw)
717         if err != nil {
718                 if err == mse.ErrNoSecretKeyMatch {
719                         err = nil
720                 }
721                 return
722         }
723         if cl.config.ForceEncryption && !c.headerEncrypted {
724                 err = errors.New("connection not encrypted")
725                 return
726         }
727         ih, ok, err := cl.connBTHandshake(c, nil)
728         if err != nil {
729                 err = fmt.Errorf("error during bt handshake: %s", err)
730                 return
731         }
732         if !ok {
733                 return
734         }
735         cl.mu.Lock()
736         t = cl.torrents[ih]
737         cl.mu.Unlock()
738         return
739 }
740
741 // Returns !ok if handshake failed for valid reasons.
742 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
743         res, ok, err := handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
744         if err != nil || !ok {
745                 return
746         }
747         ret = res.Hash
748         c.PeerExtensionBytes = res.peerExtensionBytes
749         c.PeerID = res.PeerID
750         c.completedHandshake = time.Now()
751         return
752 }
753
754 func (cl *Client) runReceivedConn(c *connection) {
755         err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
756         if err != nil {
757                 panic(err)
758         }
759         t, err := cl.receiveHandshakes(c)
760         if err != nil {
761                 if cl.config.Debug {
762                         log.Printf("error receiving handshakes: %s", err)
763                 }
764                 return
765         }
766         if t == nil {
767                 return
768         }
769         cl.mu.Lock()
770         defer cl.mu.Unlock()
771         cl.runHandshookConn(c, t, false)
772 }
773
774 func (cl *Client) runHandshookConn(c *connection, t *Torrent, outgoing bool) {
775         c.setTorrent(t)
776         if c.PeerID == cl.peerID {
777                 if outgoing {
778                         connsToSelf.Add(1)
779                         addr := c.conn.RemoteAddr().String()
780                         cl.dopplegangerAddrs[addr] = struct{}{}
781                 } else {
782                         // Because the remote address is not necessarily the same as its
783                         // client's torrent listen address, we won't record the remote address
784                         // as a doppleganger. Instead, the initiator can record *us* as the
785                         // doppleganger.
786                 }
787                 return
788         }
789         c.conn.SetWriteDeadline(time.Time{})
790         c.r = deadlineReader{c.conn, c.r}
791         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
792         if connIsIpv6(c.conn) {
793                 torrent.Add("completed handshake over ipv6", 1)
794         }
795         if !t.addConnection(c, outgoing) {
796                 return
797         }
798         defer t.dropConnection(c)
799         go c.writer(time.Minute)
800         cl.sendInitialMessages(c, t)
801         err := c.mainReadLoop()
802         if err != nil && cl.config.Debug {
803                 log.Printf("error during connection main read loop: %s", err)
804         }
805 }
806
807 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
808         func() {
809                 if conn.fastEnabled() {
810                         if torrent.haveAllPieces() {
811                                 conn.Post(pp.Message{Type: pp.HaveAll})
812                                 conn.sentHaves.AddRange(0, conn.t.NumPieces())
813                                 return
814                         } else if !torrent.haveAnyPieces() {
815                                 conn.Post(pp.Message{Type: pp.HaveNone})
816                                 conn.sentHaves.Clear()
817                                 return
818                         }
819                 }
820                 conn.PostBitfield()
821         }()
822         if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
823                 conn.Post(pp.Message{
824                         Type:       pp.Extended,
825                         ExtendedID: pp.HandshakeExtendedID,
826                         ExtendedPayload: func() []byte {
827                                 d := map[string]interface{}{
828                                         "m": func() (ret map[string]int) {
829                                                 ret = make(map[string]int, 2)
830                                                 ret["ut_metadata"] = metadataExtendedId
831                                                 if !cl.config.DisablePEX {
832                                                         ret["ut_pex"] = pexExtendedId
833                                                 }
834                                                 return
835                                         }(),
836                                         "v": cl.config.ExtendedHandshakeClientVersion,
837                                         // No upload queue is implemented yet.
838                                         "reqq": 64,
839                                 }
840                                 if !cl.config.DisableEncryption {
841                                         d["e"] = 1
842                                 }
843                                 if torrent.metadataSizeKnown() {
844                                         d["metadata_size"] = torrent.metadataSize()
845                                 }
846                                 if p := cl.incomingPeerPort(); p != 0 {
847                                         d["p"] = p
848                                 }
849                                 yourip, err := addrCompactIP(conn.remoteAddr())
850                                 if err != nil {
851                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
852                                 } else {
853                                         d["yourip"] = yourip
854                                 }
855                                 // log.Printf("sending %v", d)
856                                 b, err := bencode.Marshal(d)
857                                 if err != nil {
858                                         panic(err)
859                                 }
860                                 return b
861                         }(),
862                 })
863         }
864         if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
865                 conn.Post(pp.Message{
866                         Type: pp.Port,
867                         Port: cl.dhtPort(),
868                 })
869         }
870 }
871
872 func (cl *Client) dhtPort() (ret uint16) {
873         cl.eachDhtServer(func(s *dht.Server) {
874                 ret = uint16(missinggo.AddrPort(s.Addr()))
875         })
876         return
877 }
878
879 func (cl *Client) haveDhtServer() (ret bool) {
880         cl.eachDhtServer(func(_ *dht.Server) {
881                 ret = true
882         })
883         return
884 }
885
886 // Process incoming ut_metadata message.
887 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
888         var d map[string]int
889         err := bencode.Unmarshal(payload, &d)
890         if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
891         } else if err != nil {
892                 return fmt.Errorf("error unmarshalling bencode: %s", err)
893         }
894         msgType, ok := d["msg_type"]
895         if !ok {
896                 return errors.New("missing msg_type field")
897         }
898         piece := d["piece"]
899         switch msgType {
900         case pp.DataMetadataExtensionMsgType:
901                 if !c.requestedMetadataPiece(piece) {
902                         return fmt.Errorf("got unexpected piece %d", piece)
903                 }
904                 c.metadataRequests[piece] = false
905                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
906                 if begin < 0 || begin >= len(payload) {
907                         return fmt.Errorf("data has bad offset in payload: %d", begin)
908                 }
909                 t.saveMetadataPiece(piece, payload[begin:])
910                 c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUseful }))
911                 c.lastUsefulChunkReceived = time.Now()
912                 return t.maybeCompleteMetadata()
913         case pp.RequestMetadataExtensionMsgType:
914                 if !t.haveMetadataPiece(piece) {
915                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
916                         return nil
917                 }
918                 start := (1 << 14) * piece
919                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
920                 return nil
921         case pp.RejectMetadataExtensionMsgType:
922                 return nil
923         default:
924                 return errors.New("unknown msg_type value")
925         }
926 }
927
928 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
929         if port == 0 {
930                 return true
931         }
932         if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
933                 return true
934         }
935         if _, ok := cl.ipBlockRange(ip); ok {
936                 return true
937         }
938         if _, ok := cl.badPeerIPs[ip.String()]; ok {
939                 return true
940         }
941         return false
942 }
943
944 // Return a Torrent ready for insertion into a Client.
945 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
946         // use provided storage, if provided
947         storageClient := cl.defaultStorage
948         if specStorage != nil {
949                 storageClient = storage.NewClient(specStorage)
950         }
951
952         t = &Torrent{
953                 cl:       cl,
954                 infoHash: ih,
955                 peers: prioritizedPeers{
956                         om: btree.New(32),
957                         getPrio: func(p Peer) peerPriority {
958                                 return bep40PriorityIgnoreError(cl.publicAddr(p.IP), p.addr())
959                         },
960                 },
961                 conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
962
963                 halfOpen:          make(map[string]Peer),
964                 pieceStateChanges: pubsub.NewPubSub(),
965
966                 storageOpener:       storageClient,
967                 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
968
969                 networkingEnabled: true,
970                 requestStrategy:   2,
971                 metadataChanged: sync.Cond{
972                         L: &cl.mu,
973                 },
974         }
975         t.logger = cl.logger.Clone().AddValue(t)
976         t.setChunkSize(defaultChunkSize)
977         return
978 }
979
980 // A file-like handle to some torrent data resource.
981 type Handle interface {
982         io.Reader
983         io.Seeker
984         io.Closer
985         io.ReaderAt
986 }
987
988 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
989         return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
990 }
991
992 // Adds a torrent by InfoHash with a custom Storage implementation.
993 // If the torrent already exists then this Storage is ignored and the
994 // existing torrent returned with `new` set to `false`
995 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
996         cl.mu.Lock()
997         defer cl.mu.Unlock()
998         t, ok := cl.torrents[infoHash]
999         if ok {
1000                 return
1001         }
1002         new = true
1003         t = cl.newTorrent(infoHash, specStorage)
1004         cl.eachDhtServer(func(s *dht.Server) {
1005                 go t.dhtAnnouncer(s)
1006         })
1007         cl.torrents[infoHash] = t
1008         t.updateWantPeersEvent()
1009         // Tickle Client.waitAccept, new torrent may want conns.
1010         cl.event.Broadcast()
1011         return
1012 }
1013
1014 // Add or merge a torrent spec. If the torrent is already present, the
1015 // trackers will be merged with the existing ones. If the Info isn't yet
1016 // known, it will be set. The display name is replaced if the new spec
1017 // provides one. Returns new if the torrent wasn't already in the client.
1018 // Note that any `Storage` defined on the spec will be ignored if the
1019 // torrent is already present (i.e. `new` return value is `true`)
1020 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1021         t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1022         if spec.DisplayName != "" {
1023                 t.SetDisplayName(spec.DisplayName)
1024         }
1025         if spec.InfoBytes != nil {
1026                 err = t.SetInfoBytes(spec.InfoBytes)
1027                 if err != nil {
1028                         return
1029                 }
1030         }
1031         cl.mu.Lock()
1032         defer cl.mu.Unlock()
1033         if spec.ChunkSize != 0 {
1034                 t.setChunkSize(pp.Integer(spec.ChunkSize))
1035         }
1036         t.addTrackers(spec.Trackers)
1037         t.maybeNewConns()
1038         return
1039 }
1040
1041 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1042         t, ok := cl.torrents[infoHash]
1043         if !ok {
1044                 err = fmt.Errorf("no such torrent")
1045                 return
1046         }
1047         err = t.close()
1048         if err != nil {
1049                 panic(err)
1050         }
1051         delete(cl.torrents, infoHash)
1052         return
1053 }
1054
1055 func (cl *Client) allTorrentsCompleted() bool {
1056         for _, t := range cl.torrents {
1057                 if !t.haveInfo() {
1058                         return false
1059                 }
1060                 if !t.haveAllPieces() {
1061                         return false
1062                 }
1063         }
1064         return true
1065 }
1066
1067 // Returns true when all torrents are completely downloaded and false if the
1068 // client is stopped before that.
1069 func (cl *Client) WaitAll() bool {
1070         cl.mu.Lock()
1071         defer cl.mu.Unlock()
1072         for !cl.allTorrentsCompleted() {
1073                 if cl.closed.IsSet() {
1074                         return false
1075                 }
1076                 cl.event.Wait()
1077         }
1078         return true
1079 }
1080
1081 // Returns handles to all the torrents loaded in the Client.
1082 func (cl *Client) Torrents() []*Torrent {
1083         cl.mu.Lock()
1084         defer cl.mu.Unlock()
1085         return cl.torrentsAsSlice()
1086 }
1087
1088 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1089         for _, t := range cl.torrents {
1090                 ret = append(ret, t)
1091         }
1092         return
1093 }
1094
1095 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1096         spec, err := TorrentSpecFromMagnetURI(uri)
1097         if err != nil {
1098                 return
1099         }
1100         T, _, err = cl.AddTorrentSpec(spec)
1101         return
1102 }
1103
1104 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1105         T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1106         var ss []string
1107         slices.MakeInto(&ss, mi.Nodes)
1108         cl.AddDHTNodes(ss)
1109         return
1110 }
1111
1112 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1113         mi, err := metainfo.LoadFromFile(filename)
1114         if err != nil {
1115                 return
1116         }
1117         return cl.AddTorrent(mi)
1118 }
1119
1120 func (cl *Client) DhtServers() []*dht.Server {
1121         return cl.dhtServers
1122 }
1123
1124 func (cl *Client) AddDHTNodes(nodes []string) {
1125         for _, n := range nodes {
1126                 hmp := missinggo.SplitHostMaybePort(n)
1127                 ip := net.ParseIP(hmp.Host)
1128                 if ip == nil {
1129                         log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1130                         continue
1131                 }
1132                 ni := krpc.NodeInfo{
1133                         Addr: krpc.NodeAddr{
1134                                 IP:   ip,
1135                                 Port: hmp.Port,
1136                         },
1137                 }
1138                 cl.eachDhtServer(func(s *dht.Server) {
1139                         s.AddNode(ni)
1140                 })
1141         }
1142 }
1143
1144 func (cl *Client) banPeerIP(ip net.IP) {
1145         if cl.badPeerIPs == nil {
1146                 cl.badPeerIPs = make(map[string]struct{})
1147         }
1148         cl.badPeerIPs[ip.String()] = struct{}{}
1149 }
1150
1151 func (cl *Client) newConnection(nc net.Conn) (c *connection) {
1152         c = &connection{
1153                 conn:            nc,
1154                 Choked:          true,
1155                 PeerChoked:      true,
1156                 PeerMaxRequests: 250,
1157                 writeBuffer:     new(bytes.Buffer),
1158         }
1159         c.writerCond.L = &cl.mu
1160         c.setRW(connStatsReadWriter{nc, &cl.mu, c})
1161         c.r = &rateLimitedReader{
1162                 l: cl.downloadLimit,
1163                 r: c.r,
1164         }
1165         return
1166 }
1167
1168 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1169         cl.mu.Lock()
1170         defer cl.mu.Unlock()
1171         t := cl.torrent(ih)
1172         if t == nil {
1173                 return
1174         }
1175         t.addPeers([]Peer{{
1176                 IP:     p.IP,
1177                 Port:   p.Port,
1178                 Source: peerSourceDHTAnnouncePeer,
1179         }})
1180 }
1181
1182 func firstNotNil(ips ...net.IP) net.IP {
1183         for _, ip := range ips {
1184                 if ip != nil {
1185                         return ip
1186                 }
1187         }
1188         return nil
1189 }
1190
1191 func (cl *Client) eachListener(f func(socket) bool) {
1192         for _, s := range cl.conns {
1193                 if !f(s) {
1194                         break
1195                 }
1196         }
1197 }
1198
1199 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1200         cl.eachListener(func(l socket) bool {
1201                 ret = l
1202                 return !f(l)
1203         })
1204         return
1205 }
1206
1207 func (cl *Client) publicIp(peer net.IP) net.IP {
1208         // TODO: Use BEP 10 to determine how peers are seeing us.
1209         if peer.To4() != nil {
1210                 return firstNotNil(
1211                         cl.config.PublicIp4,
1212                         cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1213                 )
1214         } else {
1215                 return firstNotNil(
1216                         cl.config.PublicIp6,
1217                         cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1218                 )
1219         }
1220 }
1221
1222 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1223         return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
1224                 return f(missinggo.AddrIP(l.Addr()))
1225         }).Addr())
1226 }
1227
1228 // Our IP as a peer should see it.
1229 func (cl *Client) publicAddr(peer net.IP) ipPort {
1230         return ipPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
1231 }
1232
1233 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1234         cl.eachListener(func(l socket) bool {
1235                 ret = append(ret, l.Addr())
1236                 return true
1237         })
1238         return
1239 }