]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
866aa2358578243fd4ab422c3a4dc34022b0ca2e
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "context"
7         "crypto/rand"
8         "encoding/binary"
9         "errors"
10         "fmt"
11         "io"
12         "net"
13         "net/http"
14         "net/url"
15         "strconv"
16         "strings"
17         "time"
18
19         "github.com/anacrolix/dht/v2"
20         "github.com/anacrolix/dht/v2/krpc"
21         "github.com/anacrolix/log"
22         "github.com/anacrolix/missinggo/bitmap"
23         "github.com/anacrolix/missinggo/perf"
24         "github.com/anacrolix/missinggo/pproffd"
25         "github.com/anacrolix/missinggo/pubsub"
26         "github.com/anacrolix/missinggo/slices"
27         "github.com/anacrolix/missinggo/v2"
28         "github.com/anacrolix/missinggo/v2/conntrack"
29         "github.com/anacrolix/sync"
30         "github.com/davecgh/go-spew/spew"
31         "github.com/dustin/go-humanize"
32         "github.com/google/btree"
33         "golang.org/x/time/rate"
34         "golang.org/x/xerrors"
35
36         "github.com/anacrolix/torrent/bencode"
37         "github.com/anacrolix/torrent/iplist"
38         "github.com/anacrolix/torrent/metainfo"
39         "github.com/anacrolix/torrent/mse"
40         pp "github.com/anacrolix/torrent/peer_protocol"
41         "github.com/anacrolix/torrent/storage"
42 )
43
44 // Clients contain zero or more Torrents. A Client manages a blocklist, the
45 // TCP/UDP protocol ports, and DHT as desired.
46 type Client struct {
47         // An aggregate of stats over all connections. First in struct to ensure
48         // 64-bit alignment of fields. See #262.
49         stats ConnStats
50
51         _mu    lockWithDeferreds
52         event  sync.Cond
53         closed missinggo.Event
54
55         config *ClientConfig
56         logger log.Logger
57
58         peerID         PeerID
59         defaultStorage *storage.Client
60         onClose        []func()
61         conns          []socket
62         dhtServers     []*dht.Server
63         ipBlockList    iplist.Ranger
64         // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
65         extensionBytes pp.PeerExtensionBits
66
67         // Set of addresses that have our client ID. This intentionally will
68         // include ourselves if we end up trying to connect to our own address
69         // through legitimate channels.
70         dopplegangerAddrs map[string]struct{}
71         badPeerIPs        map[string]struct{}
72         torrents          map[InfoHash]*Torrent
73
74         acceptLimiter   map[ipStr]int
75         dialRateLimiter *rate.Limiter
76 }
77
78 type ipStr string
79
80 func (cl *Client) BadPeerIPs() []string {
81         cl.rLock()
82         defer cl.rUnlock()
83         return cl.badPeerIPsLocked()
84 }
85
86 func (cl *Client) badPeerIPsLocked() []string {
87         return slices.FromMapKeys(cl.badPeerIPs).([]string)
88 }
89
90 func (cl *Client) PeerID() PeerID {
91         return cl.peerID
92 }
93
94 func (cl *Client) LocalPort() (port int) {
95         cl.eachListener(func(l socket) bool {
96                 _port := missinggo.AddrPort(l.Addr())
97                 if _port == 0 {
98                         panic(l)
99                 }
100                 if port == 0 {
101                         port = _port
102                 } else if port != _port {
103                         panic("mismatched ports")
104                 }
105                 return true
106         })
107         return
108 }
109
110 func writeDhtServerStatus(w io.Writer, s *dht.Server) {
111         dhtStats := s.Stats()
112         fmt.Fprintf(w, "\t# Nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
113         fmt.Fprintf(w, "\tServer ID: %x\n", s.ID())
114         fmt.Fprintf(w, "\tAnnounces: %d\n", dhtStats.SuccessfulOutboundAnnouncePeerQueries)
115         fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
116 }
117
118 // Writes out a human readable status of the client, such as for writing to a
119 // HTTP status page.
120 func (cl *Client) WriteStatus(_w io.Writer) {
121         cl.rLock()
122         defer cl.rUnlock()
123         w := bufio.NewWriter(_w)
124         defer w.Flush()
125         fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
126         fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
127         fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
128         fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
129         cl.eachDhtServer(func(s *dht.Server) {
130                 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
131                 writeDhtServerStatus(w, s)
132         })
133         spew.Fdump(w, &cl.stats)
134         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
135         fmt.Fprintln(w)
136         for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
137                 return l.InfoHash().AsString() < r.InfoHash().AsString()
138         }).([]*Torrent) {
139                 if t.name() == "" {
140                         fmt.Fprint(w, "<unknown name>")
141                 } else {
142                         fmt.Fprint(w, t.name())
143                 }
144                 fmt.Fprint(w, "\n")
145                 if t.info != nil {
146                         fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
147                 } else {
148                         w.WriteString("<missing metainfo>")
149                 }
150                 fmt.Fprint(w, "\n")
151                 t.writeStatus(w)
152                 fmt.Fprintln(w)
153         }
154 }
155
156 const debugLogValue = log.Debug
157
158 func (cl *Client) debugLogFilter(m log.Msg) bool {
159         if cl.config.Debug {
160                 return true
161         }
162         return !m.HasValue(debugLogValue)
163 }
164
165 func (cl *Client) initLogger() {
166         cl.logger = cl.config.Logger.WithValues(cl).WithFilter(cl.debugLogFilter)
167 }
168
169 func (cl *Client) announceKey() int32 {
170         return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
171 }
172
173 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
174         if cfg == nil {
175                 cfg = NewDefaultClientConfig()
176                 cfg.ListenPort = 0
177         }
178         defer func() {
179                 if err != nil {
180                         cl = nil
181                 }
182         }()
183         cl = &Client{
184                 config:            cfg,
185                 dopplegangerAddrs: make(map[string]struct{}),
186                 torrents:          make(map[metainfo.Hash]*Torrent),
187                 dialRateLimiter:   rate.NewLimiter(10, 10),
188         }
189         go cl.acceptLimitClearer()
190         cl.initLogger()
191         defer func() {
192                 if err == nil {
193                         return
194                 }
195                 cl.Close()
196         }()
197         cl.extensionBytes = defaultPeerExtensionBytes()
198         cl.event.L = cl.locker()
199         storageImpl := cfg.DefaultStorage
200         if storageImpl == nil {
201                 // We'd use mmap but HFS+ doesn't support sparse files.
202                 storageImpl = storage.NewFile(cfg.DataDir)
203                 cl.onClose = append(cl.onClose, func() {
204                         if err := storageImpl.Close(); err != nil {
205                                 cl.logger.Printf("error closing default storage: %s", err)
206                         }
207                 })
208         }
209         cl.defaultStorage = storage.NewClient(storageImpl)
210         if cfg.IPBlocklist != nil {
211                 cl.ipBlockList = cfg.IPBlocklist
212         }
213
214         if cfg.PeerID != "" {
215                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
216         } else {
217                 o := copy(cl.peerID[:], cfg.Bep20)
218                 _, err = rand.Read(cl.peerID[o:])
219                 if err != nil {
220                         panic("error generating peer id")
221                 }
222         }
223
224         if cl.config.HTTPProxy == nil && cl.config.ProxyURL != "" {
225                 if fixedURL, err := url.Parse(cl.config.ProxyURL); err == nil {
226                         cl.config.HTTPProxy = http.ProxyURL(fixedURL)
227                 }
228         }
229
230         cl.conns, err = listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.config.ProxyURL, cl.firewallCallback)
231         if err != nil {
232                 return
233         }
234         // Check for panics.
235         cl.LocalPort()
236
237         for _, s := range cl.conns {
238                 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
239                         go cl.acceptConnections(s)
240                 }
241         }
242
243         go cl.forwardPort()
244         if !cfg.NoDHT {
245                 for _, s := range cl.conns {
246                         if pc, ok := s.(net.PacketConn); ok {
247                                 ds, err := cl.newDhtServer(pc)
248                                 if err != nil {
249                                         panic(err)
250                                 }
251                                 cl.dhtServers = append(cl.dhtServers, ds)
252                         }
253                 }
254         }
255
256         return
257 }
258
259 func (cl *Client) firewallCallback(net.Addr) bool {
260         cl.rLock()
261         block := !cl.wantConns()
262         cl.rUnlock()
263         if block {
264                 torrent.Add("connections firewalled", 1)
265         } else {
266                 torrent.Add("connections not firewalled", 1)
267         }
268         return block
269 }
270
271 func (cl *Client) enabledPeerNetworks() (ns []network) {
272         for _, n := range allPeerNetworks {
273                 if peerNetworkEnabled(n, cl.config) {
274                         ns = append(ns, n)
275                 }
276         }
277         return
278 }
279
280 func (cl *Client) listenOnNetwork(n network) bool {
281         if n.Ipv4 && cl.config.DisableIPv4 {
282                 return false
283         }
284         if n.Ipv6 && cl.config.DisableIPv6 {
285                 return false
286         }
287         if n.Tcp && cl.config.DisableTCP {
288                 return false
289         }
290         if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
291                 return false
292         }
293         return true
294 }
295
296 func (cl *Client) listenNetworks() (ns []network) {
297         for _, n := range allPeerNetworks {
298                 if cl.listenOnNetwork(n) {
299                         ns = append(ns, n)
300                 }
301         }
302         return
303 }
304
305 func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
306         cfg := dht.ServerConfig{
307                 IPBlocklist:    cl.ipBlockList,
308                 Conn:           conn,
309                 OnAnnouncePeer: cl.onDHTAnnouncePeer,
310                 PublicIP: func() net.IP {
311                         if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
312                                 return cl.config.PublicIp6
313                         }
314                         return cl.config.PublicIp4
315                 }(),
316                 StartingNodes:      cl.config.DhtStartingNodes,
317                 ConnectionTracking: cl.config.ConnTracker,
318                 OnQuery:            cl.config.DHTOnQuery,
319                 Logger:             cl.logger.WithValues("dht", conn.LocalAddr().String()),
320         }
321         s, err = dht.NewServer(&cfg)
322         if err == nil {
323                 go func() {
324                         ts, err := s.Bootstrap()
325                         if err != nil {
326                                 cl.logger.Printf("error bootstrapping dht: %s", err)
327                         }
328                         log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
329                 }()
330         }
331         return
332 }
333
334 func (cl *Client) Closed() <-chan struct{} {
335         cl.lock()
336         defer cl.unlock()
337         return cl.closed.C()
338 }
339
340 func (cl *Client) eachDhtServer(f func(*dht.Server)) {
341         for _, ds := range cl.dhtServers {
342                 f(ds)
343         }
344 }
345
346 func (cl *Client) closeSockets() {
347         cl.eachListener(func(l socket) bool {
348                 l.Close()
349                 return true
350         })
351         cl.conns = nil
352 }
353
354 // Stops the client. All connections to peers are closed and all activity will
355 // come to a halt.
356 func (cl *Client) Close() {
357         cl.lock()
358         defer cl.unlock()
359         cl.closed.Set()
360         cl.eachDhtServer(func(s *dht.Server) { s.Close() })
361         cl.closeSockets()
362         for _, t := range cl.torrents {
363                 t.close()
364         }
365         for _, f := range cl.onClose {
366                 f()
367         }
368         cl.event.Broadcast()
369 }
370
371 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
372         if cl.ipBlockList == nil {
373                 return
374         }
375         return cl.ipBlockList.Lookup(ip)
376 }
377
378 func (cl *Client) ipIsBlocked(ip net.IP) bool {
379         _, blocked := cl.ipBlockRange(ip)
380         return blocked
381 }
382
383 func (cl *Client) wantConns() bool {
384         for _, t := range cl.torrents {
385                 if t.wantConns() {
386                         return true
387                 }
388         }
389         return false
390 }
391
392 func (cl *Client) waitAccept() {
393         for {
394                 if cl.closed.IsSet() {
395                         return
396                 }
397                 if cl.wantConns() {
398                         return
399                 }
400                 cl.event.Wait()
401         }
402 }
403
404 func (cl *Client) rejectAccepted(conn net.Conn) bool {
405         ra := conn.RemoteAddr()
406         rip := missinggo.AddrIP(ra)
407         if cl.config.DisableIPv4Peers && rip.To4() != nil {
408                 return true
409         }
410         if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
411                 return true
412         }
413         if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
414                 return true
415         }
416         if cl.rateLimitAccept(rip) {
417                 return true
418         }
419         return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
420 }
421
422 func (cl *Client) acceptConnections(l net.Listener) {
423         for {
424                 conn, err := l.Accept()
425                 torrent.Add("client listener accepts", 1)
426                 conn = pproffd.WrapNetConn(conn)
427                 cl.rLock()
428                 closed := cl.closed.IsSet()
429                 reject := false
430                 if conn != nil {
431                         reject = cl.rejectAccepted(conn)
432                 }
433                 cl.rUnlock()
434                 if closed {
435                         if conn != nil {
436                                 conn.Close()
437                         }
438                         return
439                 }
440                 if err != nil {
441                         cl.logger.Printf("error accepting connection: %s", err)
442                         continue
443                 }
444                 go func() {
445                         if reject {
446                                 torrent.Add("rejected accepted connections", 1)
447                                 conn.Close()
448                         } else {
449                                 go cl.incomingConnection(conn)
450                         }
451                         log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
452                         torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
453                         torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
454                         torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
455                 }()
456         }
457 }
458
459 func (cl *Client) incomingConnection(nc net.Conn) {
460         defer nc.Close()
461         if tc, ok := nc.(*net.TCPConn); ok {
462                 tc.SetLinger(0)
463         }
464         c := cl.newConnection(nc, false, missinggo.IpPortFromNetAddr(nc.RemoteAddr()), nc.RemoteAddr().Network())
465         c.Discovery = peerSourceIncoming
466         cl.runReceivedConn(c)
467 }
468
469 // Returns a handle to the given torrent, if it's present in the client.
470 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
471         cl.lock()
472         defer cl.unlock()
473         t, ok = cl.torrents[ih]
474         return
475 }
476
477 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
478         return cl.torrents[ih]
479 }
480
481 type dialResult struct {
482         Conn    net.Conn
483         Network string
484 }
485
486 func countDialResult(err error) {
487         if err == nil {
488                 torrent.Add("successful dials", 1)
489         } else {
490                 torrent.Add("unsuccessful dials", 1)
491         }
492 }
493
494 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
495         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
496         if ret < minDialTimeout {
497                 ret = minDialTimeout
498         }
499         return
500 }
501
502 // Returns whether an address is known to connect to a client with our own ID.
503 func (cl *Client) dopplegangerAddr(addr string) bool {
504         _, ok := cl.dopplegangerAddrs[addr]
505         return ok
506 }
507
508 // Returns a connection over UTP or TCP, whichever is first to connect.
509 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
510         {
511                 t := perf.NewTimer(perf.CallerName(0))
512                 defer func() {
513                         if res.Conn == nil {
514                                 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
515                         } else {
516                                 t.Mark("returned conn over " + res.Network)
517                         }
518                 }()
519         }
520         ctx, cancel := context.WithCancel(ctx)
521         // As soon as we return one connection, cancel the others.
522         defer cancel()
523         left := 0
524         resCh := make(chan dialResult, left)
525         func() {
526                 cl.lock()
527                 defer cl.unlock()
528                 cl.eachListener(func(s socket) bool {
529                         func() {
530                                 network := s.Addr().Network()
531                                 if !peerNetworkEnabled(parseNetworkString(network), cl.config) {
532                                         return
533                                 }
534                                 left++
535                                 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
536                                 go func() {
537                                         resCh <- dialResult{
538                                                 cl.dialFromSocket(ctx, s, addr),
539                                                 network,
540                                         }
541                                 }()
542                         }()
543                         return true
544                 })
545         }()
546         // Wait for a successful connection.
547         func() {
548                 defer perf.ScopeTimer()()
549                 for ; left > 0 && res.Conn == nil; left-- {
550                         res = <-resCh
551                 }
552         }()
553         // There are still incompleted dials.
554         go func() {
555                 for ; left > 0; left-- {
556                         conn := (<-resCh).Conn
557                         if conn != nil {
558                                 conn.Close()
559                         }
560                 }
561         }()
562         if res.Conn != nil {
563                 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
564         }
565         //if res.Conn != nil {
566         //      cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
567         //} else {
568         //      cl.logger.Printf("failed to dial %s", addr)
569         //}
570         return res
571 }
572
573 func (cl *Client) dialFromSocket(ctx context.Context, s socket, addr string) net.Conn {
574         network := s.Addr().Network()
575         cte := cl.config.ConnTracker.Wait(
576                 ctx,
577                 conntrack.Entry{network, s.Addr().String(), addr},
578                 "dial torrent client",
579                 0,
580         )
581         // Try to avoid committing to a dial if the context is complete as it's difficult to determine
582         // which dial errors allow us to forget the connection tracking entry handle.
583         if ctx.Err() != nil {
584                 if cte != nil {
585                         cte.Forget()
586                 }
587                 return nil
588         }
589         c, err := s.dial(ctx, addr)
590         // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
591         // it now in case we close the connection forthwith.
592         if tc, ok := c.(*net.TCPConn); ok {
593                 tc.SetLinger(0)
594         }
595         countDialResult(err)
596         if c == nil {
597                 if err != nil && forgettableDialError(err) {
598                         cte.Forget()
599                 } else {
600                         cte.Done()
601                 }
602                 return nil
603         }
604         return closeWrapper{c, func() error {
605                 err := c.Close()
606                 cte.Done()
607                 return err
608         }}
609 }
610
611 func forgettableDialError(err error) bool {
612         return strings.Contains(err.Error(), "no suitable address found")
613 }
614
615 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
616         if _, ok := t.halfOpen[addr]; !ok {
617                 panic("invariant broken")
618         }
619         delete(t.halfOpen, addr)
620         t.openNewConns()
621 }
622
623 // Performs initiator handshakes and returns a connection. Returns nil
624 // *connection if no connection for valid reasons.
625 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool, remoteAddr IpPort, network string) (c *connection, err error) {
626         c = cl.newConnection(nc, true, remoteAddr, network)
627         c.headerEncrypted = encryptHeader
628         ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
629         defer cancel()
630         dl, ok := ctx.Deadline()
631         if !ok {
632                 panic(ctx)
633         }
634         err = nc.SetDeadline(dl)
635         if err != nil {
636                 panic(err)
637         }
638         err = cl.initiateHandshakes(c, t)
639         return
640 }
641
642 // Returns nil connection and nil error if no connection could be established
643 // for valid reasons.
644 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr IpPort, obfuscatedHeader bool) (*connection, error) {
645         dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
646                 cl.rLock()
647                 defer cl.rUnlock()
648                 return t.dialTimeout()
649         }())
650         defer cancel()
651         dr := cl.dialFirst(dialCtx, addr.String())
652         nc := dr.Conn
653         if nc == nil {
654                 if dialCtx.Err() != nil {
655                         return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
656                 }
657                 return nil, errors.New("dial failed")
658         }
659         c, err := cl.handshakesConnection(context.Background(), nc, t, obfuscatedHeader, addr, dr.Network)
660         if err != nil {
661                 nc.Close()
662         }
663         return c, err
664 }
665
666 // Returns nil connection and nil error if no connection could be established
667 // for valid reasons.
668 func (cl *Client) establishOutgoingConn(t *Torrent, addr IpPort) (c *connection, err error) {
669         torrent.Add("establish outgoing connection", 1)
670         obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
671         c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
672         if err == nil {
673                 torrent.Add("initiated conn with preferred header obfuscation", 1)
674                 return
675         }
676         //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
677         if cl.config.HeaderObfuscationPolicy.RequirePreferred {
678                 // We should have just tried with the preferred header obfuscation. If it was required,
679                 // there's nothing else to try.
680                 return
681         }
682         // Try again with encryption if we didn't earlier, or without if we did.
683         c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
684         if err == nil {
685                 torrent.Add("initiated conn with fallback header obfuscation", 1)
686         }
687         //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
688         return
689 }
690
691 // Called to dial out and run a connection. The addr we're given is already
692 // considered half-open.
693 func (cl *Client) outgoingConnection(t *Torrent, addr IpPort, ps peerSource, trusted bool) {
694         cl.dialRateLimiter.Wait(context.Background())
695         c, err := cl.establishOutgoingConn(t, addr)
696         cl.lock()
697         defer cl.unlock()
698         // Don't release lock between here and addConnection, unless it's for
699         // failure.
700         cl.noLongerHalfOpen(t, addr.String())
701         if err != nil {
702                 if cl.config.Debug {
703                         cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
704                 }
705                 return
706         }
707         defer c.Close()
708         c.Discovery = ps
709         c.trusted = trusted
710         cl.runHandshookConn(c, t)
711 }
712
713 // The port number for incoming peer connections. 0 if the client isn't
714 // listening.
715 func (cl *Client) incomingPeerPort() int {
716         return cl.LocalPort()
717 }
718
719 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) error {
720         if c.headerEncrypted {
721                 var rw io.ReadWriter
722                 var err error
723                 rw, c.cryptoMethod, err = mse.InitiateHandshake(
724                         struct {
725                                 io.Reader
726                                 io.Writer
727                         }{c.r, c.w},
728                         t.infoHash[:],
729                         nil,
730                         cl.config.CryptoProvides,
731                 )
732                 c.setRW(rw)
733                 if err != nil {
734                         return xerrors.Errorf("header obfuscation handshake: %w", err)
735                 }
736         }
737         ih, err := cl.connBtHandshake(c, &t.infoHash)
738         if err != nil {
739                 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
740         }
741         if ih != t.infoHash {
742                 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
743         }
744         return nil
745 }
746
747 // Calls f with any secret keys.
748 func (cl *Client) forSkeys(f func([]byte) bool) {
749         cl.lock()
750         defer cl.unlock()
751         if false { // Emulate the bug from #114
752                 var firstIh InfoHash
753                 for ih := range cl.torrents {
754                         firstIh = ih
755                         break
756                 }
757                 for range cl.torrents {
758                         if !f(firstIh[:]) {
759                                 break
760                         }
761                 }
762                 return
763         }
764         for ih := range cl.torrents {
765                 if !f(ih[:]) {
766                         break
767                 }
768         }
769 }
770
771 // Do encryption and bittorrent handshakes as receiver.
772 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
773         defer perf.ScopeTimerErr(&err)()
774         var rw io.ReadWriter
775         rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
776         c.setRW(rw)
777         if err == nil || err == mse.ErrNoSecretKeyMatch {
778                 if c.headerEncrypted {
779                         torrent.Add("handshakes received encrypted", 1)
780                 } else {
781                         torrent.Add("handshakes received unencrypted", 1)
782                 }
783         } else {
784                 torrent.Add("handshakes received with error while handling encryption", 1)
785         }
786         if err != nil {
787                 if err == mse.ErrNoSecretKeyMatch {
788                         err = nil
789                 }
790                 return
791         }
792         if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
793                 err = errors.New("connection not have required header obfuscation")
794                 return
795         }
796         ih, err := cl.connBtHandshake(c, nil)
797         if err != nil {
798                 err = xerrors.Errorf("during bt handshake: %w", err)
799                 return
800         }
801         cl.lock()
802         t = cl.torrents[ih]
803         cl.unlock()
804         return
805 }
806
807 func (cl *Client) connBtHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
808         res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
809         if err != nil {
810                 return
811         }
812         ret = res.Hash
813         c.PeerExtensionBytes = res.PeerExtensionBits
814         c.PeerID = res.PeerID
815         c.completedHandshake = time.Now()
816         return
817 }
818
819 func (cl *Client) runReceivedConn(c *connection) {
820         err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
821         if err != nil {
822                 panic(err)
823         }
824         t, err := cl.receiveHandshakes(c)
825         if err != nil {
826                 log.Fmsg(
827                         "error receiving handshakes: %s", err,
828                 ).AddValue(
829                         debugLogValue,
830                 ).Add(
831                         "network", c.network,
832                 ).Log(cl.logger)
833                 torrent.Add("error receiving handshake", 1)
834                 cl.lock()
835                 cl.onBadAccept(c.remoteAddr)
836                 cl.unlock()
837                 return
838         }
839         if t == nil {
840                 torrent.Add("received handshake for unloaded torrent", 1)
841                 cl.lock()
842                 cl.onBadAccept(c.remoteAddr)
843                 cl.unlock()
844                 return
845         }
846         torrent.Add("received handshake for loaded torrent", 1)
847         cl.lock()
848         defer cl.unlock()
849         cl.runHandshookConn(c, t)
850 }
851
852 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
853         c.setTorrent(t)
854         if c.PeerID == cl.peerID {
855                 if c.outgoing {
856                         connsToSelf.Add(1)
857                         addr := c.conn.RemoteAddr().String()
858                         cl.dopplegangerAddrs[addr] = struct{}{}
859                 } else {
860                         // Because the remote address is not necessarily the same as its
861                         // client's torrent listen address, we won't record the remote address
862                         // as a doppleganger. Instead, the initiator can record *us* as the
863                         // doppleganger.
864                 }
865                 return
866         }
867         c.conn.SetWriteDeadline(time.Time{})
868         c.r = deadlineReader{c.conn, c.r}
869         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
870         if connIsIpv6(c.conn) {
871                 torrent.Add("completed handshake over ipv6", 1)
872         }
873         if err := t.addConnection(c); err != nil {
874                 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
875                 return
876         }
877         defer t.dropConnection(c)
878         go c.writer(time.Minute)
879         cl.sendInitialMessages(c, t)
880         err := c.mainReadLoop()
881         if err != nil && cl.config.Debug {
882                 cl.logger.Printf("error during connection main read loop: %s", err)
883         }
884 }
885
886 // See the order given in Transmission's tr_peerMsgsNew.
887 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
888         if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
889                 conn.Post(pp.Message{
890                         Type:       pp.Extended,
891                         ExtendedID: pp.HandshakeExtendedID,
892                         ExtendedPayload: func() []byte {
893                                 msg := pp.ExtendedHandshakeMessage{
894                                         M: map[pp.ExtensionName]pp.ExtensionNumber{
895                                                 pp.ExtensionNameMetadata: metadataExtendedId,
896                                         },
897                                         V:            cl.config.ExtendedHandshakeClientVersion,
898                                         Reqq:         64, // TODO: Really?
899                                         YourIp:       pp.CompactIp(conn.remoteAddr.IP),
900                                         Encryption:   cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
901                                         Port:         cl.incomingPeerPort(),
902                                         MetadataSize: torrent.metadataSize(),
903                                         // TODO: We can figured these out specific to the socket
904                                         // used.
905                                         Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
906                                         Ipv6: cl.config.PublicIp6.To16(),
907                                 }
908                                 if !cl.config.DisablePEX {
909                                         msg.M[pp.ExtensionNamePex] = pexExtendedId
910                                 }
911                                 return bencode.MustMarshal(msg)
912                         }(),
913                 })
914         }
915         func() {
916                 if conn.fastEnabled() {
917                         if torrent.haveAllPieces() {
918                                 conn.Post(pp.Message{Type: pp.HaveAll})
919                                 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
920                                 return
921                         } else if !torrent.haveAnyPieces() {
922                                 conn.Post(pp.Message{Type: pp.HaveNone})
923                                 conn.sentHaves.Clear()
924                                 return
925                         }
926                 }
927                 conn.PostBitfield()
928         }()
929         if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
930                 conn.Post(pp.Message{
931                         Type: pp.Port,
932                         Port: cl.dhtPort(),
933                 })
934         }
935 }
936
937 func (cl *Client) dhtPort() (ret uint16) {
938         cl.eachDhtServer(func(s *dht.Server) {
939                 ret = uint16(missinggo.AddrPort(s.Addr()))
940         })
941         return
942 }
943
944 func (cl *Client) haveDhtServer() (ret bool) {
945         cl.eachDhtServer(func(_ *dht.Server) {
946                 ret = true
947         })
948         return
949 }
950
951 // Process incoming ut_metadata message.
952 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
953         var d map[string]int
954         err := bencode.Unmarshal(payload, &d)
955         if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
956         } else if err != nil {
957                 return fmt.Errorf("error unmarshalling bencode: %s", err)
958         }
959         msgType, ok := d["msg_type"]
960         if !ok {
961                 return errors.New("missing msg_type field")
962         }
963         piece := d["piece"]
964         switch msgType {
965         case pp.DataMetadataExtensionMsgType:
966                 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
967                 if !c.requestedMetadataPiece(piece) {
968                         return fmt.Errorf("got unexpected piece %d", piece)
969                 }
970                 c.metadataRequests[piece] = false
971                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
972                 if begin < 0 || begin >= len(payload) {
973                         return fmt.Errorf("data has bad offset in payload: %d", begin)
974                 }
975                 t.saveMetadataPiece(piece, payload[begin:])
976                 c.lastUsefulChunkReceived = time.Now()
977                 return t.maybeCompleteMetadata()
978         case pp.RequestMetadataExtensionMsgType:
979                 if !t.haveMetadataPiece(piece) {
980                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
981                         return nil
982                 }
983                 start := (1 << 14) * piece
984                 c.logger.Printf("sending metadata piece %d", piece)
985                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
986                 return nil
987         case pp.RejectMetadataExtensionMsgType:
988                 return nil
989         default:
990                 return errors.New("unknown msg_type value")
991         }
992 }
993
994 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
995         if port == 0 {
996                 return true
997         }
998         if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
999                 return true
1000         }
1001         if _, ok := cl.ipBlockRange(ip); ok {
1002                 return true
1003         }
1004         if _, ok := cl.badPeerIPs[ip.String()]; ok {
1005                 return true
1006         }
1007         return false
1008 }
1009
1010 // Return a Torrent ready for insertion into a Client.
1011 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1012         // use provided storage, if provided
1013         storageClient := cl.defaultStorage
1014         if specStorage != nil {
1015                 storageClient = storage.NewClient(specStorage)
1016         }
1017
1018         t = &Torrent{
1019                 cl:       cl,
1020                 infoHash: ih,
1021                 peers: prioritizedPeers{
1022                         om: btree.New(32),
1023                         getPrio: func(p Peer) peerPriority {
1024                                 return bep40PriorityIgnoreError(cl.publicAddr(p.IP), p.addr())
1025                         },
1026                 },
1027                 conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1028
1029                 halfOpen:          make(map[string]Peer),
1030                 pieceStateChanges: pubsub.NewPubSub(),
1031
1032                 storageOpener:       storageClient,
1033                 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1034
1035                 networkingEnabled: true,
1036                 requestStrategy:   2,
1037                 metadataChanged: sync.Cond{
1038                         L: cl.locker(),
1039                 },
1040                 duplicateRequestTimeout: 1 * time.Second,
1041         }
1042         t.logger = cl.logger.WithValues(t).WithText(func(m log.Msg) string {
1043                 return fmt.Sprintf("%v: %s", t, m.Text())
1044         })
1045         t.setChunkSize(defaultChunkSize)
1046         return
1047 }
1048
1049 // A file-like handle to some torrent data resource.
1050 type Handle interface {
1051         io.Reader
1052         io.Seeker
1053         io.Closer
1054         io.ReaderAt
1055 }
1056
1057 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1058         return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1059 }
1060
1061 // Adds a torrent by InfoHash with a custom Storage implementation.
1062 // If the torrent already exists then this Storage is ignored and the
1063 // existing torrent returned with `new` set to `false`
1064 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1065         cl.lock()
1066         defer cl.unlock()
1067         t, ok := cl.torrents[infoHash]
1068         if ok {
1069                 return
1070         }
1071         new = true
1072
1073         t = cl.newTorrent(infoHash, specStorage)
1074         cl.eachDhtServer(func(s *dht.Server) {
1075                 go t.dhtAnnouncer(s)
1076         })
1077         cl.torrents[infoHash] = t
1078         cl.clearAcceptLimits()
1079         t.updateWantPeersEvent()
1080         // Tickle Client.waitAccept, new torrent may want conns.
1081         cl.event.Broadcast()
1082         return
1083 }
1084
1085 // Add or merge a torrent spec. If the torrent is already present, the
1086 // trackers will be merged with the existing ones. If the Info isn't yet
1087 // known, it will be set. The display name is replaced if the new spec
1088 // provides one. Returns new if the torrent wasn't already in the client.
1089 // Note that any `Storage` defined on the spec will be ignored if the
1090 // torrent is already present (i.e. `new` return value is `true`)
1091 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1092         t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1093         if spec.DisplayName != "" {
1094                 t.SetDisplayName(spec.DisplayName)
1095         }
1096         if spec.InfoBytes != nil {
1097                 err = t.SetInfoBytes(spec.InfoBytes)
1098                 if err != nil {
1099                         return
1100                 }
1101         }
1102         cl.lock()
1103         defer cl.unlock()
1104         if spec.ChunkSize != 0 {
1105                 t.setChunkSize(pp.Integer(spec.ChunkSize))
1106         }
1107         t.addTrackers(spec.Trackers)
1108         t.maybeNewConns()
1109         return
1110 }
1111
1112 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1113         t, ok := cl.torrents[infoHash]
1114         if !ok {
1115                 err = fmt.Errorf("no such torrent")
1116                 return
1117         }
1118         err = t.close()
1119         if err != nil {
1120                 panic(err)
1121         }
1122         delete(cl.torrents, infoHash)
1123         return
1124 }
1125
1126 func (cl *Client) allTorrentsCompleted() bool {
1127         for _, t := range cl.torrents {
1128                 if !t.haveInfo() {
1129                         return false
1130                 }
1131                 if !t.haveAllPieces() {
1132                         return false
1133                 }
1134         }
1135         return true
1136 }
1137
1138 // Returns true when all torrents are completely downloaded and false if the
1139 // client is stopped before that.
1140 func (cl *Client) WaitAll() bool {
1141         cl.lock()
1142         defer cl.unlock()
1143         for !cl.allTorrentsCompleted() {
1144                 if cl.closed.IsSet() {
1145                         return false
1146                 }
1147                 cl.event.Wait()
1148         }
1149         return true
1150 }
1151
1152 // Returns handles to all the torrents loaded in the Client.
1153 func (cl *Client) Torrents() []*Torrent {
1154         cl.lock()
1155         defer cl.unlock()
1156         return cl.torrentsAsSlice()
1157 }
1158
1159 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1160         for _, t := range cl.torrents {
1161                 ret = append(ret, t)
1162         }
1163         return
1164 }
1165
1166 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1167         spec, err := TorrentSpecFromMagnetURI(uri)
1168         if err != nil {
1169                 return
1170         }
1171         T, _, err = cl.AddTorrentSpec(spec)
1172         return
1173 }
1174
1175 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1176         T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1177         var ss []string
1178         slices.MakeInto(&ss, mi.Nodes)
1179         cl.AddDHTNodes(ss)
1180         return
1181 }
1182
1183 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1184         mi, err := metainfo.LoadFromFile(filename)
1185         if err != nil {
1186                 return
1187         }
1188         return cl.AddTorrent(mi)
1189 }
1190
1191 func (cl *Client) DhtServers() []*dht.Server {
1192         return cl.dhtServers
1193 }
1194
1195 func (cl *Client) AddDHTNodes(nodes []string) {
1196         for _, n := range nodes {
1197                 hmp := missinggo.SplitHostMaybePort(n)
1198                 ip := net.ParseIP(hmp.Host)
1199                 if ip == nil {
1200                         cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1201                         continue
1202                 }
1203                 ni := krpc.NodeInfo{
1204                         Addr: krpc.NodeAddr{
1205                                 IP:   ip,
1206                                 Port: hmp.Port,
1207                         },
1208                 }
1209                 cl.eachDhtServer(func(s *dht.Server) {
1210                         s.AddNode(ni)
1211                 })
1212         }
1213 }
1214
1215 func (cl *Client) banPeerIP(ip net.IP) {
1216         cl.logger.Printf("banning ip %v", ip)
1217         if cl.badPeerIPs == nil {
1218                 cl.badPeerIPs = make(map[string]struct{})
1219         }
1220         cl.badPeerIPs[ip.String()] = struct{}{}
1221 }
1222
1223 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr IpPort, network string) (c *connection) {
1224         c = &connection{
1225                 conn:            nc,
1226                 outgoing:        outgoing,
1227                 Choked:          true,
1228                 PeerChoked:      true,
1229                 PeerMaxRequests: 250,
1230                 writeBuffer:     new(bytes.Buffer),
1231                 remoteAddr:      remoteAddr,
1232                 network:         network,
1233         }
1234         c.logger = cl.logger.WithValues(c,
1235                 log.Debug, // I want messages to default to debug, and can set it here as it's only used by new code
1236         ).WithText(func(m log.Msg) string {
1237                 return fmt.Sprintf("%v: %s", c, m.Text())
1238         })
1239         c.writerCond.L = cl.locker()
1240         c.setRW(connStatsReadWriter{nc, c})
1241         c.r = &rateLimitedReader{
1242                 l: cl.config.DownloadRateLimiter,
1243                 r: c.r,
1244         }
1245         c.logger.Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1246         return
1247 }
1248
1249 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1250         cl.lock()
1251         defer cl.unlock()
1252         t := cl.torrent(ih)
1253         if t == nil {
1254                 return
1255         }
1256         t.addPeers([]Peer{{
1257                 IP:     ip,
1258                 Port:   port,
1259                 Source: peerSourceDhtAnnouncePeer,
1260         }})
1261 }
1262
1263 func firstNotNil(ips ...net.IP) net.IP {
1264         for _, ip := range ips {
1265                 if ip != nil {
1266                         return ip
1267                 }
1268         }
1269         return nil
1270 }
1271
1272 func (cl *Client) eachListener(f func(socket) bool) {
1273         for _, s := range cl.conns {
1274                 if !f(s) {
1275                         break
1276                 }
1277         }
1278 }
1279
1280 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1281         cl.eachListener(func(l socket) bool {
1282                 ret = l
1283                 return !f(l)
1284         })
1285         return
1286 }
1287
1288 func (cl *Client) publicIp(peer net.IP) net.IP {
1289         // TODO: Use BEP 10 to determine how peers are seeing us.
1290         if peer.To4() != nil {
1291                 return firstNotNil(
1292                         cl.config.PublicIp4,
1293                         cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1294                 )
1295         } else {
1296                 return firstNotNil(
1297                         cl.config.PublicIp6,
1298                         cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1299                 )
1300         }
1301 }
1302
1303 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1304         return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
1305                 return f(missinggo.AddrIP(l.Addr()))
1306         }).Addr())
1307 }
1308
1309 // Our IP as a peer should see it.
1310 func (cl *Client) publicAddr(peer net.IP) IpPort {
1311         return IpPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
1312 }
1313
1314 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1315         cl.lock()
1316         defer cl.unlock()
1317         cl.eachListener(func(l socket) bool {
1318                 ret = append(ret, l.Addr())
1319                 return true
1320         })
1321         return
1322 }
1323
1324 func (cl *Client) onBadAccept(addr IpPort) {
1325         ip := maskIpForAcceptLimiting(addr.IP)
1326         if cl.acceptLimiter == nil {
1327                 cl.acceptLimiter = make(map[ipStr]int)
1328         }
1329         cl.acceptLimiter[ipStr(ip.String())]++
1330 }
1331
1332 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1333         if ip4 := ip.To4(); ip4 != nil {
1334                 return ip4.Mask(net.CIDRMask(24, 32))
1335         }
1336         return ip
1337 }
1338
1339 func (cl *Client) clearAcceptLimits() {
1340         cl.acceptLimiter = nil
1341 }
1342
1343 func (cl *Client) acceptLimitClearer() {
1344         for {
1345                 select {
1346                 case <-cl.closed.LockedChan(cl.locker()):
1347                         return
1348                 case <-time.After(15 * time.Minute):
1349                         cl.lock()
1350                         cl.clearAcceptLimits()
1351                         cl.unlock()
1352                 }
1353         }
1354 }
1355
1356 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1357         if cl.config.DisableAcceptRateLimiting {
1358                 return false
1359         }
1360         return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1361 }
1362
1363 func (cl *Client) rLock() {
1364         cl._mu.RLock()
1365 }
1366
1367 func (cl *Client) rUnlock() {
1368         cl._mu.RUnlock()
1369 }
1370
1371 func (cl *Client) lock() {
1372         cl._mu.Lock()
1373 }
1374
1375 func (cl *Client) unlock() {
1376         cl._mu.Unlock()
1377 }
1378
1379 func (cl *Client) locker() sync.Locker {
1380         return clientLocker{cl}
1381 }
1382
1383 func (cl *Client) String() string {
1384         return fmt.Sprintf("<%[1]T %[1]p>", cl)
1385 }
1386
1387 type clientLocker struct {
1388         *Client
1389 }
1390
1391 func (cl clientLocker) Lock() {
1392         cl.lock()
1393 }
1394
1395 func (cl clientLocker) Unlock() {
1396         cl.unlock()
1397 }