]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
cmd/btrtrc client
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "context"
6         "crypto/rand"
7         "encoding/binary"
8         "encoding/hex"
9         "errors"
10         "expvar"
11         "fmt"
12         "io"
13         "math"
14         "net"
15         "net/http"
16         "net/netip"
17         "sort"
18         "strconv"
19         "time"
20
21         "github.com/anacrolix/chansync"
22         "github.com/anacrolix/chansync/events"
23         "github.com/anacrolix/dht/v2"
24         "github.com/anacrolix/dht/v2/krpc"
25         . "github.com/anacrolix/generics"
26         g "github.com/anacrolix/generics"
27         "github.com/anacrolix/log"
28         "github.com/anacrolix/missinggo/perf"
29         "github.com/anacrolix/missinggo/v2"
30         "github.com/anacrolix/missinggo/v2/bitmap"
31         "github.com/anacrolix/missinggo/v2/pproffd"
32         "github.com/anacrolix/sync"
33         "github.com/cespare/xxhash"
34         "github.com/davecgh/go-spew/spew"
35         "github.com/dustin/go-humanize"
36         gbtree "github.com/google/btree"
37         "github.com/pion/datachannel"
38         "github.com/pion/webrtc/v3"
39
40         "github.com/anacrolix/torrent/bencode"
41         "github.com/anacrolix/torrent/internal/check"
42         "github.com/anacrolix/torrent/internal/limiter"
43         "github.com/anacrolix/torrent/iplist"
44         "github.com/anacrolix/torrent/metainfo"
45         "github.com/anacrolix/torrent/mse"
46         pp "github.com/anacrolix/torrent/peer_protocol"
47         request_strategy "github.com/anacrolix/torrent/request-strategy"
48         "github.com/anacrolix/torrent/storage"
49         "github.com/anacrolix/torrent/tracker"
50         "github.com/anacrolix/torrent/types/infohash"
51         infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2"
52         "github.com/anacrolix/torrent/webtorrent"
53 )
54
55 // Clients contain zero or more Torrents. A Client manages a blocklist, the
56 // TCP/UDP protocol ports, and DHT as desired.
57 type Client struct {
58         // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
59         // fields. See #262.
60         connStats ConnStats
61
62         _mu    lockWithDeferreds
63         event  sync.Cond
64         closed chansync.SetOnce
65
66         config *ClientConfig
67         logger log.Logger
68
69         peerID         PeerID
70         defaultStorage *storage.Client
71         onClose        []func()
72         dialers        []Dialer
73         listeners      []Listener
74         dhtServers     []DhtServer
75         ipBlockList    iplist.Ranger
76
77         // Set of addresses that have our client ID. This intentionally will
78         // include ourselves if we end up trying to connect to our own address
79         // through legitimate channels.
80         dopplegangerAddrs map[string]struct{}
81         badPeerIPs        map[netip.Addr]struct{}
82         // All Torrents once.
83         torrents map[*Torrent]struct{}
84         // All Torrents by their short infohashes (v1 if valid, and truncated v2 if valid). Unless the
85         // info has been obtained, there's no knowing if an infohash belongs to v1 or v2.
86         torrentsByShortHash map[InfoHash]*Torrent
87
88         pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder
89
90         acceptLimiter map[ipStr]int
91         numHalfOpen   int
92
93         websocketTrackers websocketTrackers
94
95         activeAnnounceLimiter limiter.Instance
96         httpClient            *http.Client
97
98         clientHolepunchAddrSets
99
100         defaultLocalLtepProtocolMap LocalLtepProtocolMap
101 }
102
103 type ipStr string
104
105 func (cl *Client) BadPeerIPs() (ips []string) {
106         cl.rLock()
107         ips = cl.badPeerIPsLocked()
108         cl.rUnlock()
109         return
110 }
111
112 func (cl *Client) badPeerIPsLocked() (ips []string) {
113         ips = make([]string, len(cl.badPeerIPs))
114         i := 0
115         for k := range cl.badPeerIPs {
116                 ips[i] = k.String()
117                 i += 1
118         }
119         return
120 }
121
122 func (cl *Client) PeerID() PeerID {
123         return cl.peerID
124 }
125
126 // Returns the port number for the first listener that has one. No longer assumes that all port
127 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
128 // found.
129 func (cl *Client) LocalPort() (port int) {
130         for i := 0; i < len(cl.listeners); i += 1 {
131                 if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
132                         return
133                 }
134         }
135         return
136 }
137
138 func writeDhtServerStatus(w io.Writer, s DhtServer) {
139         dhtStats := s.Stats()
140         fmt.Fprintf(w, " ID: %x\n", s.ID())
141         spew.Fdump(w, dhtStats)
142 }
143
144 // Writes out a human readable status of the client, such as for writing to a
145 // HTTP status page.
146 func (cl *Client) WriteStatus(_w io.Writer) {
147         cl.rLock()
148         defer cl.rUnlock()
149         w := bufio.NewWriter(_w)
150         defer w.Flush()
151         fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
152         fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
153         fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
154         fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
155         fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
156         cl.eachDhtServer(func(s DhtServer) {
157                 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
158                 writeDhtServerStatus(w, s)
159         })
160         dumpStats(w, cl.statsLocked())
161         torrentsSlice := cl.torrentsAsSlice()
162         fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
163         fmt.Fprintln(w)
164         sort.Slice(torrentsSlice, func(l, r int) bool {
165                 return torrentsSlice[l].canonicalShortInfohash().AsString() < torrentsSlice[r].canonicalShortInfohash().AsString()
166         })
167         for _, t := range torrentsSlice {
168                 if t.name() == "" {
169                         fmt.Fprint(w, "<unknown name>")
170                 } else {
171                         fmt.Fprint(w, t.name())
172                 }
173                 fmt.Fprint(w, "\n")
174                 if t.info != nil {
175                         fmt.Fprintf(
176                                 w,
177                                 "%f%% of %d bytes (%s)",
178                                 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
179                                 t.length(),
180                                 humanize.Bytes(uint64(t.length())))
181                 } else {
182                         w.WriteString("<missing metainfo>")
183                 }
184                 fmt.Fprint(w, "\n")
185                 t.writeStatus(w)
186                 fmt.Fprintln(w)
187         }
188 }
189
190 func (cl *Client) initLogger() {
191         logger := cl.config.Logger
192         if logger.IsZero() {
193                 logger = log.Default
194         }
195         if cl.config.Debug {
196                 logger = logger.WithFilterLevel(log.Debug)
197         }
198         cl.logger = logger.WithValues(cl)
199 }
200
201 func (cl *Client) announceKey() int32 {
202         return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
203 }
204
205 // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
206 func (cl *Client) init(cfg *ClientConfig) {
207         cl.config = cfg
208         g.MakeMap(&cl.dopplegangerAddrs)
209         g.MakeMap(&cl.torrentsByShortHash)
210         g.MakeMap(&cl.torrents)
211         cl.torrentsByShortHash = make(map[metainfo.Hash]*Torrent)
212         cl.activeAnnounceLimiter.SlotsPerKey = 2
213         cl.event.L = cl.locker()
214         cl.ipBlockList = cfg.IPBlocklist
215         cl.httpClient = &http.Client{
216                 Transport: cfg.WebTransport,
217         }
218         if cl.httpClient.Transport == nil {
219                 cl.httpClient.Transport = &http.Transport{
220                         Proxy:       cfg.HTTPProxy,
221                         DialContext: cfg.HTTPDialContext,
222                         // I think this value was observed from some webseeds. It seems reasonable to extend it
223                         // to other uses of HTTP from the client.
224                         MaxConnsPerHost: 10,
225                 }
226         }
227         cl.defaultLocalLtepProtocolMap = makeBuiltinLtepProtocols(!cfg.DisablePEX)
228 }
229
230 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
231         if cfg == nil {
232                 cfg = NewDefaultClientConfig()
233                 cfg.ListenPort = 0
234         }
235         cl = &Client{}
236         cl.init(cfg)
237         go cl.acceptLimitClearer()
238         cl.initLogger()
239         defer func() {
240                 if err != nil {
241                         cl.Close()
242                         cl = nil
243                 }
244         }()
245
246         storageImpl := cfg.DefaultStorage
247         if storageImpl == nil {
248                 // We'd use mmap by default but HFS+ doesn't support sparse files.
249                 storageImplCloser := storage.NewFile(cfg.DataDir)
250                 cl.onClose = append(cl.onClose, func() {
251                         if err := storageImplCloser.Close(); err != nil {
252                                 cl.logger.Printf("error closing default storage: %s", err)
253                         }
254                 })
255                 storageImpl = storageImplCloser
256         }
257         cl.defaultStorage = storage.NewClient(storageImpl)
258
259         if cfg.PeerID != "" {
260                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
261         } else {
262                 o := copy(cl.peerID[:], cfg.Bep20)
263                 _, err = rand.Read(cl.peerID[o:])
264                 if err != nil {
265                         panic("error generating peer id")
266                 }
267         }
268
269         builtinListenNetworks := cl.listenNetworks()
270         sockets, err := listenAll(
271                 builtinListenNetworks,
272                 cl.config.ListenHost,
273                 cl.config.ListenPort,
274                 cl.firewallCallback,
275                 cl.logger,
276         )
277         if err != nil {
278                 return
279         }
280         if len(sockets) == 0 && len(builtinListenNetworks) != 0 {
281                 err = fmt.Errorf("no sockets created for networks %v", builtinListenNetworks)
282                 return
283         }
284
285         // Check for panics.
286         cl.LocalPort()
287
288         for _, _s := range sockets {
289                 s := _s // Go is fucking retarded.
290                 cl.onClose = append(cl.onClose, func() { go s.Close() })
291                 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
292                         cl.dialers = append(cl.dialers, s)
293                         cl.listeners = append(cl.listeners, s)
294                         if cl.config.AcceptPeerConnections {
295                                 go cl.acceptConnections(s)
296                         }
297                 }
298         }
299
300         go cl.forwardPort()
301         if !cfg.NoDHT {
302                 for _, s := range sockets {
303                         if pc, ok := s.(net.PacketConn); ok {
304                                 ds, err := cl.NewAnacrolixDhtServer(pc)
305                                 if err != nil {
306                                         panic(err)
307                                 }
308                                 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
309                                 cl.onClose = append(cl.onClose, func() { ds.Close() })
310                         }
311                 }
312         }
313
314         cl.websocketTrackers = websocketTrackers{
315                 PeerId: cl.peerID,
316                 Logger: cl.logger,
317                 GetAnnounceRequest: func(
318                         event tracker.AnnounceEvent, infoHash [20]byte,
319                 ) (
320                         tracker.AnnounceRequest, error,
321                 ) {
322                         cl.lock()
323                         defer cl.unlock()
324                         t, ok := cl.torrentsByShortHash[infoHash]
325                         if !ok {
326                                 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
327                         }
328                         return t.announceRequest(event, infoHash), nil
329                 },
330                 Proxy:                      cl.config.HTTPProxy,
331                 WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader,
332                 ICEServers:                 cl.ICEServers(),
333                 DialContext:                cl.config.TrackerDialContext,
334                 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
335                         cl.lock()
336                         defer cl.unlock()
337                         t, ok := cl.torrentsByShortHash[dcc.InfoHash]
338                         if !ok {
339                                 cl.logger.WithDefaultLevel(log.Warning).Printf(
340                                         "got webrtc conn for unloaded torrent with infohash %x",
341                                         dcc.InfoHash,
342                                 )
343                                 dc.Close()
344                                 return
345                         }
346                         go t.onWebRtcConn(dc, dcc)
347                 },
348         }
349
350         return
351 }
352
353 func (cl *Client) AddDhtServer(d DhtServer) {
354         cl.dhtServers = append(cl.dhtServers, d)
355 }
356
357 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
358 // given address for any Torrent.
359 func (cl *Client) AddDialer(d Dialer) {
360         cl.lock()
361         defer cl.unlock()
362         cl.dialers = append(cl.dialers, d)
363         for t := range cl.torrents {
364                 t.openNewConns()
365         }
366 }
367
368 func (cl *Client) Listeners() []Listener {
369         return cl.listeners
370 }
371
372 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
373 // yourself.
374 func (cl *Client) AddListener(l Listener) {
375         cl.listeners = append(cl.listeners, l)
376         if cl.config.AcceptPeerConnections {
377                 go cl.acceptConnections(l)
378         }
379 }
380
381 func (cl *Client) firewallCallback(net.Addr) bool {
382         cl.rLock()
383         block := !cl.wantConns() || !cl.config.AcceptPeerConnections
384         cl.rUnlock()
385         if block {
386                 torrent.Add("connections firewalled", 1)
387         } else {
388                 torrent.Add("connections not firewalled", 1)
389         }
390         return block
391 }
392
393 func (cl *Client) listenOnNetwork(n network) bool {
394         if n.Ipv4 && cl.config.DisableIPv4 {
395                 return false
396         }
397         if n.Ipv6 && cl.config.DisableIPv6 {
398                 return false
399         }
400         if n.Tcp && cl.config.DisableTCP {
401                 return false
402         }
403         if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
404                 return false
405         }
406         return true
407 }
408
409 func (cl *Client) listenNetworks() (ns []network) {
410         for _, n := range allPeerNetworks {
411                 if cl.listenOnNetwork(n) {
412                         ns = append(ns, n)
413                 }
414         }
415         return
416 }
417
418 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
419 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
420         logger := cl.logger.WithNames("dht", conn.LocalAddr().String())
421         cfg := dht.ServerConfig{
422                 IPBlocklist:    cl.ipBlockList,
423                 Conn:           conn,
424                 OnAnnouncePeer: cl.onDHTAnnouncePeer,
425                 PublicIP: func() net.IP {
426                         if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
427                                 return cl.config.PublicIp6
428                         }
429                         return cl.config.PublicIp4
430                 }(),
431                 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
432                 OnQuery:       cl.config.DHTOnQuery,
433                 Logger:        logger,
434         }
435         if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
436                 f(&cfg)
437         }
438         s, err = dht.NewServer(&cfg)
439         if err == nil {
440                 go s.TableMaintainer()
441         }
442         return
443 }
444
445 func (cl *Client) Closed() events.Done {
446         return cl.closed.Done()
447 }
448
449 func (cl *Client) eachDhtServer(f func(DhtServer)) {
450         for _, ds := range cl.dhtServers {
451                 f(ds)
452         }
453 }
454
455 // Stops the client. All connections to peers are closed and all activity will come to a halt.
456 func (cl *Client) Close() (errs []error) {
457         var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
458         cl.lock()
459         for t := range cl.torrents {
460                 err := t.close(&closeGroup)
461                 if err != nil {
462                         errs = append(errs, err)
463                 }
464         }
465         for i := range cl.onClose {
466                 cl.onClose[len(cl.onClose)-1-i]()
467         }
468         cl.closed.Set()
469         cl.unlock()
470         cl.event.Broadcast()
471         closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
472         return
473 }
474
475 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
476         if cl.ipBlockList == nil {
477                 return
478         }
479         return cl.ipBlockList.Lookup(ip)
480 }
481
482 func (cl *Client) ipIsBlocked(ip net.IP) bool {
483         _, blocked := cl.ipBlockRange(ip)
484         return blocked
485 }
486
487 func (cl *Client) wantConns() bool {
488         if cl.config.AlwaysWantConns {
489                 return true
490         }
491         for t := range cl.torrents {
492                 if t.wantIncomingConns() {
493                         return true
494                 }
495         }
496         return false
497 }
498
499 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
500 func (cl *Client) rejectAccepted(conn net.Conn) error {
501         if !cl.wantConns() {
502                 return errors.New("don't want conns right now")
503         }
504         ra := conn.RemoteAddr()
505         if rip := addrIpOrNil(ra); rip != nil {
506                 if cl.config.DisableIPv4Peers && rip.To4() != nil {
507                         return errors.New("ipv4 peers disabled")
508                 }
509                 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
510                         return errors.New("ipv4 disabled")
511                 }
512                 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
513                         return errors.New("ipv6 disabled")
514                 }
515                 if cl.rateLimitAccept(rip) {
516                         return errors.New("source IP accepted rate limited")
517                 }
518                 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
519                         return errors.New("bad source addr")
520                 }
521         }
522         return nil
523 }
524
525 func (cl *Client) acceptConnections(l Listener) {
526         for {
527                 conn, err := l.Accept()
528                 torrent.Add("client listener accepts", 1)
529                 if err == nil {
530                         holepunchAddr, holepunchErr := addrPortFromPeerRemoteAddr(conn.RemoteAddr())
531                         if holepunchErr == nil {
532                                 cl.lock()
533                                 if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
534                                         setAdd(&cl.accepted, holepunchAddr)
535                                 }
536                                 if g.MapContains(
537                                         cl.undialableWithoutHolepunchDialedAfterHolepunchConnect,
538                                         holepunchAddr,
539                                 ) {
540                                         setAdd(&cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr)
541                                 }
542                                 cl.unlock()
543                         }
544                 }
545                 conn = pproffd.WrapNetConn(conn)
546                 cl.rLock()
547                 closed := cl.closed.IsSet()
548                 var reject error
549                 if !closed && conn != nil {
550                         reject = cl.rejectAccepted(conn)
551                 }
552                 cl.rUnlock()
553                 if closed {
554                         if conn != nil {
555                                 conn.Close()
556                         }
557                         return
558                 }
559                 if err != nil {
560                         log.Fmsg("error accepting connection: %s", err).LogLevel(log.Debug, cl.logger)
561                         continue
562                 }
563                 go func() {
564                         if reject != nil {
565                                 torrent.Add("rejected accepted connections", 1)
566                                 cl.logger.LazyLog(log.Debug, func() log.Msg {
567                                         return log.Fmsg("rejecting accepted conn: %v", reject)
568                                 })
569                                 conn.Close()
570                         } else {
571                                 go cl.incomingConnection(conn)
572                         }
573                         cl.logger.LazyLog(log.Debug, func() log.Msg {
574                                 return log.Fmsg("accepted %q connection at %q from %q",
575                                         l.Addr().Network(),
576                                         conn.LocalAddr(),
577                                         conn.RemoteAddr(),
578                                 )
579                         })
580                         torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
581                         torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
582                         torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
583                 }()
584         }
585 }
586
587 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
588 func regularNetConnPeerConnConnString(nc net.Conn) string {
589         return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
590 }
591
592 func (cl *Client) incomingConnection(nc net.Conn) {
593         defer nc.Close()
594         if tc, ok := nc.(*net.TCPConn); ok {
595                 tc.SetLinger(0)
596         }
597         remoteAddr, _ := tryIpPortFromNetAddr(nc.RemoteAddr())
598         c := cl.newConnection(
599                 nc,
600                 newConnectionOpts{
601                         outgoing:        false,
602                         remoteAddr:      nc.RemoteAddr(),
603                         localPublicAddr: cl.publicAddr(remoteAddr.IP),
604                         network:         nc.RemoteAddr().Network(),
605                         connString:      regularNetConnPeerConnConnString(nc),
606                 })
607         c.Discovery = PeerSourceIncoming
608         cl.runReceivedConn(c)
609
610         cl.lock()
611         c.close()
612         cl.unlock()
613 }
614
615 // Returns a handle to the given torrent, if it's present in the client.
616 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
617         cl.rLock()
618         defer cl.rUnlock()
619         t, ok = cl.torrentsByShortHash[ih]
620         return
621 }
622
623 type DialResult struct {
624         Conn   net.Conn
625         Dialer Dialer
626 }
627
628 func countDialResult(err error) {
629         if err == nil {
630                 torrent.Add("successful dials", 1)
631         } else {
632                 torrent.Add("unsuccessful dials", 1)
633         }
634 }
635
636 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit, pendingPeers int) (ret time.Duration) {
637         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
638         if ret < minDialTimeout {
639                 ret = minDialTimeout
640         }
641         return
642 }
643
644 // Returns whether an address is known to connect to a client with our own ID.
645 func (cl *Client) dopplegangerAddr(addr string) bool {
646         _, ok := cl.dopplegangerAddrs[addr]
647         return ok
648 }
649
650 // Returns a connection over UTP or TCP, whichever is first to connect.
651 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
652         return DialFirst(ctx, addr, cl.dialers)
653 }
654
655 // Returns a connection over UTP or TCP, whichever is first to connect.
656 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
657         pool := dialPool{
658                 addr: addr,
659         }
660         defer pool.startDrainer()
661         for _, _s := range dialers {
662                 pool.add(ctx, _s)
663         }
664         return pool.getFirst()
665 }
666
667 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
668         c, err := s.Dial(ctx, addr)
669         if err != nil {
670                 log.ContextLogger(ctx).Levelf(log.Debug, "error dialing %q: %v", addr, err)
671         }
672         // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
673         // it now in case we close the connection forthwith. Note this is also done in the TCP dialer
674         // code to increase the chance it's done.
675         if tc, ok := c.(*net.TCPConn); ok {
676                 tc.SetLinger(0)
677         }
678         countDialResult(err)
679         return c
680 }
681
682 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string, attemptKey outgoingConnAttemptKey) {
683         path := t.getHalfOpenPath(addr, attemptKey)
684         if !path.Exists() {
685                 panic("should exist")
686         }
687         path.Delete()
688         cl.numHalfOpen--
689         if cl.numHalfOpen < 0 {
690                 panic("should not be possible")
691         }
692         for t := range cl.torrents {
693                 t.openNewConns()
694         }
695 }
696
697 func (cl *Client) countHalfOpenFromTorrents() (count int) {
698         for t := range cl.torrents {
699                 count += t.numHalfOpenAttempts()
700         }
701         return
702 }
703
704 // Performs initiator handshakes and returns a connection. Returns nil *PeerConn if no connection
705 // for valid reasons.
706 func (cl *Client) initiateProtocolHandshakes(
707         ctx context.Context,
708         nc net.Conn,
709         t *Torrent,
710         encryptHeader bool,
711         newConnOpts newConnectionOpts,
712 ) (
713         c *PeerConn, err error,
714 ) {
715         c = cl.newConnection(nc, newConnOpts)
716         c.headerEncrypted = encryptHeader
717         ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
718         defer cancel()
719         dl, ok := ctx.Deadline()
720         if !ok {
721                 panic(ctx)
722         }
723         err = nc.SetDeadline(dl)
724         if err != nil {
725                 panic(err)
726         }
727         err = cl.initiateHandshakes(c, t)
728         return
729 }
730
731 func doProtocolHandshakeOnDialResult(
732         t *Torrent,
733         obfuscatedHeader bool,
734         addr PeerRemoteAddr,
735         dr DialResult,
736 ) (
737         c *PeerConn, err error,
738 ) {
739         cl := t.cl
740         nc := dr.Conn
741         addrIpPort, _ := tryIpPortFromNetAddr(addr)
742         c, err = cl.initiateProtocolHandshakes(
743                 context.Background(), nc, t, obfuscatedHeader,
744                 newConnectionOpts{
745                         outgoing:   true,
746                         remoteAddr: addr,
747                         // It would be possible to retrieve a public IP from the dialer used here?
748                         localPublicAddr: cl.publicAddr(addrIpPort.IP),
749                         network:         dr.Dialer.DialerNetwork(),
750                         connString:      regularNetConnPeerConnConnString(nc),
751                 })
752         if err != nil {
753                 nc.Close()
754         }
755         return c, err
756 }
757
758 // Returns nil connection and nil error if no connection could be established for valid reasons.
759 func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn, err error) {
760         // It would be better if dial rate limiting could be tested when considering to open connections
761         // instead. Doing it here means if the limit is low, and the half-open limit is high, we could
762         // end up with lots of outgoing connection attempts pending that were initiated on stale data.
763         {
764                 dialReservation := cl.config.DialRateLimiter.Reserve()
765                 if !opts.receivedHolepunchConnect {
766                         if !dialReservation.OK() {
767                                 err = errors.New("can't make dial limit reservation")
768                                 return
769                         }
770                         time.Sleep(dialReservation.Delay())
771                 }
772         }
773         torrent.Add("establish outgoing connection", 1)
774         addr := opts.peerInfo.Addr
775         dialPool := dialPool{
776                 resCh: make(chan DialResult),
777                 addr:  addr.String(),
778         }
779         defer dialPool.startDrainer()
780         dialTimeout := opts.t.getDialTimeoutUnlocked()
781         {
782                 ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
783                 defer cancel()
784                 for _, d := range cl.dialers {
785                         dialPool.add(ctx, d)
786                 }
787         }
788         holepunchAddr, holepunchAddrErr := addrPortFromPeerRemoteAddr(addr)
789         headerObfuscationPolicy := opts.HeaderObfuscationPolicy
790         obfuscatedHeaderFirst := headerObfuscationPolicy.Preferred
791         firstDialResult := dialPool.getFirst()
792         if firstDialResult.Conn == nil {
793                 // No dialers worked. Try to initiate a holepunching rendezvous.
794                 if holepunchAddrErr == nil {
795                         cl.lock()
796                         if !opts.receivedHolepunchConnect {
797                                 g.MakeMapIfNilAndSet(&cl.undialableWithoutHolepunch, holepunchAddr, struct{}{})
798                         }
799                         if !opts.skipHolepunchRendezvous {
800                                 opts.t.trySendHolepunchRendezvous(holepunchAddr)
801                         }
802                         cl.unlock()
803                 }
804                 err = fmt.Errorf("all initial dials failed")
805                 return
806         }
807         if opts.receivedHolepunchConnect && holepunchAddrErr == nil {
808                 cl.lock()
809                 if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
810                         g.MakeMapIfNilAndSet(&cl.dialableOnlyAfterHolepunch, holepunchAddr, struct{}{})
811                 }
812                 g.MakeMapIfNil(&cl.dialedSuccessfullyAfterHolepunchConnect)
813                 g.MapInsert(cl.dialedSuccessfullyAfterHolepunchConnect, holepunchAddr, struct{}{})
814                 cl.unlock()
815         }
816         c, err = doProtocolHandshakeOnDialResult(
817                 opts.t,
818                 obfuscatedHeaderFirst,
819                 addr,
820                 firstDialResult,
821         )
822         if err == nil {
823                 torrent.Add("initiated conn with preferred header obfuscation", 1)
824                 return
825         }
826         c.logger.Levelf(
827                 log.Debug,
828                 "error doing protocol handshake with header obfuscation %v",
829                 obfuscatedHeaderFirst,
830         )
831         firstDialResult.Conn.Close()
832         // We should have just tried with the preferred header obfuscation. If it was required, there's nothing else to try.
833         if headerObfuscationPolicy.RequirePreferred {
834                 return
835         }
836         // Reuse the dialer that returned already but failed to handshake.
837         {
838                 ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
839                 defer cancel()
840                 dialPool.add(ctx, firstDialResult.Dialer)
841         }
842         secondDialResult := dialPool.getFirst()
843         if secondDialResult.Conn == nil {
844                 return
845         }
846         c, err = doProtocolHandshakeOnDialResult(
847                 opts.t,
848                 !obfuscatedHeaderFirst,
849                 addr,
850                 secondDialResult,
851         )
852         if err == nil {
853                 torrent.Add("initiated conn with fallback header obfuscation", 1)
854                 return
855         }
856         c.logger.Levelf(
857                 log.Debug,
858                 "error doing protocol handshake with header obfuscation %v",
859                 !obfuscatedHeaderFirst,
860         )
861         secondDialResult.Conn.Close()
862         return
863 }
864
865 type outgoingConnOpts struct {
866         peerInfo PeerInfo
867         t        *Torrent
868         // Don't attempt to connect unless a connect message is received after initiating a rendezvous.
869         requireRendezvous bool
870         // Don't send rendezvous requests to eligible relays.
871         skipHolepunchRendezvous bool
872         // Outgoing connection attempt is in response to holepunch connect message.
873         receivedHolepunchConnect bool
874         HeaderObfuscationPolicy  HeaderObfuscationPolicy
875 }
876
877 // Called to dial out and run a connection. The addr we're given is already
878 // considered half-open.
879 func (cl *Client) outgoingConnection(
880         opts outgoingConnOpts,
881         attemptKey outgoingConnAttemptKey,
882 ) {
883         c, err := cl.dialAndCompleteHandshake(opts)
884         if err == nil {
885                 c.conn.SetWriteDeadline(time.Time{})
886         }
887         cl.lock()
888         defer cl.unlock()
889         // Don't release lock between here and addPeerConn, unless it's for failure.
890         cl.noLongerHalfOpen(opts.t, opts.peerInfo.Addr.String(), attemptKey)
891         if err != nil {
892                 if cl.config.Debug {
893                         cl.logger.Levelf(
894                                 log.Debug,
895                                 "error establishing outgoing connection to %v: %v",
896                                 opts.peerInfo.Addr,
897                                 err,
898                         )
899                 }
900                 return
901         }
902         defer c.close()
903         c.Discovery = opts.peerInfo.Source
904         c.trusted = opts.peerInfo.Trusted
905         opts.t.runHandshookConnLoggingErr(c)
906 }
907
908 // The port number for incoming peer connections. 0 if the client isn't listening.
909 func (cl *Client) incomingPeerPort() int {
910         return cl.LocalPort()
911 }
912
913 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) (err error) {
914         if c.headerEncrypted {
915                 var rw io.ReadWriter
916                 rw, c.cryptoMethod, err = mse.InitiateHandshake(
917                         struct {
918                                 io.Reader
919                                 io.Writer
920                         }{c.r, c.w},
921                         t.canonicalShortInfohash().Bytes(),
922                         nil,
923                         cl.config.CryptoProvides,
924                 )
925                 c.setRW(rw)
926                 if err != nil {
927                         return fmt.Errorf("header obfuscation handshake: %w", err)
928                 }
929         }
930         localReservedBits := cl.config.Extensions
931         handshakeIh := *t.canonicalShortInfohash()
932         // If we're sending the v1 infohash, and we know the v2 infohash, set the v2 upgrade bit. This
933         // means the peer can send the v2 infohash in the handshake to upgrade the connection.
934         localReservedBits.SetBit(pp.ExtensionBitV2Upgrade, g.Some(handshakeIh) == t.infoHash && t.infoHashV2.Ok)
935         ih, err := cl.connBtHandshake(c, &handshakeIh, localReservedBits)
936         if err != nil {
937                 return fmt.Errorf("bittorrent protocol handshake: %w", err)
938         }
939         if g.Some(ih) == t.infoHash {
940                 return nil
941         }
942         if t.infoHashV2.Ok && *t.infoHashV2.Value.ToShort() == ih {
943                 torrent.Add("initiated handshakes upgraded to v2", 1)
944                 c.v2 = true
945                 return nil
946         }
947         err = errors.New("bittorrent protocol handshake: peer infohash didn't match")
948         return
949 }
950
951 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
952 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
953 func (cl *Client) forSkeys(f func([]byte) bool) {
954         cl.rLock()
955         defer cl.rUnlock()
956         if false { // Emulate the bug from #114
957                 var firstIh InfoHash
958                 for ih := range cl.torrentsByShortHash {
959                         firstIh = ih
960                         break
961                 }
962                 for range cl.torrentsByShortHash {
963                         if !f(firstIh[:]) {
964                                 break
965                         }
966                 }
967                 return
968         }
969         for ih := range cl.torrentsByShortHash {
970                 if !f(ih[:]) {
971                         break
972                 }
973         }
974 }
975
976 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
977         if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
978                 return ret
979         }
980         return cl.forSkeys
981 }
982
983 // Do encryption and bittorrent handshakes as receiver.
984 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
985         defer perf.ScopeTimerErr(&err)()
986         var rw io.ReadWriter
987         rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(
988                 c.rw(),
989                 cl.handshakeReceiverSecretKeys(),
990                 cl.config.HeaderObfuscationPolicy,
991                 cl.config.CryptoSelector,
992         )
993         c.setRW(rw)
994         if err == nil || err == mse.ErrNoSecretKeyMatch {
995                 if c.headerEncrypted {
996                         torrent.Add("handshakes received encrypted", 1)
997                 } else {
998                         torrent.Add("handshakes received unencrypted", 1)
999                 }
1000         } else {
1001                 torrent.Add("handshakes received with error while handling encryption", 1)
1002         }
1003         if err != nil {
1004                 if err == mse.ErrNoSecretKeyMatch {
1005                         err = nil
1006                 }
1007                 return
1008         }
1009         if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
1010                 err = errors.New("connection does not have required header obfuscation")
1011                 return
1012         }
1013         ih, err := cl.connBtHandshake(c, nil, cl.config.Extensions)
1014         if err != nil {
1015                 return nil, fmt.Errorf("during bt handshake: %w", err)
1016         }
1017
1018         cl.lock()
1019         t = cl.torrentsByShortHash[ih]
1020         if t != nil && t.infoHashV2.Ok && *t.infoHashV2.Value.ToShort() == ih {
1021                 torrent.Add("v2 handshakes received", 1)
1022                 c.v2 = true
1023         }
1024         cl.unlock()
1025
1026         return
1027 }
1028
1029 var successfulPeerWireProtocolHandshakePeerReservedBytes expvar.Map
1030
1031 func init() {
1032         torrent.Set(
1033                 "successful_peer_wire_protocol_handshake_peer_reserved_bytes",
1034                 &successfulPeerWireProtocolHandshakePeerReservedBytes)
1035 }
1036
1037 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash, reservedBits PeerExtensionBits) (ret metainfo.Hash, err error) {
1038         res, err := pp.Handshake(c.rw(), ih, cl.peerID, reservedBits)
1039         if err != nil {
1040                 return
1041         }
1042         successfulPeerWireProtocolHandshakePeerReservedBytes.Add(
1043                 hex.EncodeToString(res.PeerExtensionBits[:]), 1)
1044         ret = res.Hash
1045         c.PeerExtensionBytes = res.PeerExtensionBits
1046         c.PeerID = res.PeerID
1047         c.completedHandshake = time.Now()
1048         if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
1049                 cb(c, res.Hash)
1050         }
1051         return
1052 }
1053
1054 func (cl *Client) runReceivedConn(c *PeerConn) {
1055         err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
1056         if err != nil {
1057                 panic(err)
1058         }
1059         t, err := cl.receiveHandshakes(c)
1060         if err != nil {
1061                 cl.logger.LazyLog(log.Debug, func() log.Msg {
1062                         return log.Fmsg(
1063                                 "error receiving handshakes on %v: %s", c, err,
1064                         ).Add(
1065                                 "network", c.Network,
1066                         )
1067                 })
1068                 torrent.Add("error receiving handshake", 1)
1069                 cl.lock()
1070                 cl.onBadAccept(c.RemoteAddr)
1071                 cl.unlock()
1072                 return
1073         }
1074         if t == nil {
1075                 torrent.Add("received handshake for unloaded torrent", 1)
1076                 cl.logger.LazyLog(log.Debug, func() log.Msg {
1077                         return log.Fmsg("received handshake for unloaded torrent")
1078                 })
1079                 cl.lock()
1080                 cl.onBadAccept(c.RemoteAddr)
1081                 cl.unlock()
1082                 return
1083         }
1084         torrent.Add("received handshake for loaded torrent", 1)
1085         c.conn.SetWriteDeadline(time.Time{})
1086         cl.lock()
1087         defer cl.unlock()
1088         t.runHandshookConnLoggingErr(c)
1089 }
1090
1091 // Client lock must be held before entering this.
1092 func (t *Torrent) runHandshookConn(pc *PeerConn) error {
1093         pc.setTorrent(t)
1094         cl := t.cl
1095         for i, b := range cl.config.MinPeerExtensions {
1096                 if pc.PeerExtensionBytes[i]&b != b {
1097                         return fmt.Errorf("peer did not meet minimum peer extensions: %x", pc.PeerExtensionBytes[:])
1098                 }
1099         }
1100         if pc.PeerID == cl.peerID {
1101                 if pc.outgoing {
1102                         connsToSelf.Add(1)
1103                         addr := pc.RemoteAddr.String()
1104                         cl.dopplegangerAddrs[addr] = struct{}{}
1105                 } /* else {
1106                         // Because the remote address is not necessarily the same as its client's torrent listen
1107                         // address, we won't record the remote address as a doppleganger. Instead, the initiator
1108                         // can record *us* as the doppleganger.
1109                 } */
1110                 t.logger.Levelf(log.Debug, "local and remote peer ids are the same")
1111                 return nil
1112         }
1113         pc.r = deadlineReader{pc.conn, pc.r}
1114         completedHandshakeConnectionFlags.Add(pc.connectionFlags(), 1)
1115         if connIsIpv6(pc.conn) {
1116                 torrent.Add("completed handshake over ipv6", 1)
1117         }
1118         if err := t.addPeerConn(pc); err != nil {
1119                 return fmt.Errorf("adding connection: %w", err)
1120         }
1121         defer t.dropConnection(pc)
1122         pc.addBuiltinLtepProtocols(!cl.config.DisablePEX)
1123         for _, cb := range pc.callbacks.PeerConnAdded {
1124                 cb(pc)
1125         }
1126         pc.startMessageWriter()
1127         pc.sendInitialMessages()
1128         pc.initUpdateRequestsTimer()
1129         err := pc.mainReadLoop()
1130         if err != nil {
1131                 return fmt.Errorf("main read loop: %w", err)
1132         }
1133         return nil
1134 }
1135
1136 func (p *Peer) initUpdateRequestsTimer() {
1137         if check.Enabled {
1138                 if p.updateRequestsTimer != nil {
1139                         panic(p.updateRequestsTimer)
1140                 }
1141         }
1142         if enableUpdateRequestsTimer {
1143                 p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
1144         }
1145 }
1146
1147 const peerUpdateRequestsTimerReason = "updateRequestsTimer"
1148
1149 func (c *Peer) updateRequestsTimerFunc() {
1150         c.locker().Lock()
1151         defer c.locker().Unlock()
1152         if c.closed.IsSet() {
1153                 return
1154         }
1155         if c.isLowOnRequests() {
1156                 // If there are no outstanding requests, then a request update should have already run.
1157                 return
1158         }
1159         if d := time.Since(c.lastRequestUpdate); d < updateRequestsTimerDuration {
1160                 // These should be benign, Timer.Stop doesn't guarantee that its function won't run if it's
1161                 // already been fired.
1162                 torrent.Add("spurious timer requests updates", 1)
1163                 return
1164         }
1165         c.updateRequests(peerUpdateRequestsTimerReason)
1166 }
1167
1168 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
1169 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
1170 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
1171 const localClientReqq = 1024
1172
1173 // See the order given in Transmission's tr_peerMsgsNew.
1174 func (pc *PeerConn) sendInitialMessages() {
1175         t := pc.t
1176         cl := t.cl
1177         if pc.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
1178                 pc.write(pp.Message{
1179                         Type:       pp.Extended,
1180                         ExtendedID: pp.HandshakeExtendedID,
1181                         ExtendedPayload: func() []byte {
1182                                 msg := pp.ExtendedHandshakeMessage{
1183                                         V:            cl.config.ExtendedHandshakeClientVersion,
1184                                         Reqq:         localClientReqq,
1185                                         YourIp:       pp.CompactIp(pc.remoteIp()),
1186                                         Encryption:   cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
1187                                         Port:         cl.incomingPeerPort(),
1188                                         MetadataSize: t.metadataSize(),
1189                                         // TODO: We can figure these out specific to the socket used.
1190                                         Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
1191                                         Ipv6: cl.config.PublicIp6.To16(),
1192                                 }
1193                                 msg.M = pc.LocalLtepProtocolMap.toSupportedExtensionDict()
1194                                 return bencode.MustMarshal(msg)
1195                         }(),
1196                 })
1197         }
1198         func() {
1199                 if pc.fastEnabled() {
1200                         if t.haveAllPieces() {
1201                                 pc.write(pp.Message{Type: pp.HaveAll})
1202                                 pc.sentHaves.AddRange(0, bitmap.BitRange(pc.t.NumPieces()))
1203                                 return
1204                         } else if !t.haveAnyPieces() {
1205                                 pc.write(pp.Message{Type: pp.HaveNone})
1206                                 pc.sentHaves.Clear()
1207                                 return
1208                         }
1209                 }
1210                 pc.postBitfield()
1211         }()
1212         if pc.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1213                 pc.write(pp.Message{
1214                         Type: pp.Port,
1215                         Port: cl.dhtPort(),
1216                 })
1217         }
1218 }
1219
1220 func (cl *Client) dhtPort() (ret uint16) {
1221         if len(cl.dhtServers) == 0 {
1222                 return
1223         }
1224         return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
1225 }
1226
1227 func (cl *Client) haveDhtServer() bool {
1228         return len(cl.dhtServers) > 0
1229 }
1230
1231 // Process incoming ut_metadata message.
1232 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1233         var d pp.ExtendedMetadataRequestMsg
1234         err := bencode.Unmarshal(payload, &d)
1235         if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1236         } else if err != nil {
1237                 return fmt.Errorf("error unmarshalling bencode: %s", err)
1238         }
1239         piece := d.Piece
1240         switch d.Type {
1241         case pp.DataMetadataExtensionMsgType:
1242                 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1243                 if !c.requestedMetadataPiece(piece) {
1244                         return fmt.Errorf("got unexpected piece %d", piece)
1245                 }
1246                 c.metadataRequests[piece] = false
1247                 begin := len(payload) - d.PieceSize()
1248                 if begin < 0 || begin >= len(payload) {
1249                         return fmt.Errorf("data has bad offset in payload: %d", begin)
1250                 }
1251                 t.saveMetadataPiece(piece, payload[begin:])
1252                 c.lastUsefulChunkReceived = time.Now()
1253                 err = t.maybeCompleteMetadata()
1254                 if err != nil {
1255                         // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1256                         // don't know who to blame. TODO: Also errors can be returned here that aren't related
1257                         // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1258                         // log consumers can filter for this message.
1259                         t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1260                 }
1261                 return err
1262         case pp.RequestMetadataExtensionMsgType:
1263                 if !t.haveMetadataPiece(piece) {
1264                         c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1265                         return nil
1266                 }
1267                 start := (1 << 14) * piece
1268                 c.protocolLogger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1269                 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1270                 return nil
1271         case pp.RejectMetadataExtensionMsgType:
1272                 return nil
1273         default:
1274                 return errors.New("unknown msg_type value")
1275         }
1276 }
1277
1278 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1279         if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1280                 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1281         }
1282         return false
1283 }
1284
1285 // Returns whether the IP address and port are considered "bad".
1286 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1287         if port == 0 || ip == nil {
1288                 return true
1289         }
1290         if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1291                 return true
1292         }
1293         if _, ok := cl.ipBlockRange(ip); ok {
1294                 return true
1295         }
1296         ipAddr, ok := netip.AddrFromSlice(ip)
1297         if !ok {
1298                 panic(ip)
1299         }
1300         if _, ok := cl.badPeerIPs[ipAddr]; ok {
1301                 return true
1302         }
1303         return false
1304 }
1305
1306 // Return a Torrent ready for insertion into a Client.
1307 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1308         return cl.newTorrentOpt(AddTorrentOpts{
1309                 InfoHash: ih,
1310                 Storage:  specStorage,
1311         })
1312 }
1313
1314 // Return a Torrent ready for insertion into a Client.
1315 func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
1316         var v1InfoHash g.Option[infohash.T]
1317         if !opts.InfoHash.IsZero() {
1318                 v1InfoHash.Set(opts.InfoHash)
1319         }
1320         if !v1InfoHash.Ok && !opts.InfoHashV2.Ok {
1321                 panic("v1 infohash must be nonzero or v2 infohash must be set")
1322         }
1323         // use provided storage, if provided
1324         storageClient := cl.defaultStorage
1325         if opts.Storage != nil {
1326                 storageClient = storage.NewClient(opts.Storage)
1327         }
1328
1329         t = &Torrent{
1330                 cl:         cl,
1331                 infoHash:   v1InfoHash,
1332                 infoHashV2: opts.InfoHashV2,
1333                 peers: prioritizedPeers{
1334                         om: gbtree.New(32),
1335                         getPrio: func(p PeerInfo) peerPriority {
1336                                 ipPort := p.addr()
1337                                 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1338                         },
1339                 },
1340                 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1341
1342                 storageOpener:       storageClient,
1343                 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1344
1345                 metadataChanged: sync.Cond{
1346                         L: cl.locker(),
1347                 },
1348                 webSeeds:     make(map[string]*Peer),
1349                 gotMetainfoC: make(chan struct{}),
1350         }
1351         var salt [8]byte
1352         rand.Read(salt[:])
1353         t.smartBanCache.Hash = func(b []byte) uint64 {
1354                 h := xxhash.New()
1355                 h.Write(salt[:])
1356                 h.Write(b)
1357                 return h.Sum64()
1358         }
1359         t.smartBanCache.Init()
1360         t.networkingEnabled.Set()
1361         t.logger = cl.logger.WithDefaultLevel(log.Debug)
1362         t.sourcesLogger = t.logger.WithNames("sources")
1363         if opts.ChunkSize == 0 {
1364                 opts.ChunkSize = defaultChunkSize
1365         }
1366         t.setChunkSize(opts.ChunkSize)
1367         return
1368 }
1369
1370 // A file-like handle to some torrent data resource.
1371 type Handle interface {
1372         io.Reader
1373         io.Seeker
1374         io.Closer
1375         io.ReaderAt
1376 }
1377
1378 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1379         return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1380 }
1381
1382 // Deprecated. Adds a torrent by InfoHash with a custom Storage implementation.
1383 // If the torrent already exists then this Storage is ignored and the
1384 // existing torrent returned with `new` set to `false`
1385 func (cl *Client) AddTorrentInfoHashWithStorage(
1386         infoHash metainfo.Hash,
1387         specStorage storage.ClientImpl,
1388 ) (t *Torrent, new bool) {
1389         cl.lock()
1390         defer cl.unlock()
1391         t, ok := cl.torrentsByShortHash[infoHash]
1392         if ok {
1393                 return
1394         }
1395         new = true
1396
1397         t = cl.newTorrent(infoHash, specStorage)
1398         cl.eachDhtServer(func(s DhtServer) {
1399                 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1400                         go t.dhtAnnouncer(s)
1401                 }
1402         })
1403         cl.torrentsByShortHash[infoHash] = t
1404         cl.torrents[t] = struct{}{}
1405         cl.clearAcceptLimits()
1406         t.updateWantPeersEvent()
1407         // Tickle Client.waitAccept, new torrent may want conns.
1408         cl.event.Broadcast()
1409         return
1410 }
1411
1412 // Adds a torrent by InfoHash with a custom Storage implementation. If the torrent already exists
1413 // then this Storage is ignored and the existing torrent returned with `new` set to `false`.
1414 func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) {
1415         infoHash := opts.InfoHash
1416         cl.lock()
1417         defer cl.unlock()
1418         t, ok := cl.torrentsByShortHash[infoHash]
1419         if ok {
1420                 return
1421         }
1422         if opts.InfoHashV2.Ok {
1423                 t, ok = cl.torrentsByShortHash[*opts.InfoHashV2.Value.ToShort()]
1424                 if ok {
1425                         return
1426                 }
1427         }
1428         new = true
1429
1430         t = cl.newTorrentOpt(opts)
1431         cl.eachDhtServer(func(s DhtServer) {
1432                 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1433                         go t.dhtAnnouncer(s)
1434                 }
1435         })
1436         cl.torrentsByShortHash[infoHash] = t
1437         cl.torrents[t] = struct{}{}
1438         t.setInfoBytesLocked(opts.InfoBytes)
1439         cl.clearAcceptLimits()
1440         t.updateWantPeersEvent()
1441         // Tickle Client.waitAccept, new torrent may want conns.
1442         cl.event.Broadcast()
1443         return
1444 }
1445
1446 type AddTorrentOpts struct {
1447         InfoHash   infohash.T
1448         InfoHashV2 g.Option[infohash_v2.T]
1449         Storage    storage.ClientImpl
1450         ChunkSize  pp.Integer
1451         InfoBytes  []byte
1452 }
1453
1454 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1455 // Torrent.MergeSpec.
1456 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1457         t, new = cl.AddTorrentOpt(AddTorrentOpts{
1458                 InfoHash:   spec.InfoHash,
1459                 InfoHashV2: spec.InfoHashV2,
1460                 Storage:    spec.Storage,
1461                 ChunkSize:  spec.ChunkSize,
1462         })
1463         modSpec := *spec
1464         if new {
1465                 // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing
1466                 // it.
1467                 modSpec.ChunkSize = 0
1468         }
1469         err = t.MergeSpec(&modSpec)
1470         if err != nil && new {
1471                 t.Drop()
1472         }
1473         return
1474 }
1475
1476 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1477 // spec.DisallowDataDownload/Upload will be read and applied
1478 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1479 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1480         if spec.DisplayName != "" {
1481                 t.SetDisplayName(spec.DisplayName)
1482         }
1483         if spec.InfoBytes != nil {
1484                 err := t.SetInfoBytes(spec.InfoBytes)
1485                 if err != nil {
1486                         return err
1487                 }
1488         }
1489         cl := t.cl
1490         cl.AddDhtNodes(spec.DhtNodes)
1491         t.UseSources(spec.Sources)
1492         cl.lock()
1493         defer cl.unlock()
1494         t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
1495         for _, url := range spec.Webseeds {
1496                 t.addWebSeed(url)
1497         }
1498         for _, peerAddr := range spec.PeerAddrs {
1499                 t.addPeer(PeerInfo{
1500                         Addr:    StringAddr(peerAddr),
1501                         Source:  PeerSourceDirect,
1502                         Trusted: true,
1503                 })
1504         }
1505         if spec.ChunkSize != 0 {
1506                 panic("chunk size cannot be changed for existing Torrent")
1507         }
1508         t.addTrackers(spec.Trackers)
1509         t.maybeNewConns()
1510         t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1511         t.dataUploadDisallowed = spec.DisallowDataUpload
1512         return errors.Join(t.addPieceLayersLocked(spec.PieceLayers)...)
1513 }
1514
1515 func (cl *Client) dropTorrent(t *Torrent, wg *sync.WaitGroup) (err error) {
1516         t.eachShortInfohash(func(short [20]byte) {
1517                 delete(cl.torrentsByShortHash, short)
1518         })
1519         err = t.close(wg)
1520         delete(cl.torrents, t)
1521         return
1522 }
1523
1524 func (cl *Client) allTorrentsCompleted() bool {
1525         for t := range cl.torrents {
1526                 if !t.haveInfo() {
1527                         return false
1528                 }
1529                 if !t.haveAllPieces() {
1530                         return false
1531                 }
1532         }
1533         return true
1534 }
1535
1536 // Returns true when all torrents are completely downloaded and false if the
1537 // client is stopped before that.
1538 func (cl *Client) WaitAll() bool {
1539         cl.lock()
1540         defer cl.unlock()
1541         for !cl.allTorrentsCompleted() {
1542                 if cl.closed.IsSet() {
1543                         return false
1544                 }
1545                 cl.event.Wait()
1546         }
1547         return true
1548 }
1549
1550 // Returns handles to all the torrents loaded in the Client.
1551 func (cl *Client) Torrents() []*Torrent {
1552         cl.rLock()
1553         defer cl.rUnlock()
1554         return cl.torrentsAsSlice()
1555 }
1556
1557 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1558         for t := range cl.torrents {
1559                 ret = append(ret, t)
1560         }
1561         return
1562 }
1563
1564 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1565         spec, err := TorrentSpecFromMagnetUri(uri)
1566         if err != nil {
1567                 return
1568         }
1569         T, _, err = cl.AddTorrentSpec(spec)
1570         return
1571 }
1572
1573 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1574         ts, err := TorrentSpecFromMetaInfoErr(mi)
1575         if err != nil {
1576                 return
1577         }
1578         T, _, err = cl.AddTorrentSpec(ts)
1579         return
1580 }
1581
1582 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1583         mi, err := metainfo.LoadFromFile(filename)
1584         if err != nil {
1585                 return
1586         }
1587         return cl.AddTorrent(mi)
1588 }
1589
1590 func (cl *Client) DhtServers() []DhtServer {
1591         return cl.dhtServers
1592 }
1593
1594 func (cl *Client) AddDhtNodes(nodes []string) {
1595         for _, n := range nodes {
1596                 hmp := missinggo.SplitHostMaybePort(n)
1597                 ip := net.ParseIP(hmp.Host)
1598                 if ip == nil {
1599                         cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1600                         continue
1601                 }
1602                 ni := krpc.NodeInfo{
1603                         Addr: krpc.NodeAddr{
1604                                 IP:   ip,
1605                                 Port: hmp.Port,
1606                         },
1607                 }
1608                 cl.eachDhtServer(func(s DhtServer) {
1609                         s.AddNode(ni)
1610                 })
1611         }
1612 }
1613
1614 func (cl *Client) banPeerIP(ip net.IP) {
1615         // We can't take this from string, because it will lose netip's v4on6. net.ParseIP parses v4
1616         // addresses directly to v4on6, which doesn't compare equal with v4.
1617         ipAddr, ok := netip.AddrFromSlice(ip)
1618         if !ok {
1619                 panic(ip)
1620         }
1621         g.MakeMapIfNilAndSet(&cl.badPeerIPs, ipAddr, struct{}{})
1622         for t := range cl.torrents {
1623                 t.iterPeers(func(p *Peer) {
1624                         if p.remoteIp().Equal(ip) {
1625                                 t.logger.Levelf(log.Warning, "dropping peer %v with banned ip %v", p, ip)
1626                                 // Should this be a close?
1627                                 p.drop()
1628                         }
1629                 })
1630         }
1631 }
1632
1633 type newConnectionOpts struct {
1634         outgoing        bool
1635         remoteAddr      PeerRemoteAddr
1636         localPublicAddr peerLocalPublicAddr
1637         network         string
1638         connString      string
1639 }
1640
1641 func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerConn) {
1642         if opts.network == "" {
1643                 panic(opts.remoteAddr)
1644         }
1645         c = &PeerConn{
1646                 Peer: Peer{
1647                         outgoing:        opts.outgoing,
1648                         choking:         true,
1649                         peerChoking:     true,
1650                         PeerMaxRequests: 250,
1651
1652                         RemoteAddr:      opts.remoteAddr,
1653                         localPublicAddr: opts.localPublicAddr,
1654                         Network:         opts.network,
1655                         callbacks:       &cl.config.Callbacks,
1656                 },
1657                 connString: opts.connString,
1658                 conn:       nc,
1659         }
1660         c.peerRequestDataAllocLimiter.Max = cl.config.MaxAllocPeerRequestDataPerConn
1661         c.initRequestState()
1662         // TODO: Need to be much more explicit about this, including allowing non-IP bannable addresses.
1663         if opts.remoteAddr != nil {
1664                 netipAddrPort, err := netip.ParseAddrPort(opts.remoteAddr.String())
1665                 if err == nil {
1666                         c.bannableAddr = Some(netipAddrPort.Addr())
1667                 }
1668         }
1669         c.peerImpl = c
1670         c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextText(fmt.Sprintf("%T %p", c, c))
1671         c.protocolLogger = c.logger.WithNames(protocolLoggingName)
1672         c.setRW(connStatsReadWriter{nc, c})
1673         c.r = &rateLimitedReader{
1674                 l: cl.config.DownloadRateLimiter,
1675                 r: c.r,
1676         }
1677         c.logger.Levelf(
1678                 log.Debug,
1679                 "inited with remoteAddr %v network %v outgoing %t",
1680                 opts.remoteAddr, opts.network, opts.outgoing,
1681         )
1682         for _, f := range cl.config.Callbacks.NewPeer {
1683                 f(&c.Peer)
1684         }
1685         return
1686 }
1687
1688 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1689         cl.lock()
1690         defer cl.unlock()
1691         t := cl.torrentsByShortHash[ih]
1692         if t == nil {
1693                 return
1694         }
1695         t.addPeers([]PeerInfo{{
1696                 Addr:   ipPortAddr{ip, port},
1697                 Source: PeerSourceDhtAnnouncePeer,
1698         }})
1699 }
1700
1701 func firstNotNil(ips ...net.IP) net.IP {
1702         for _, ip := range ips {
1703                 if ip != nil {
1704                         return ip
1705                 }
1706         }
1707         return nil
1708 }
1709
1710 func (cl *Client) eachListener(f func(Listener) bool) {
1711         for _, s := range cl.listeners {
1712                 if !f(s) {
1713                         break
1714                 }
1715         }
1716 }
1717
1718 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1719         for i := 0; i < len(cl.listeners); i += 1 {
1720                 if ret = cl.listeners[i]; f(ret) {
1721                         return
1722                 }
1723         }
1724         return nil
1725 }
1726
1727 func (cl *Client) publicIp(peer net.IP) net.IP {
1728         // TODO: Use BEP 10 to determine how peers are seeing us.
1729         if peer.To4() != nil {
1730                 return firstNotNil(
1731                         cl.config.PublicIp4,
1732                         cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1733                 )
1734         }
1735
1736         return firstNotNil(
1737                 cl.config.PublicIp6,
1738                 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1739         )
1740 }
1741
1742 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1743         l := cl.findListener(
1744                 func(l Listener) bool {
1745                         return f(addrIpOrNil(l.Addr()))
1746                 },
1747         )
1748         if l == nil {
1749                 return nil
1750         }
1751         return addrIpOrNil(l.Addr())
1752 }
1753
1754 // Our IP as a peer should see it.
1755 func (cl *Client) publicAddr(peer net.IP) IpPort {
1756         return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1757 }
1758
1759 // ListenAddrs addresses currently being listened to.
1760 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1761         cl.lock()
1762         ret = make([]net.Addr, len(cl.listeners))
1763         for i := 0; i < len(cl.listeners); i += 1 {
1764                 ret[i] = cl.listeners[i].Addr()
1765         }
1766         cl.unlock()
1767         return
1768 }
1769
1770 func (cl *Client) PublicIPs() (ips []net.IP) {
1771         if ip := cl.config.PublicIp4; ip != nil {
1772                 ips = append(ips, ip)
1773         }
1774         if ip := cl.config.PublicIp6; ip != nil {
1775                 ips = append(ips, ip)
1776         }
1777         return
1778 }
1779
1780 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1781         ipa, ok := tryIpPortFromNetAddr(addr)
1782         if !ok {
1783                 return
1784         }
1785         ip := maskIpForAcceptLimiting(ipa.IP)
1786         if cl.acceptLimiter == nil {
1787                 cl.acceptLimiter = make(map[ipStr]int)
1788         }
1789         cl.acceptLimiter[ipStr(ip.String())]++
1790 }
1791
1792 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1793         if ip4 := ip.To4(); ip4 != nil {
1794                 return ip4.Mask(net.CIDRMask(24, 32))
1795         }
1796         return ip
1797 }
1798
1799 func (cl *Client) clearAcceptLimits() {
1800         cl.acceptLimiter = nil
1801 }
1802
1803 func (cl *Client) acceptLimitClearer() {
1804         for {
1805                 select {
1806                 case <-cl.closed.Done():
1807                         return
1808                 case <-time.After(15 * time.Minute):
1809                         cl.lock()
1810                         cl.clearAcceptLimits()
1811                         cl.unlock()
1812                 }
1813         }
1814 }
1815
1816 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1817         if cl.config.DisableAcceptRateLimiting {
1818                 return false
1819         }
1820         return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1821 }
1822
1823 func (cl *Client) rLock() {
1824         cl._mu.RLock()
1825 }
1826
1827 func (cl *Client) rUnlock() {
1828         cl._mu.RUnlock()
1829 }
1830
1831 func (cl *Client) lock() {
1832         cl._mu.Lock()
1833 }
1834
1835 func (cl *Client) unlock() {
1836         cl._mu.Unlock()
1837 }
1838
1839 func (cl *Client) locker() *lockWithDeferreds {
1840         return &cl._mu
1841 }
1842
1843 func (cl *Client) String() string {
1844         return fmt.Sprintf("<%[1]T %[1]p>", cl)
1845 }
1846
1847 func (cl *Client) ICEServers() []webrtc.ICEServer {
1848         var ICEServers []webrtc.ICEServer
1849         if cl.config.ICEServerList != nil {
1850                 ICEServers = cl.config.ICEServerList
1851         } else if cl.config.ICEServers != nil {
1852                 ICEServers = []webrtc.ICEServer{{URLs: cl.config.ICEServers}}
1853         }
1854         return ICEServers
1855 }
1856
1857 // Returns connection-level aggregate connStats at the Client level. See the comment on
1858 // TorrentStats.ConnStats.
1859 func (cl *Client) ConnStats() ConnStats {
1860         return cl.connStats.Copy()
1861 }
1862
1863 func (cl *Client) Stats() ClientStats {
1864         cl.rLock()
1865         defer cl.rUnlock()
1866         return cl.statsLocked()
1867 }