]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
Extract the request timeout stuff into requestStrategyThree
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "context"
7         "crypto/rand"
8         "encoding/binary"
9         "errors"
10         "fmt"
11         "io"
12         "net"
13         "net/http"
14         "net/url"
15         "strconv"
16         "strings"
17         "time"
18
19         "github.com/anacrolix/dht/v2"
20         "github.com/anacrolix/dht/v2/krpc"
21         "github.com/anacrolix/log"
22         "github.com/anacrolix/missinggo/bitmap"
23         "github.com/anacrolix/missinggo/perf"
24         "github.com/anacrolix/missinggo/pproffd"
25         "github.com/anacrolix/missinggo/pubsub"
26         "github.com/anacrolix/missinggo/slices"
27         "github.com/anacrolix/missinggo/v2"
28         "github.com/anacrolix/missinggo/v2/conntrack"
29         "github.com/anacrolix/sync"
30         "github.com/davecgh/go-spew/spew"
31         "github.com/dustin/go-humanize"
32         "github.com/google/btree"
33         "golang.org/x/time/rate"
34         "golang.org/x/xerrors"
35
36         "github.com/anacrolix/torrent/bencode"
37         "github.com/anacrolix/torrent/iplist"
38         "github.com/anacrolix/torrent/metainfo"
39         "github.com/anacrolix/torrent/mse"
40         pp "github.com/anacrolix/torrent/peer_protocol"
41         "github.com/anacrolix/torrent/storage"
42 )
43
44 // Clients contain zero or more Torrents. A Client manages a blocklist, the
45 // TCP/UDP protocol ports, and DHT as desired.
46 type Client struct {
47         // An aggregate of stats over all connections. First in struct to ensure
48         // 64-bit alignment of fields. See #262.
49         stats ConnStats
50
51         _mu    lockWithDeferreds
52         event  sync.Cond
53         closed missinggo.Event
54
55         config *ClientConfig
56         logger log.Logger
57
58         peerID         PeerID
59         defaultStorage *storage.Client
60         onClose        []func()
61         conns          []socket
62         dhtServers     []*dht.Server
63         ipBlockList    iplist.Ranger
64         // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
65         extensionBytes pp.PeerExtensionBits
66
67         // Set of addresses that have our client ID. This intentionally will
68         // include ourselves if we end up trying to connect to our own address
69         // through legitimate channels.
70         dopplegangerAddrs map[string]struct{}
71         badPeerIPs        map[string]struct{}
72         torrents          map[InfoHash]*Torrent
73
74         acceptLimiter   map[ipStr]int
75         dialRateLimiter *rate.Limiter
76 }
77
78 type ipStr string
79
80 func (cl *Client) BadPeerIPs() []string {
81         cl.rLock()
82         defer cl.rUnlock()
83         return cl.badPeerIPsLocked()
84 }
85
86 func (cl *Client) badPeerIPsLocked() []string {
87         return slices.FromMapKeys(cl.badPeerIPs).([]string)
88 }
89
90 func (cl *Client) PeerID() PeerID {
91         return cl.peerID
92 }
93
94 func (cl *Client) LocalPort() (port int) {
95         cl.eachListener(func(l socket) bool {
96                 _port := missinggo.AddrPort(l.Addr())
97                 if _port == 0 {
98                         panic(l)
99                 }
100                 if port == 0 {
101                         port = _port
102                 } else if port != _port {
103                         panic("mismatched ports")
104                 }
105                 return true
106         })
107         return
108 }
109
110 func writeDhtServerStatus(w io.Writer, s *dht.Server) {
111         dhtStats := s.Stats()
112         fmt.Fprintf(w, "\t# Nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
113         fmt.Fprintf(w, "\tServer ID: %x\n", s.ID())
114         fmt.Fprintf(w, "\tAnnounces: %d\n", dhtStats.SuccessfulOutboundAnnouncePeerQueries)
115         fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
116 }
117
118 // Writes out a human readable status of the client, such as for writing to a
119 // HTTP status page.
120 func (cl *Client) WriteStatus(_w io.Writer) {
121         cl.rLock()
122         defer cl.rUnlock()
123         w := bufio.NewWriter(_w)
124         defer w.Flush()
125         fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
126         fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
127         fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
128         fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
129         cl.eachDhtServer(func(s *dht.Server) {
130                 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
131                 writeDhtServerStatus(w, s)
132         })
133         spew.Fdump(w, &cl.stats)
134         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
135         fmt.Fprintln(w)
136         for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
137                 return l.InfoHash().AsString() < r.InfoHash().AsString()
138         }).([]*Torrent) {
139                 if t.name() == "" {
140                         fmt.Fprint(w, "<unknown name>")
141                 } else {
142                         fmt.Fprint(w, t.name())
143                 }
144                 fmt.Fprint(w, "\n")
145                 if t.info != nil {
146                         fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
147                 } else {
148                         w.WriteString("<missing metainfo>")
149                 }
150                 fmt.Fprint(w, "\n")
151                 t.writeStatus(w)
152                 fmt.Fprintln(w)
153         }
154 }
155
156 const debugLogValue = log.Debug
157
158 func (cl *Client) debugLogFilter(m log.Msg) bool {
159         if cl.config.Debug {
160                 return true
161         }
162         return !m.HasValue(debugLogValue)
163 }
164
165 func (cl *Client) initLogger() {
166         cl.logger = cl.config.Logger.WithValues(cl).WithFilter(cl.debugLogFilter)
167 }
168
169 func (cl *Client) announceKey() int32 {
170         return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
171 }
172
173 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
174         if cfg == nil {
175                 cfg = NewDefaultClientConfig()
176                 cfg.ListenPort = 0
177         }
178         defer func() {
179                 if err != nil {
180                         cl = nil
181                 }
182         }()
183         cl = &Client{
184                 config:            cfg,
185                 dopplegangerAddrs: make(map[string]struct{}),
186                 torrents:          make(map[metainfo.Hash]*Torrent),
187                 dialRateLimiter:   rate.NewLimiter(10, 10),
188         }
189         go cl.acceptLimitClearer()
190         cl.initLogger()
191         defer func() {
192                 if err == nil {
193                         return
194                 }
195                 cl.Close()
196         }()
197         cl.extensionBytes = defaultPeerExtensionBytes()
198         cl.event.L = cl.locker()
199         storageImpl := cfg.DefaultStorage
200         if storageImpl == nil {
201                 // We'd use mmap but HFS+ doesn't support sparse files.
202                 storageImpl = storage.NewFile(cfg.DataDir)
203                 cl.onClose = append(cl.onClose, func() {
204                         if err := storageImpl.Close(); err != nil {
205                                 cl.logger.Printf("error closing default storage: %s", err)
206                         }
207                 })
208         }
209         cl.defaultStorage = storage.NewClient(storageImpl)
210         if cfg.IPBlocklist != nil {
211                 cl.ipBlockList = cfg.IPBlocklist
212         }
213
214         if cfg.PeerID != "" {
215                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
216         } else {
217                 o := copy(cl.peerID[:], cfg.Bep20)
218                 _, err = rand.Read(cl.peerID[o:])
219                 if err != nil {
220                         panic("error generating peer id")
221                 }
222         }
223
224         if cl.config.HTTPProxy == nil && cl.config.ProxyURL != "" {
225                 if fixedURL, err := url.Parse(cl.config.ProxyURL); err == nil {
226                         cl.config.HTTPProxy = http.ProxyURL(fixedURL)
227                 }
228         }
229
230         cl.conns, err = listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.config.ProxyURL, cl.firewallCallback)
231         if err != nil {
232                 return
233         }
234         // Check for panics.
235         cl.LocalPort()
236
237         for _, s := range cl.conns {
238                 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
239                         go cl.acceptConnections(s)
240                 }
241         }
242
243         go cl.forwardPort()
244         if !cfg.NoDHT {
245                 for _, s := range cl.conns {
246                         if pc, ok := s.(net.PacketConn); ok {
247                                 ds, err := cl.newDhtServer(pc)
248                                 if err != nil {
249                                         panic(err)
250                                 }
251                                 cl.dhtServers = append(cl.dhtServers, ds)
252                         }
253                 }
254         }
255
256         return
257 }
258
259 func (cl *Client) firewallCallback(net.Addr) bool {
260         cl.rLock()
261         block := !cl.wantConns()
262         cl.rUnlock()
263         if block {
264                 torrent.Add("connections firewalled", 1)
265         } else {
266                 torrent.Add("connections not firewalled", 1)
267         }
268         return block
269 }
270
271 func (cl *Client) enabledPeerNetworks() (ns []network) {
272         for _, n := range allPeerNetworks {
273                 if peerNetworkEnabled(n, cl.config) {
274                         ns = append(ns, n)
275                 }
276         }
277         return
278 }
279
280 func (cl *Client) listenOnNetwork(n network) bool {
281         if n.Ipv4 && cl.config.DisableIPv4 {
282                 return false
283         }
284         if n.Ipv6 && cl.config.DisableIPv6 {
285                 return false
286         }
287         if n.Tcp && cl.config.DisableTCP {
288                 return false
289         }
290         if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
291                 return false
292         }
293         return true
294 }
295
296 func (cl *Client) listenNetworks() (ns []network) {
297         for _, n := range allPeerNetworks {
298                 if cl.listenOnNetwork(n) {
299                         ns = append(ns, n)
300                 }
301         }
302         return
303 }
304
305 func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
306         cfg := dht.ServerConfig{
307                 IPBlocklist:    cl.ipBlockList,
308                 Conn:           conn,
309                 OnAnnouncePeer: cl.onDHTAnnouncePeer,
310                 PublicIP: func() net.IP {
311                         if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
312                                 return cl.config.PublicIp6
313                         }
314                         return cl.config.PublicIp4
315                 }(),
316                 StartingNodes:      cl.config.DhtStartingNodes,
317                 ConnectionTracking: cl.config.ConnTracker,
318                 OnQuery:            cl.config.DHTOnQuery,
319                 Logger:             cl.logger.WithValues("dht", conn.LocalAddr().String()),
320         }
321         s, err = dht.NewServer(&cfg)
322         if err == nil {
323                 go func() {
324                         ts, err := s.Bootstrap()
325                         if err != nil {
326                                 cl.logger.Printf("error bootstrapping dht: %s", err)
327                         }
328                         log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
329                 }()
330         }
331         return
332 }
333
334 func (cl *Client) Closed() <-chan struct{} {
335         cl.lock()
336         defer cl.unlock()
337         return cl.closed.C()
338 }
339
340 func (cl *Client) eachDhtServer(f func(*dht.Server)) {
341         for _, ds := range cl.dhtServers {
342                 f(ds)
343         }
344 }
345
346 func (cl *Client) closeSockets() {
347         cl.eachListener(func(l socket) bool {
348                 l.Close()
349                 return true
350         })
351         cl.conns = nil
352 }
353
354 // Stops the client. All connections to peers are closed and all activity will
355 // come to a halt.
356 func (cl *Client) Close() {
357         cl.lock()
358         defer cl.unlock()
359         cl.closed.Set()
360         cl.eachDhtServer(func(s *dht.Server) { s.Close() })
361         cl.closeSockets()
362         for _, t := range cl.torrents {
363                 t.close()
364         }
365         for _, f := range cl.onClose {
366                 f()
367         }
368         cl.event.Broadcast()
369 }
370
371 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
372         if cl.ipBlockList == nil {
373                 return
374         }
375         return cl.ipBlockList.Lookup(ip)
376 }
377
378 func (cl *Client) ipIsBlocked(ip net.IP) bool {
379         _, blocked := cl.ipBlockRange(ip)
380         return blocked
381 }
382
383 func (cl *Client) wantConns() bool {
384         for _, t := range cl.torrents {
385                 if t.wantConns() {
386                         return true
387                 }
388         }
389         return false
390 }
391
392 func (cl *Client) waitAccept() {
393         for {
394                 if cl.closed.IsSet() {
395                         return
396                 }
397                 if cl.wantConns() {
398                         return
399                 }
400                 cl.event.Wait()
401         }
402 }
403
404 func (cl *Client) rejectAccepted(conn net.Conn) error {
405         ra := conn.RemoteAddr()
406         rip := missinggo.AddrIP(ra)
407         if cl.config.DisableIPv4Peers && rip.To4() != nil {
408                 return errors.New("ipv4 peers disabled")
409         }
410         if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
411                 return errors.New("ipv4 disabled")
412
413         }
414         if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
415                 return errors.New("ipv6 disabled")
416         }
417         if cl.rateLimitAccept(rip) {
418                 return errors.New("source IP accepted rate limited")
419         }
420         if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
421                 return errors.New("bad source addr")
422         }
423         return nil
424 }
425
426 func (cl *Client) acceptConnections(l net.Listener) {
427         for {
428                 conn, err := l.Accept()
429                 torrent.Add("client listener accepts", 1)
430                 conn = pproffd.WrapNetConn(conn)
431                 cl.rLock()
432                 closed := cl.closed.IsSet()
433                 var reject error
434                 if conn != nil {
435                         reject = cl.rejectAccepted(conn)
436                 }
437                 cl.rUnlock()
438                 if closed {
439                         if conn != nil {
440                                 conn.Close()
441                         }
442                         return
443                 }
444                 if err != nil {
445                         log.Fmsg("error accepting connection: %s", err).AddValue(debugLogValue).Log(cl.logger)
446                         continue
447                 }
448                 go func() {
449                         if reject != nil {
450                                 torrent.Add("rejected accepted connections", 1)
451                                 log.Fmsg("rejecting accepted conn: %v", reject).AddValue(debugLogValue).Log(cl.logger)
452                                 conn.Close()
453                         } else {
454                                 go cl.incomingConnection(conn)
455                         }
456                         log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
457                         torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
458                         torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
459                         torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
460                 }()
461         }
462 }
463
464 func (cl *Client) incomingConnection(nc net.Conn) {
465         defer nc.Close()
466         if tc, ok := nc.(*net.TCPConn); ok {
467                 tc.SetLinger(0)
468         }
469         c := cl.newConnection(nc, false, missinggo.IpPortFromNetAddr(nc.RemoteAddr()), nc.RemoteAddr().Network())
470         c.Discovery = peerSourceIncoming
471         cl.runReceivedConn(c)
472 }
473
474 // Returns a handle to the given torrent, if it's present in the client.
475 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
476         cl.lock()
477         defer cl.unlock()
478         t, ok = cl.torrents[ih]
479         return
480 }
481
482 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
483         return cl.torrents[ih]
484 }
485
486 type dialResult struct {
487         Conn    net.Conn
488         Network string
489 }
490
491 func countDialResult(err error) {
492         if err == nil {
493                 torrent.Add("successful dials", 1)
494         } else {
495                 torrent.Add("unsuccessful dials", 1)
496         }
497 }
498
499 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
500         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
501         if ret < minDialTimeout {
502                 ret = minDialTimeout
503         }
504         return
505 }
506
507 // Returns whether an address is known to connect to a client with our own ID.
508 func (cl *Client) dopplegangerAddr(addr string) bool {
509         _, ok := cl.dopplegangerAddrs[addr]
510         return ok
511 }
512
513 // Returns a connection over UTP or TCP, whichever is first to connect.
514 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
515         {
516                 t := perf.NewTimer(perf.CallerName(0))
517                 defer func() {
518                         if res.Conn == nil {
519                                 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
520                         } else {
521                                 t.Mark("returned conn over " + res.Network)
522                         }
523                 }()
524         }
525         ctx, cancel := context.WithCancel(ctx)
526         // As soon as we return one connection, cancel the others.
527         defer cancel()
528         left := 0
529         resCh := make(chan dialResult, left)
530         func() {
531                 cl.lock()
532                 defer cl.unlock()
533                 cl.eachListener(func(s socket) bool {
534                         func() {
535                                 network := s.Addr().Network()
536                                 if !peerNetworkEnabled(parseNetworkString(network), cl.config) {
537                                         return
538                                 }
539                                 left++
540                                 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
541                                 go func() {
542                                         resCh <- dialResult{
543                                                 cl.dialFromSocket(ctx, s, addr),
544                                                 network,
545                                         }
546                                 }()
547                         }()
548                         return true
549                 })
550         }()
551         // Wait for a successful connection.
552         func() {
553                 defer perf.ScopeTimer()()
554                 for ; left > 0 && res.Conn == nil; left-- {
555                         res = <-resCh
556                 }
557         }()
558         // There are still incompleted dials.
559         go func() {
560                 for ; left > 0; left-- {
561                         conn := (<-resCh).Conn
562                         if conn != nil {
563                                 conn.Close()
564                         }
565                 }
566         }()
567         if res.Conn != nil {
568                 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
569         }
570         //if res.Conn != nil {
571         //      cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
572         //} else {
573         //      cl.logger.Printf("failed to dial %s", addr)
574         //}
575         return res
576 }
577
578 func (cl *Client) dialFromSocket(ctx context.Context, s socket, addr string) net.Conn {
579         network := s.Addr().Network()
580         cte := cl.config.ConnTracker.Wait(
581                 ctx,
582                 conntrack.Entry{network, s.Addr().String(), addr},
583                 "dial torrent client",
584                 0,
585         )
586         // Try to avoid committing to a dial if the context is complete as it's difficult to determine
587         // which dial errors allow us to forget the connection tracking entry handle.
588         if ctx.Err() != nil {
589                 if cte != nil {
590                         cte.Forget()
591                 }
592                 return nil
593         }
594         c, err := s.dial(ctx, addr)
595         // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
596         // it now in case we close the connection forthwith.
597         if tc, ok := c.(*net.TCPConn); ok {
598                 tc.SetLinger(0)
599         }
600         countDialResult(err)
601         if c == nil {
602                 if err != nil && forgettableDialError(err) {
603                         cte.Forget()
604                 } else {
605                         cte.Done()
606                 }
607                 return nil
608         }
609         return closeWrapper{c, func() error {
610                 err := c.Close()
611                 cte.Done()
612                 return err
613         }}
614 }
615
616 func forgettableDialError(err error) bool {
617         return strings.Contains(err.Error(), "no suitable address found")
618 }
619
620 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
621         if _, ok := t.halfOpen[addr]; !ok {
622                 panic("invariant broken")
623         }
624         delete(t.halfOpen, addr)
625         t.openNewConns()
626 }
627
628 // Performs initiator handshakes and returns a connection. Returns nil
629 // *connection if no connection for valid reasons.
630 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool, remoteAddr IpPort, network string) (c *connection, err error) {
631         c = cl.newConnection(nc, true, remoteAddr, network)
632         c.headerEncrypted = encryptHeader
633         ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
634         defer cancel()
635         dl, ok := ctx.Deadline()
636         if !ok {
637                 panic(ctx)
638         }
639         err = nc.SetDeadline(dl)
640         if err != nil {
641                 panic(err)
642         }
643         err = cl.initiateHandshakes(c, t)
644         return
645 }
646
647 // Returns nil connection and nil error if no connection could be established
648 // for valid reasons.
649 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr IpPort, obfuscatedHeader bool) (*connection, error) {
650         dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
651                 cl.rLock()
652                 defer cl.rUnlock()
653                 return t.dialTimeout()
654         }())
655         defer cancel()
656         dr := cl.dialFirst(dialCtx, addr.String())
657         nc := dr.Conn
658         if nc == nil {
659                 if dialCtx.Err() != nil {
660                         return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
661                 }
662                 return nil, errors.New("dial failed")
663         }
664         c, err := cl.handshakesConnection(context.Background(), nc, t, obfuscatedHeader, addr, dr.Network)
665         if err != nil {
666                 nc.Close()
667         }
668         return c, err
669 }
670
671 // Returns nil connection and nil error if no connection could be established
672 // for valid reasons.
673 func (cl *Client) establishOutgoingConn(t *Torrent, addr IpPort) (c *connection, err error) {
674         torrent.Add("establish outgoing connection", 1)
675         obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
676         c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
677         if err == nil {
678                 torrent.Add("initiated conn with preferred header obfuscation", 1)
679                 return
680         }
681         //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
682         if cl.config.HeaderObfuscationPolicy.RequirePreferred {
683                 // We should have just tried with the preferred header obfuscation. If it was required,
684                 // there's nothing else to try.
685                 return
686         }
687         // Try again with encryption if we didn't earlier, or without if we did.
688         c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
689         if err == nil {
690                 torrent.Add("initiated conn with fallback header obfuscation", 1)
691         }
692         //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
693         return
694 }
695
696 // Called to dial out and run a connection. The addr we're given is already
697 // considered half-open.
698 func (cl *Client) outgoingConnection(t *Torrent, addr IpPort, ps peerSource, trusted bool) {
699         cl.dialRateLimiter.Wait(context.Background())
700         c, err := cl.establishOutgoingConn(t, addr)
701         cl.lock()
702         defer cl.unlock()
703         // Don't release lock between here and addConnection, unless it's for
704         // failure.
705         cl.noLongerHalfOpen(t, addr.String())
706         if err != nil {
707                 if cl.config.Debug {
708                         cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
709                 }
710                 return
711         }
712         defer c.Close()
713         c.Discovery = ps
714         c.trusted = trusted
715         cl.runHandshookConn(c, t)
716 }
717
718 // The port number for incoming peer connections. 0 if the client isn't
719 // listening.
720 func (cl *Client) incomingPeerPort() int {
721         return cl.LocalPort()
722 }
723
724 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) error {
725         if c.headerEncrypted {
726                 var rw io.ReadWriter
727                 var err error
728                 rw, c.cryptoMethod, err = mse.InitiateHandshake(
729                         struct {
730                                 io.Reader
731                                 io.Writer
732                         }{c.r, c.w},
733                         t.infoHash[:],
734                         nil,
735                         cl.config.CryptoProvides,
736                 )
737                 c.setRW(rw)
738                 if err != nil {
739                         return xerrors.Errorf("header obfuscation handshake: %w", err)
740                 }
741         }
742         ih, err := cl.connBtHandshake(c, &t.infoHash)
743         if err != nil {
744                 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
745         }
746         if ih != t.infoHash {
747                 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
748         }
749         return nil
750 }
751
752 // Calls f with any secret keys.
753 func (cl *Client) forSkeys(f func([]byte) bool) {
754         cl.lock()
755         defer cl.unlock()
756         if false { // Emulate the bug from #114
757                 var firstIh InfoHash
758                 for ih := range cl.torrents {
759                         firstIh = ih
760                         break
761                 }
762                 for range cl.torrents {
763                         if !f(firstIh[:]) {
764                                 break
765                         }
766                 }
767                 return
768         }
769         for ih := range cl.torrents {
770                 if !f(ih[:]) {
771                         break
772                 }
773         }
774 }
775
776 // Do encryption and bittorrent handshakes as receiver.
777 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
778         defer perf.ScopeTimerErr(&err)()
779         var rw io.ReadWriter
780         rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
781         c.setRW(rw)
782         if err == nil || err == mse.ErrNoSecretKeyMatch {
783                 if c.headerEncrypted {
784                         torrent.Add("handshakes received encrypted", 1)
785                 } else {
786                         torrent.Add("handshakes received unencrypted", 1)
787                 }
788         } else {
789                 torrent.Add("handshakes received with error while handling encryption", 1)
790         }
791         if err != nil {
792                 if err == mse.ErrNoSecretKeyMatch {
793                         err = nil
794                 }
795                 return
796         }
797         if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
798                 err = errors.New("connection not have required header obfuscation")
799                 return
800         }
801         ih, err := cl.connBtHandshake(c, nil)
802         if err != nil {
803                 err = xerrors.Errorf("during bt handshake: %w", err)
804                 return
805         }
806         cl.lock()
807         t = cl.torrents[ih]
808         cl.unlock()
809         return
810 }
811
812 func (cl *Client) connBtHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
813         res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
814         if err != nil {
815                 return
816         }
817         ret = res.Hash
818         c.PeerExtensionBytes = res.PeerExtensionBits
819         c.PeerID = res.PeerID
820         c.completedHandshake = time.Now()
821         return
822 }
823
824 func (cl *Client) runReceivedConn(c *connection) {
825         err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
826         if err != nil {
827                 panic(err)
828         }
829         t, err := cl.receiveHandshakes(c)
830         if err != nil {
831                 log.Fmsg(
832                         "error receiving handshakes: %s", err,
833                 ).AddValue(
834                         debugLogValue,
835                 ).Add(
836                         "network", c.network,
837                 ).Log(cl.logger)
838                 torrent.Add("error receiving handshake", 1)
839                 cl.lock()
840                 cl.onBadAccept(c.remoteAddr)
841                 cl.unlock()
842                 return
843         }
844         if t == nil {
845                 torrent.Add("received handshake for unloaded torrent", 1)
846                 log.Fmsg("received handshake for unloaded torrent").AddValue(debugLogValue).Log(cl.logger)
847                 cl.lock()
848                 cl.onBadAccept(c.remoteAddr)
849                 cl.unlock()
850                 return
851         }
852         torrent.Add("received handshake for loaded torrent", 1)
853         cl.lock()
854         defer cl.unlock()
855         cl.runHandshookConn(c, t)
856 }
857
858 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
859         c.setTorrent(t)
860         if c.PeerID == cl.peerID {
861                 if c.outgoing {
862                         connsToSelf.Add(1)
863                         addr := c.conn.RemoteAddr().String()
864                         cl.dopplegangerAddrs[addr] = struct{}{}
865                 } else {
866                         // Because the remote address is not necessarily the same as its
867                         // client's torrent listen address, we won't record the remote address
868                         // as a doppleganger. Instead, the initiator can record *us* as the
869                         // doppleganger.
870                 }
871                 return
872         }
873         c.conn.SetWriteDeadline(time.Time{})
874         c.r = deadlineReader{c.conn, c.r}
875         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
876         if connIsIpv6(c.conn) {
877                 torrent.Add("completed handshake over ipv6", 1)
878         }
879         if err := t.addConnection(c); err != nil {
880                 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
881                 return
882         }
883         defer t.dropConnection(c)
884         go c.writer(time.Minute)
885         cl.sendInitialMessages(c, t)
886         err := c.mainReadLoop()
887         if err != nil && cl.config.Debug {
888                 cl.logger.Printf("error during connection main read loop: %s", err)
889         }
890 }
891
892 // See the order given in Transmission's tr_peerMsgsNew.
893 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
894         if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
895                 conn.Post(pp.Message{
896                         Type:       pp.Extended,
897                         ExtendedID: pp.HandshakeExtendedID,
898                         ExtendedPayload: func() []byte {
899                                 msg := pp.ExtendedHandshakeMessage{
900                                         M: map[pp.ExtensionName]pp.ExtensionNumber{
901                                                 pp.ExtensionNameMetadata: metadataExtendedId,
902                                         },
903                                         V:            cl.config.ExtendedHandshakeClientVersion,
904                                         Reqq:         64, // TODO: Really?
905                                         YourIp:       pp.CompactIp(conn.remoteAddr.IP),
906                                         Encryption:   cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
907                                         Port:         cl.incomingPeerPort(),
908                                         MetadataSize: torrent.metadataSize(),
909                                         // TODO: We can figured these out specific to the socket
910                                         // used.
911                                         Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
912                                         Ipv6: cl.config.PublicIp6.To16(),
913                                 }
914                                 if !cl.config.DisablePEX {
915                                         msg.M[pp.ExtensionNamePex] = pexExtendedId
916                                 }
917                                 return bencode.MustMarshal(msg)
918                         }(),
919                 })
920         }
921         func() {
922                 if conn.fastEnabled() {
923                         if torrent.haveAllPieces() {
924                                 conn.Post(pp.Message{Type: pp.HaveAll})
925                                 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
926                                 return
927                         } else if !torrent.haveAnyPieces() {
928                                 conn.Post(pp.Message{Type: pp.HaveNone})
929                                 conn.sentHaves.Clear()
930                                 return
931                         }
932                 }
933                 conn.PostBitfield()
934         }()
935         if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
936                 conn.Post(pp.Message{
937                         Type: pp.Port,
938                         Port: cl.dhtPort(),
939                 })
940         }
941 }
942
943 func (cl *Client) dhtPort() (ret uint16) {
944         cl.eachDhtServer(func(s *dht.Server) {
945                 ret = uint16(missinggo.AddrPort(s.Addr()))
946         })
947         return
948 }
949
950 func (cl *Client) haveDhtServer() (ret bool) {
951         cl.eachDhtServer(func(_ *dht.Server) {
952                 ret = true
953         })
954         return
955 }
956
957 // Process incoming ut_metadata message.
958 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
959         var d map[string]int
960         err := bencode.Unmarshal(payload, &d)
961         if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
962         } else if err != nil {
963                 return fmt.Errorf("error unmarshalling bencode: %s", err)
964         }
965         msgType, ok := d["msg_type"]
966         if !ok {
967                 return errors.New("missing msg_type field")
968         }
969         piece := d["piece"]
970         switch msgType {
971         case pp.DataMetadataExtensionMsgType:
972                 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
973                 if !c.requestedMetadataPiece(piece) {
974                         return fmt.Errorf("got unexpected piece %d", piece)
975                 }
976                 c.metadataRequests[piece] = false
977                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
978                 if begin < 0 || begin >= len(payload) {
979                         return fmt.Errorf("data has bad offset in payload: %d", begin)
980                 }
981                 t.saveMetadataPiece(piece, payload[begin:])
982                 c.lastUsefulChunkReceived = time.Now()
983                 return t.maybeCompleteMetadata()
984         case pp.RequestMetadataExtensionMsgType:
985                 if !t.haveMetadataPiece(piece) {
986                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
987                         return nil
988                 }
989                 start := (1 << 14) * piece
990                 c.logger.Printf("sending metadata piece %d", piece)
991                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
992                 return nil
993         case pp.RejectMetadataExtensionMsgType:
994                 return nil
995         default:
996                 return errors.New("unknown msg_type value")
997         }
998 }
999
1000 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1001         if port == 0 {
1002                 return true
1003         }
1004         if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1005                 return true
1006         }
1007         if _, ok := cl.ipBlockRange(ip); ok {
1008                 return true
1009         }
1010         if _, ok := cl.badPeerIPs[ip.String()]; ok {
1011                 return true
1012         }
1013         return false
1014 }
1015
1016 // Return a Torrent ready for insertion into a Client.
1017 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1018         // use provided storage, if provided
1019         storageClient := cl.defaultStorage
1020         if specStorage != nil {
1021                 storageClient = storage.NewClient(specStorage)
1022         }
1023
1024         t = &Torrent{
1025                 cl:       cl,
1026                 infoHash: ih,
1027                 peers: prioritizedPeers{
1028                         om: btree.New(32),
1029                         getPrio: func(p Peer) peerPriority {
1030                                 return bep40PriorityIgnoreError(cl.publicAddr(p.IP), p.addr())
1031                         },
1032                 },
1033                 conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1034
1035                 halfOpen:          make(map[string]Peer),
1036                 pieceStateChanges: pubsub.NewPubSub(),
1037
1038                 storageOpener:       storageClient,
1039                 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1040
1041                 networkingEnabled: true,
1042                 requestStrategy:   cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks()),
1043                 metadataChanged: sync.Cond{
1044                         L: cl.locker(),
1045                 },
1046         }
1047         t.logger = cl.logger.WithValues(t).WithText(func(m log.Msg) string {
1048                 return fmt.Sprintf("%v: %s", t, m.Text())
1049         })
1050         t.setChunkSize(defaultChunkSize)
1051         return
1052 }
1053
1054 // A file-like handle to some torrent data resource.
1055 type Handle interface {
1056         io.Reader
1057         io.Seeker
1058         io.Closer
1059         io.ReaderAt
1060 }
1061
1062 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1063         return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1064 }
1065
1066 // Adds a torrent by InfoHash with a custom Storage implementation.
1067 // If the torrent already exists then this Storage is ignored and the
1068 // existing torrent returned with `new` set to `false`
1069 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1070         cl.lock()
1071         defer cl.unlock()
1072         t, ok := cl.torrents[infoHash]
1073         if ok {
1074                 return
1075         }
1076         new = true
1077
1078         t = cl.newTorrent(infoHash, specStorage)
1079         cl.eachDhtServer(func(s *dht.Server) {
1080                 go t.dhtAnnouncer(s)
1081         })
1082         cl.torrents[infoHash] = t
1083         cl.clearAcceptLimits()
1084         t.updateWantPeersEvent()
1085         // Tickle Client.waitAccept, new torrent may want conns.
1086         cl.event.Broadcast()
1087         return
1088 }
1089
1090 // Add or merge a torrent spec. If the torrent is already present, the
1091 // trackers will be merged with the existing ones. If the Info isn't yet
1092 // known, it will be set. The display name is replaced if the new spec
1093 // provides one. Returns new if the torrent wasn't already in the client.
1094 // Note that any `Storage` defined on the spec will be ignored if the
1095 // torrent is already present (i.e. `new` return value is `true`)
1096 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1097         t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1098         if spec.DisplayName != "" {
1099                 t.SetDisplayName(spec.DisplayName)
1100         }
1101         if spec.InfoBytes != nil {
1102                 err = t.SetInfoBytes(spec.InfoBytes)
1103                 if err != nil {
1104                         return
1105                 }
1106         }
1107         cl.lock()
1108         defer cl.unlock()
1109         if spec.ChunkSize != 0 {
1110                 t.setChunkSize(pp.Integer(spec.ChunkSize))
1111         }
1112         t.addTrackers(spec.Trackers)
1113         t.maybeNewConns()
1114         return
1115 }
1116
1117 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1118         t, ok := cl.torrents[infoHash]
1119         if !ok {
1120                 err = fmt.Errorf("no such torrent")
1121                 return
1122         }
1123         err = t.close()
1124         if err != nil {
1125                 panic(err)
1126         }
1127         delete(cl.torrents, infoHash)
1128         return
1129 }
1130
1131 func (cl *Client) allTorrentsCompleted() bool {
1132         for _, t := range cl.torrents {
1133                 if !t.haveInfo() {
1134                         return false
1135                 }
1136                 if !t.haveAllPieces() {
1137                         return false
1138                 }
1139         }
1140         return true
1141 }
1142
1143 // Returns true when all torrents are completely downloaded and false if the
1144 // client is stopped before that.
1145 func (cl *Client) WaitAll() bool {
1146         cl.lock()
1147         defer cl.unlock()
1148         for !cl.allTorrentsCompleted() {
1149                 if cl.closed.IsSet() {
1150                         return false
1151                 }
1152                 cl.event.Wait()
1153         }
1154         return true
1155 }
1156
1157 // Returns handles to all the torrents loaded in the Client.
1158 func (cl *Client) Torrents() []*Torrent {
1159         cl.lock()
1160         defer cl.unlock()
1161         return cl.torrentsAsSlice()
1162 }
1163
1164 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1165         for _, t := range cl.torrents {
1166                 ret = append(ret, t)
1167         }
1168         return
1169 }
1170
1171 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1172         spec, err := TorrentSpecFromMagnetURI(uri)
1173         if err != nil {
1174                 return
1175         }
1176         T, _, err = cl.AddTorrentSpec(spec)
1177         return
1178 }
1179
1180 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1181         T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1182         var ss []string
1183         slices.MakeInto(&ss, mi.Nodes)
1184         cl.AddDHTNodes(ss)
1185         return
1186 }
1187
1188 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1189         mi, err := metainfo.LoadFromFile(filename)
1190         if err != nil {
1191                 return
1192         }
1193         return cl.AddTorrent(mi)
1194 }
1195
1196 func (cl *Client) DhtServers() []*dht.Server {
1197         return cl.dhtServers
1198 }
1199
1200 func (cl *Client) AddDHTNodes(nodes []string) {
1201         for _, n := range nodes {
1202                 hmp := missinggo.SplitHostMaybePort(n)
1203                 ip := net.ParseIP(hmp.Host)
1204                 if ip == nil {
1205                         cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1206                         continue
1207                 }
1208                 ni := krpc.NodeInfo{
1209                         Addr: krpc.NodeAddr{
1210                                 IP:   ip,
1211                                 Port: hmp.Port,
1212                         },
1213                 }
1214                 cl.eachDhtServer(func(s *dht.Server) {
1215                         s.AddNode(ni)
1216                 })
1217         }
1218 }
1219
1220 func (cl *Client) banPeerIP(ip net.IP) {
1221         cl.logger.Printf("banning ip %v", ip)
1222         if cl.badPeerIPs == nil {
1223                 cl.badPeerIPs = make(map[string]struct{})
1224         }
1225         cl.badPeerIPs[ip.String()] = struct{}{}
1226 }
1227
1228 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr IpPort, network string) (c *connection) {
1229         c = &connection{
1230                 conn:            nc,
1231                 outgoing:        outgoing,
1232                 Choked:          true,
1233                 PeerChoked:      true,
1234                 PeerMaxRequests: 250,
1235                 writeBuffer:     new(bytes.Buffer),
1236                 remoteAddr:      remoteAddr,
1237                 network:         network,
1238         }
1239         c.logger = cl.logger.WithValues(c,
1240                 log.Debug, // I want messages to default to debug, and can set it here as it's only used by new code
1241         ).WithText(func(m log.Msg) string {
1242                 return fmt.Sprintf("%v: %s", c, m.Text())
1243         })
1244         c.writerCond.L = cl.locker()
1245         c.setRW(connStatsReadWriter{nc, c})
1246         c.r = &rateLimitedReader{
1247                 l: cl.config.DownloadRateLimiter,
1248                 r: c.r,
1249         }
1250         c.logger.Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1251         return
1252 }
1253
1254 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1255         cl.lock()
1256         defer cl.unlock()
1257         t := cl.torrent(ih)
1258         if t == nil {
1259                 return
1260         }
1261         t.addPeers([]Peer{{
1262                 IP:     ip,
1263                 Port:   port,
1264                 Source: peerSourceDhtAnnouncePeer,
1265         }})
1266 }
1267
1268 func firstNotNil(ips ...net.IP) net.IP {
1269         for _, ip := range ips {
1270                 if ip != nil {
1271                         return ip
1272                 }
1273         }
1274         return nil
1275 }
1276
1277 func (cl *Client) eachListener(f func(socket) bool) {
1278         for _, s := range cl.conns {
1279                 if !f(s) {
1280                         break
1281                 }
1282         }
1283 }
1284
1285 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1286         cl.eachListener(func(l socket) bool {
1287                 ret = l
1288                 return !f(l)
1289         })
1290         return
1291 }
1292
1293 func (cl *Client) publicIp(peer net.IP) net.IP {
1294         // TODO: Use BEP 10 to determine how peers are seeing us.
1295         if peer.To4() != nil {
1296                 return firstNotNil(
1297                         cl.config.PublicIp4,
1298                         cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1299                 )
1300         }
1301
1302         return firstNotNil(
1303                 cl.config.PublicIp6,
1304                 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1305         )
1306 }
1307
1308 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1309         return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
1310                 return f(missinggo.AddrIP(l.Addr()))
1311         }).Addr())
1312 }
1313
1314 // Our IP as a peer should see it.
1315 func (cl *Client) publicAddr(peer net.IP) IpPort {
1316         return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1317 }
1318
1319 // ListenAddrs addresses currently being listened to.
1320 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1321         cl.lock()
1322         defer cl.unlock()
1323         cl.eachListener(func(l socket) bool {
1324                 ret = append(ret, l.Addr())
1325                 return true
1326         })
1327         return
1328 }
1329
1330 func (cl *Client) onBadAccept(addr IpPort) {
1331         ip := maskIpForAcceptLimiting(addr.IP)
1332         if cl.acceptLimiter == nil {
1333                 cl.acceptLimiter = make(map[ipStr]int)
1334         }
1335         cl.acceptLimiter[ipStr(ip.String())]++
1336 }
1337
1338 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1339         if ip4 := ip.To4(); ip4 != nil {
1340                 return ip4.Mask(net.CIDRMask(24, 32))
1341         }
1342         return ip
1343 }
1344
1345 func (cl *Client) clearAcceptLimits() {
1346         cl.acceptLimiter = nil
1347 }
1348
1349 func (cl *Client) acceptLimitClearer() {
1350         for {
1351                 select {
1352                 case <-cl.closed.LockedChan(cl.locker()):
1353                         return
1354                 case <-time.After(15 * time.Minute):
1355                         cl.lock()
1356                         cl.clearAcceptLimits()
1357                         cl.unlock()
1358                 }
1359         }
1360 }
1361
1362 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1363         if cl.config.DisableAcceptRateLimiting {
1364                 return false
1365         }
1366         return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1367 }
1368
1369 func (cl *Client) rLock() {
1370         cl._mu.RLock()
1371 }
1372
1373 func (cl *Client) rUnlock() {
1374         cl._mu.RUnlock()
1375 }
1376
1377 func (cl *Client) lock() {
1378         cl._mu.Lock()
1379 }
1380
1381 func (cl *Client) unlock() {
1382         cl._mu.Unlock()
1383 }
1384
1385 func (cl *Client) locker() sync.Locker {
1386         return clientLocker{cl}
1387 }
1388
1389 func (cl *Client) String() string {
1390         return fmt.Sprintf("<%[1]T %[1]p>", cl)
1391 }
1392
1393 type clientLocker struct {
1394         *Client
1395 }
1396
1397 func (cl clientLocker) Lock() {
1398         cl.lock()
1399 }
1400
1401 func (cl clientLocker) Unlock() {
1402         cl.unlock()
1403 }