]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
Add TestWebseedPartialSeed
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "context"
6         "crypto/rand"
7         "encoding/binary"
8         "encoding/hex"
9         "errors"
10         "expvar"
11         "fmt"
12         "io"
13         "math"
14         "net"
15         "net/http"
16         "net/netip"
17         "sort"
18         "strconv"
19         "time"
20
21         "github.com/anacrolix/chansync"
22         "github.com/anacrolix/chansync/events"
23         "github.com/anacrolix/dht/v2"
24         "github.com/anacrolix/dht/v2/krpc"
25         . "github.com/anacrolix/generics"
26         g "github.com/anacrolix/generics"
27         "github.com/anacrolix/log"
28         "github.com/anacrolix/missinggo/perf"
29         "github.com/anacrolix/missinggo/v2"
30         "github.com/anacrolix/missinggo/v2/bitmap"
31         "github.com/anacrolix/missinggo/v2/pproffd"
32         "github.com/anacrolix/sync"
33         "github.com/cespare/xxhash"
34         "github.com/davecgh/go-spew/spew"
35         "github.com/dustin/go-humanize"
36         gbtree "github.com/google/btree"
37         "github.com/pion/datachannel"
38         "github.com/pion/webrtc/v3"
39
40         "github.com/anacrolix/torrent/bencode"
41         "github.com/anacrolix/torrent/internal/check"
42         "github.com/anacrolix/torrent/internal/limiter"
43         "github.com/anacrolix/torrent/iplist"
44         "github.com/anacrolix/torrent/metainfo"
45         "github.com/anacrolix/torrent/mse"
46         pp "github.com/anacrolix/torrent/peer_protocol"
47         request_strategy "github.com/anacrolix/torrent/request-strategy"
48         "github.com/anacrolix/torrent/storage"
49         "github.com/anacrolix/torrent/tracker"
50         "github.com/anacrolix/torrent/types/infohash"
51         infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2"
52         "github.com/anacrolix/torrent/webtorrent"
53 )
54
55 // Clients contain zero or more Torrents. A Client manages a blocklist, the
56 // TCP/UDP protocol ports, and DHT as desired.
57 type Client struct {
58         // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
59         // fields. See #262.
60         connStats ConnStats
61
62         _mu    lockWithDeferreds
63         event  sync.Cond
64         closed chansync.SetOnce
65
66         config *ClientConfig
67         logger log.Logger
68
69         peerID         PeerID
70         defaultStorage *storage.Client
71         onClose        []func()
72         dialers        []Dialer
73         listeners      []Listener
74         dhtServers     []DhtServer
75         ipBlockList    iplist.Ranger
76
77         // Set of addresses that have our client ID. This intentionally will
78         // include ourselves if we end up trying to connect to our own address
79         // through legitimate channels.
80         dopplegangerAddrs map[string]struct{}
81         badPeerIPs        map[netip.Addr]struct{}
82         // All Torrents once.
83         torrents map[*Torrent]struct{}
84         // All Torrents by their short infohashes (v1 if valid, and truncated v2 if valid). Unless the
85         // info has been obtained, there's no knowing if an infohash belongs to v1 or v2.
86         torrentsByShortHash map[InfoHash]*Torrent
87
88         pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder
89
90         acceptLimiter map[ipStr]int
91         numHalfOpen   int
92
93         websocketTrackers websocketTrackers
94
95         activeAnnounceLimiter limiter.Instance
96         httpClient            *http.Client
97
98         clientHolepunchAddrSets
99
100         defaultLocalLtepProtocolMap LocalLtepProtocolMap
101 }
102
103 type ipStr string
104
105 func (cl *Client) BadPeerIPs() (ips []string) {
106         cl.rLock()
107         ips = cl.badPeerIPsLocked()
108         cl.rUnlock()
109         return
110 }
111
112 func (cl *Client) badPeerIPsLocked() (ips []string) {
113         ips = make([]string, len(cl.badPeerIPs))
114         i := 0
115         for k := range cl.badPeerIPs {
116                 ips[i] = k.String()
117                 i += 1
118         }
119         return
120 }
121
122 func (cl *Client) PeerID() PeerID {
123         return cl.peerID
124 }
125
126 // Returns the port number for the first listener that has one. No longer assumes that all port
127 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
128 // found.
129 func (cl *Client) LocalPort() (port int) {
130         for i := 0; i < len(cl.listeners); i += 1 {
131                 if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
132                         return
133                 }
134         }
135         return
136 }
137
138 func writeDhtServerStatus(w io.Writer, s DhtServer) {
139         dhtStats := s.Stats()
140         fmt.Fprintf(w, " ID: %x\n", s.ID())
141         spew.Fdump(w, dhtStats)
142 }
143
144 // Writes out a human readable status of the client, such as for writing to a
145 // HTTP status page.
146 func (cl *Client) WriteStatus(_w io.Writer) {
147         cl.rLock()
148         defer cl.rUnlock()
149         w := bufio.NewWriter(_w)
150         defer w.Flush()
151         fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
152         fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
153         fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
154         fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
155         fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
156         cl.eachDhtServer(func(s DhtServer) {
157                 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
158                 writeDhtServerStatus(w, s)
159         })
160         dumpStats(w, cl.statsLocked())
161         torrentsSlice := cl.torrentsAsSlice()
162         fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
163         fmt.Fprintln(w)
164         sort.Slice(torrentsSlice, func(l, r int) bool {
165                 return torrentsSlice[l].canonicalShortInfohash().AsString() < torrentsSlice[r].canonicalShortInfohash().AsString()
166         })
167         for _, t := range torrentsSlice {
168                 if t.name() == "" {
169                         fmt.Fprint(w, "<unknown name>")
170                 } else {
171                         fmt.Fprint(w, t.name())
172                 }
173                 fmt.Fprint(w, "\n")
174                 if t.info != nil {
175                         fmt.Fprintf(
176                                 w,
177                                 "%f%% of %d bytes (%s)",
178                                 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
179                                 t.length(),
180                                 humanize.Bytes(uint64(t.length())))
181                 } else {
182                         w.WriteString("<missing metainfo>")
183                 }
184                 fmt.Fprint(w, "\n")
185                 t.writeStatus(w)
186                 fmt.Fprintln(w)
187         }
188 }
189
190 func (cl *Client) initLogger() {
191         logger := cl.config.Logger
192         if logger.IsZero() {
193                 logger = log.Default
194         }
195         if cl.config.Debug {
196                 logger = logger.WithFilterLevel(log.Debug)
197         }
198         cl.logger = logger.WithValues(cl)
199 }
200
201 func (cl *Client) announceKey() int32 {
202         return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
203 }
204
205 // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
206 func (cl *Client) init(cfg *ClientConfig) {
207         cl.config = cfg
208         g.MakeMap(&cl.dopplegangerAddrs)
209         g.MakeMap(&cl.torrentsByShortHash)
210         g.MakeMap(&cl.torrents)
211         cl.torrentsByShortHash = make(map[metainfo.Hash]*Torrent)
212         cl.activeAnnounceLimiter.SlotsPerKey = 2
213         cl.event.L = cl.locker()
214         cl.ipBlockList = cfg.IPBlocklist
215         cl.httpClient = &http.Client{
216                 Transport: cfg.WebTransport,
217         }
218         if cl.httpClient.Transport == nil {
219                 cl.httpClient.Transport = &http.Transport{
220                         Proxy:       cfg.HTTPProxy,
221                         DialContext: cfg.HTTPDialContext,
222                         // I think this value was observed from some webseeds. It seems reasonable to extend it
223                         // to other uses of HTTP from the client.
224                         MaxConnsPerHost: 10,
225                 }
226         }
227         cl.defaultLocalLtepProtocolMap = makeBuiltinLtepProtocols(!cfg.DisablePEX)
228 }
229
230 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
231         if cfg == nil {
232                 cfg = NewDefaultClientConfig()
233                 cfg.ListenPort = 0
234         }
235         cl = &Client{}
236         cl.init(cfg)
237         go cl.acceptLimitClearer()
238         cl.initLogger()
239         defer func() {
240                 if err != nil {
241                         cl.Close()
242                         cl = nil
243                 }
244         }()
245
246         storageImpl := cfg.DefaultStorage
247         if storageImpl == nil {
248                 // We'd use mmap by default but HFS+ doesn't support sparse files.
249                 storageImplCloser := storage.NewFile(cfg.DataDir)
250                 cl.onClose = append(cl.onClose, func() {
251                         if err := storageImplCloser.Close(); err != nil {
252                                 cl.logger.Printf("error closing default storage: %s", err)
253                         }
254                 })
255                 storageImpl = storageImplCloser
256         }
257         cl.defaultStorage = storage.NewClient(storageImpl)
258
259         if cfg.PeerID != "" {
260                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
261         } else {
262                 o := copy(cl.peerID[:], cfg.Bep20)
263                 _, err = rand.Read(cl.peerID[o:])
264                 if err != nil {
265                         panic("error generating peer id")
266                 }
267         }
268
269         builtinListenNetworks := cl.listenNetworks()
270         sockets, err := listenAll(
271                 builtinListenNetworks,
272                 cl.config.ListenHost,
273                 cl.config.ListenPort,
274                 cl.firewallCallback,
275                 cl.logger,
276         )
277         if err != nil {
278                 return
279         }
280         if len(sockets) == 0 && len(builtinListenNetworks) != 0 {
281                 err = fmt.Errorf("no sockets created for networks %v", builtinListenNetworks)
282                 return
283         }
284
285         // Check for panics.
286         cl.LocalPort()
287
288         for _, _s := range sockets {
289                 s := _s // Go is fucking retarded.
290                 cl.onClose = append(cl.onClose, func() { go s.Close() })
291                 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
292                         cl.dialers = append(cl.dialers, s)
293                         cl.listeners = append(cl.listeners, s)
294                         if cl.config.AcceptPeerConnections {
295                                 go cl.acceptConnections(s)
296                         }
297                 }
298         }
299
300         go cl.forwardPort()
301         if !cfg.NoDHT {
302                 for _, s := range sockets {
303                         if pc, ok := s.(net.PacketConn); ok {
304                                 ds, err := cl.NewAnacrolixDhtServer(pc)
305                                 if err != nil {
306                                         panic(err)
307                                 }
308                                 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
309                                 cl.onClose = append(cl.onClose, func() { ds.Close() })
310                         }
311                 }
312         }
313
314         var ICEServers []webrtc.ICEServer
315         if cl.config.ICEServerList != nil {
316                 ICEServers = cl.config.ICEServerList
317         } else if cl.config.ICEServers != nil {
318                 ICEServers = []webrtc.ICEServer{{URLs: cl.config.ICEServers}}
319         }
320
321         cl.websocketTrackers = websocketTrackers{
322                 PeerId: cl.peerID,
323                 Logger: cl.logger,
324                 GetAnnounceRequest: func(
325                         event tracker.AnnounceEvent, infoHash [20]byte,
326                 ) (
327                         tracker.AnnounceRequest, error,
328                 ) {
329                         cl.lock()
330                         defer cl.unlock()
331                         t, ok := cl.torrentsByShortHash[infoHash]
332                         if !ok {
333                                 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
334                         }
335                         return t.announceRequest(event, infoHash), nil
336                 },
337                 Proxy:                      cl.config.HTTPProxy,
338                 WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader,
339                 ICEServers:                 ICEServers,
340                 DialContext:                cl.config.TrackerDialContext,
341                 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
342                         cl.lock()
343                         defer cl.unlock()
344                         t, ok := cl.torrentsByShortHash[dcc.InfoHash]
345                         if !ok {
346                                 cl.logger.WithDefaultLevel(log.Warning).Printf(
347                                         "got webrtc conn for unloaded torrent with infohash %x",
348                                         dcc.InfoHash,
349                                 )
350                                 dc.Close()
351                                 return
352                         }
353                         go t.onWebRtcConn(dc, dcc)
354                 },
355         }
356
357         return
358 }
359
360 func (cl *Client) AddDhtServer(d DhtServer) {
361         cl.dhtServers = append(cl.dhtServers, d)
362 }
363
364 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
365 // given address for any Torrent.
366 func (cl *Client) AddDialer(d Dialer) {
367         cl.lock()
368         defer cl.unlock()
369         cl.dialers = append(cl.dialers, d)
370         for t := range cl.torrents {
371                 t.openNewConns()
372         }
373 }
374
375 func (cl *Client) Listeners() []Listener {
376         return cl.listeners
377 }
378
379 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
380 // yourself.
381 func (cl *Client) AddListener(l Listener) {
382         cl.listeners = append(cl.listeners, l)
383         if cl.config.AcceptPeerConnections {
384                 go cl.acceptConnections(l)
385         }
386 }
387
388 func (cl *Client) firewallCallback(net.Addr) bool {
389         cl.rLock()
390         block := !cl.wantConns() || !cl.config.AcceptPeerConnections
391         cl.rUnlock()
392         if block {
393                 torrent.Add("connections firewalled", 1)
394         } else {
395                 torrent.Add("connections not firewalled", 1)
396         }
397         return block
398 }
399
400 func (cl *Client) listenOnNetwork(n network) bool {
401         if n.Ipv4 && cl.config.DisableIPv4 {
402                 return false
403         }
404         if n.Ipv6 && cl.config.DisableIPv6 {
405                 return false
406         }
407         if n.Tcp && cl.config.DisableTCP {
408                 return false
409         }
410         if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
411                 return false
412         }
413         return true
414 }
415
416 func (cl *Client) listenNetworks() (ns []network) {
417         for _, n := range allPeerNetworks {
418                 if cl.listenOnNetwork(n) {
419                         ns = append(ns, n)
420                 }
421         }
422         return
423 }
424
425 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
426 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
427         logger := cl.logger.WithNames("dht", conn.LocalAddr().String())
428         cfg := dht.ServerConfig{
429                 IPBlocklist:    cl.ipBlockList,
430                 Conn:           conn,
431                 OnAnnouncePeer: cl.onDHTAnnouncePeer,
432                 PublicIP: func() net.IP {
433                         if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
434                                 return cl.config.PublicIp6
435                         }
436                         return cl.config.PublicIp4
437                 }(),
438                 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
439                 OnQuery:       cl.config.DHTOnQuery,
440                 Logger:        logger,
441         }
442         if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
443                 f(&cfg)
444         }
445         s, err = dht.NewServer(&cfg)
446         if err == nil {
447                 go s.TableMaintainer()
448         }
449         return
450 }
451
452 func (cl *Client) Closed() events.Done {
453         return cl.closed.Done()
454 }
455
456 func (cl *Client) eachDhtServer(f func(DhtServer)) {
457         for _, ds := range cl.dhtServers {
458                 f(ds)
459         }
460 }
461
462 // Stops the client. All connections to peers are closed and all activity will come to a halt.
463 func (cl *Client) Close() (errs []error) {
464         var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
465         cl.lock()
466         for t := range cl.torrents {
467                 err := t.close(&closeGroup)
468                 if err != nil {
469                         errs = append(errs, err)
470                 }
471         }
472         for i := range cl.onClose {
473                 cl.onClose[len(cl.onClose)-1-i]()
474         }
475         cl.closed.Set()
476         cl.unlock()
477         cl.event.Broadcast()
478         closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
479         return
480 }
481
482 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
483         if cl.ipBlockList == nil {
484                 return
485         }
486         return cl.ipBlockList.Lookup(ip)
487 }
488
489 func (cl *Client) ipIsBlocked(ip net.IP) bool {
490         _, blocked := cl.ipBlockRange(ip)
491         return blocked
492 }
493
494 func (cl *Client) wantConns() bool {
495         if cl.config.AlwaysWantConns {
496                 return true
497         }
498         for t := range cl.torrents {
499                 if t.wantIncomingConns() {
500                         return true
501                 }
502         }
503         return false
504 }
505
506 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
507 func (cl *Client) rejectAccepted(conn net.Conn) error {
508         if !cl.wantConns() {
509                 return errors.New("don't want conns right now")
510         }
511         ra := conn.RemoteAddr()
512         if rip := addrIpOrNil(ra); rip != nil {
513                 if cl.config.DisableIPv4Peers && rip.To4() != nil {
514                         return errors.New("ipv4 peers disabled")
515                 }
516                 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
517                         return errors.New("ipv4 disabled")
518                 }
519                 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
520                         return errors.New("ipv6 disabled")
521                 }
522                 if cl.rateLimitAccept(rip) {
523                         return errors.New("source IP accepted rate limited")
524                 }
525                 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
526                         return errors.New("bad source addr")
527                 }
528         }
529         return nil
530 }
531
532 func (cl *Client) acceptConnections(l Listener) {
533         for {
534                 conn, err := l.Accept()
535                 torrent.Add("client listener accepts", 1)
536                 if err == nil {
537                         holepunchAddr, holepunchErr := addrPortFromPeerRemoteAddr(conn.RemoteAddr())
538                         if holepunchErr == nil {
539                                 cl.lock()
540                                 if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
541                                         setAdd(&cl.accepted, holepunchAddr)
542                                 }
543                                 if g.MapContains(
544                                         cl.undialableWithoutHolepunchDialedAfterHolepunchConnect,
545                                         holepunchAddr,
546                                 ) {
547                                         setAdd(&cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr)
548                                 }
549                                 cl.unlock()
550                         }
551                 }
552                 conn = pproffd.WrapNetConn(conn)
553                 cl.rLock()
554                 closed := cl.closed.IsSet()
555                 var reject error
556                 if !closed && conn != nil {
557                         reject = cl.rejectAccepted(conn)
558                 }
559                 cl.rUnlock()
560                 if closed {
561                         if conn != nil {
562                                 conn.Close()
563                         }
564                         return
565                 }
566                 if err != nil {
567                         log.Fmsg("error accepting connection: %s", err).LogLevel(log.Debug, cl.logger)
568                         continue
569                 }
570                 go func() {
571                         if reject != nil {
572                                 torrent.Add("rejected accepted connections", 1)
573                                 cl.logger.LazyLog(log.Debug, func() log.Msg {
574                                         return log.Fmsg("rejecting accepted conn: %v", reject)
575                                 })
576                                 conn.Close()
577                         } else {
578                                 go cl.incomingConnection(conn)
579                         }
580                         cl.logger.LazyLog(log.Debug, func() log.Msg {
581                                 return log.Fmsg("accepted %q connection at %q from %q",
582                                         l.Addr().Network(),
583                                         conn.LocalAddr(),
584                                         conn.RemoteAddr(),
585                                 )
586                         })
587                         torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
588                         torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
589                         torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
590                 }()
591         }
592 }
593
594 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
595 func regularNetConnPeerConnConnString(nc net.Conn) string {
596         return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
597 }
598
599 func (cl *Client) incomingConnection(nc net.Conn) {
600         defer nc.Close()
601         if tc, ok := nc.(*net.TCPConn); ok {
602                 tc.SetLinger(0)
603         }
604         remoteAddr, _ := tryIpPortFromNetAddr(nc.RemoteAddr())
605         c := cl.newConnection(
606                 nc,
607                 newConnectionOpts{
608                         outgoing:        false,
609                         remoteAddr:      nc.RemoteAddr(),
610                         localPublicAddr: cl.publicAddr(remoteAddr.IP),
611                         network:         nc.RemoteAddr().Network(),
612                         connString:      regularNetConnPeerConnConnString(nc),
613                 })
614         c.Discovery = PeerSourceIncoming
615         cl.runReceivedConn(c)
616
617         cl.lock()
618         c.close()
619         cl.unlock()
620 }
621
622 // Returns a handle to the given torrent, if it's present in the client.
623 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
624         cl.rLock()
625         defer cl.rUnlock()
626         t, ok = cl.torrentsByShortHash[ih]
627         return
628 }
629
630 type DialResult struct {
631         Conn   net.Conn
632         Dialer Dialer
633 }
634
635 func countDialResult(err error) {
636         if err == nil {
637                 torrent.Add("successful dials", 1)
638         } else {
639                 torrent.Add("unsuccessful dials", 1)
640         }
641 }
642
643 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit, pendingPeers int) (ret time.Duration) {
644         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
645         if ret < minDialTimeout {
646                 ret = minDialTimeout
647         }
648         return
649 }
650
651 // Returns whether an address is known to connect to a client with our own ID.
652 func (cl *Client) dopplegangerAddr(addr string) bool {
653         _, ok := cl.dopplegangerAddrs[addr]
654         return ok
655 }
656
657 // Returns a connection over UTP or TCP, whichever is first to connect.
658 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
659         return DialFirst(ctx, addr, cl.dialers)
660 }
661
662 // Returns a connection over UTP or TCP, whichever is first to connect.
663 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
664         pool := dialPool{
665                 addr: addr,
666         }
667         defer pool.startDrainer()
668         for _, _s := range dialers {
669                 pool.add(ctx, _s)
670         }
671         return pool.getFirst()
672 }
673
674 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
675         c, err := s.Dial(ctx, addr)
676         if err != nil {
677                 log.ContextLogger(ctx).Levelf(log.Debug, "error dialing %q: %v", addr, err)
678         }
679         // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
680         // it now in case we close the connection forthwith. Note this is also done in the TCP dialer
681         // code to increase the chance it's done.
682         if tc, ok := c.(*net.TCPConn); ok {
683                 tc.SetLinger(0)
684         }
685         countDialResult(err)
686         return c
687 }
688
689 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string, attemptKey outgoingConnAttemptKey) {
690         path := t.getHalfOpenPath(addr, attemptKey)
691         if !path.Exists() {
692                 panic("should exist")
693         }
694         path.Delete()
695         cl.numHalfOpen--
696         if cl.numHalfOpen < 0 {
697                 panic("should not be possible")
698         }
699         for t := range cl.torrents {
700                 t.openNewConns()
701         }
702 }
703
704 func (cl *Client) countHalfOpenFromTorrents() (count int) {
705         for t := range cl.torrents {
706                 count += t.numHalfOpenAttempts()
707         }
708         return
709 }
710
711 // Performs initiator handshakes and returns a connection. Returns nil *PeerConn if no connection
712 // for valid reasons.
713 func (cl *Client) initiateProtocolHandshakes(
714         ctx context.Context,
715         nc net.Conn,
716         t *Torrent,
717         encryptHeader bool,
718         newConnOpts newConnectionOpts,
719 ) (
720         c *PeerConn, err error,
721 ) {
722         c = cl.newConnection(nc, newConnOpts)
723         c.headerEncrypted = encryptHeader
724         ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
725         defer cancel()
726         dl, ok := ctx.Deadline()
727         if !ok {
728                 panic(ctx)
729         }
730         err = nc.SetDeadline(dl)
731         if err != nil {
732                 panic(err)
733         }
734         err = cl.initiateHandshakes(c, t)
735         return
736 }
737
738 func doProtocolHandshakeOnDialResult(
739         t *Torrent,
740         obfuscatedHeader bool,
741         addr PeerRemoteAddr,
742         dr DialResult,
743 ) (
744         c *PeerConn, err error,
745 ) {
746         cl := t.cl
747         nc := dr.Conn
748         addrIpPort, _ := tryIpPortFromNetAddr(addr)
749         c, err = cl.initiateProtocolHandshakes(
750                 context.Background(), nc, t, obfuscatedHeader,
751                 newConnectionOpts{
752                         outgoing:   true,
753                         remoteAddr: addr,
754                         // It would be possible to retrieve a public IP from the dialer used here?
755                         localPublicAddr: cl.publicAddr(addrIpPort.IP),
756                         network:         dr.Dialer.DialerNetwork(),
757                         connString:      regularNetConnPeerConnConnString(nc),
758                 })
759         if err != nil {
760                 nc.Close()
761         }
762         return c, err
763 }
764
765 // Returns nil connection and nil error if no connection could be established for valid reasons.
766 func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn, err error) {
767         // It would be better if dial rate limiting could be tested when considering to open connections
768         // instead. Doing it here means if the limit is low, and the half-open limit is high, we could
769         // end up with lots of outgoing connection attempts pending that were initiated on stale data.
770         {
771                 dialReservation := cl.config.DialRateLimiter.Reserve()
772                 if !opts.receivedHolepunchConnect {
773                         if !dialReservation.OK() {
774                                 err = errors.New("can't make dial limit reservation")
775                                 return
776                         }
777                         time.Sleep(dialReservation.Delay())
778                 }
779         }
780         torrent.Add("establish outgoing connection", 1)
781         addr := opts.peerInfo.Addr
782         dialPool := dialPool{
783                 resCh: make(chan DialResult),
784                 addr:  addr.String(),
785         }
786         defer dialPool.startDrainer()
787         dialTimeout := opts.t.getDialTimeoutUnlocked()
788         {
789                 ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
790                 defer cancel()
791                 for _, d := range cl.dialers {
792                         dialPool.add(ctx, d)
793                 }
794         }
795         holepunchAddr, holepunchAddrErr := addrPortFromPeerRemoteAddr(addr)
796         headerObfuscationPolicy := opts.HeaderObfuscationPolicy
797         obfuscatedHeaderFirst := headerObfuscationPolicy.Preferred
798         firstDialResult := dialPool.getFirst()
799         if firstDialResult.Conn == nil {
800                 // No dialers worked. Try to initiate a holepunching rendezvous.
801                 if holepunchAddrErr == nil {
802                         cl.lock()
803                         if !opts.receivedHolepunchConnect {
804                                 g.MakeMapIfNilAndSet(&cl.undialableWithoutHolepunch, holepunchAddr, struct{}{})
805                         }
806                         if !opts.skipHolepunchRendezvous {
807                                 opts.t.trySendHolepunchRendezvous(holepunchAddr)
808                         }
809                         cl.unlock()
810                 }
811                 err = fmt.Errorf("all initial dials failed")
812                 return
813         }
814         if opts.receivedHolepunchConnect && holepunchAddrErr == nil {
815                 cl.lock()
816                 if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
817                         g.MakeMapIfNilAndSet(&cl.dialableOnlyAfterHolepunch, holepunchAddr, struct{}{})
818                 }
819                 g.MakeMapIfNil(&cl.dialedSuccessfullyAfterHolepunchConnect)
820                 g.MapInsert(cl.dialedSuccessfullyAfterHolepunchConnect, holepunchAddr, struct{}{})
821                 cl.unlock()
822         }
823         c, err = doProtocolHandshakeOnDialResult(
824                 opts.t,
825                 obfuscatedHeaderFirst,
826                 addr,
827                 firstDialResult,
828         )
829         if err == nil {
830                 torrent.Add("initiated conn with preferred header obfuscation", 1)
831                 return
832         }
833         c.logger.Levelf(
834                 log.Debug,
835                 "error doing protocol handshake with header obfuscation %v",
836                 obfuscatedHeaderFirst,
837         )
838         firstDialResult.Conn.Close()
839         // We should have just tried with the preferred header obfuscation. If it was required, there's nothing else to try.
840         if headerObfuscationPolicy.RequirePreferred {
841                 return
842         }
843         // Reuse the dialer that returned already but failed to handshake.
844         {
845                 ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
846                 defer cancel()
847                 dialPool.add(ctx, firstDialResult.Dialer)
848         }
849         secondDialResult := dialPool.getFirst()
850         if secondDialResult.Conn == nil {
851                 return
852         }
853         c, err = doProtocolHandshakeOnDialResult(
854                 opts.t,
855                 !obfuscatedHeaderFirst,
856                 addr,
857                 secondDialResult,
858         )
859         if err == nil {
860                 torrent.Add("initiated conn with fallback header obfuscation", 1)
861                 return
862         }
863         c.logger.Levelf(
864                 log.Debug,
865                 "error doing protocol handshake with header obfuscation %v",
866                 !obfuscatedHeaderFirst,
867         )
868         secondDialResult.Conn.Close()
869         return
870 }
871
872 type outgoingConnOpts struct {
873         peerInfo PeerInfo
874         t        *Torrent
875         // Don't attempt to connect unless a connect message is received after initiating a rendezvous.
876         requireRendezvous bool
877         // Don't send rendezvous requests to eligible relays.
878         skipHolepunchRendezvous bool
879         // Outgoing connection attempt is in response to holepunch connect message.
880         receivedHolepunchConnect bool
881         HeaderObfuscationPolicy  HeaderObfuscationPolicy
882 }
883
884 // Called to dial out and run a connection. The addr we're given is already
885 // considered half-open.
886 func (cl *Client) outgoingConnection(
887         opts outgoingConnOpts,
888         attemptKey outgoingConnAttemptKey,
889 ) {
890         c, err := cl.dialAndCompleteHandshake(opts)
891         if err == nil {
892                 c.conn.SetWriteDeadline(time.Time{})
893         }
894         cl.lock()
895         defer cl.unlock()
896         // Don't release lock between here and addPeerConn, unless it's for failure.
897         cl.noLongerHalfOpen(opts.t, opts.peerInfo.Addr.String(), attemptKey)
898         if err != nil {
899                 if cl.config.Debug {
900                         cl.logger.Levelf(
901                                 log.Debug,
902                                 "error establishing outgoing connection to %v: %v",
903                                 opts.peerInfo.Addr,
904                                 err,
905                         )
906                 }
907                 return
908         }
909         defer c.close()
910         c.Discovery = opts.peerInfo.Source
911         c.trusted = opts.peerInfo.Trusted
912         opts.t.runHandshookConnLoggingErr(c)
913 }
914
915 // The port number for incoming peer connections. 0 if the client isn't listening.
916 func (cl *Client) incomingPeerPort() int {
917         return cl.LocalPort()
918 }
919
920 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) (err error) {
921         if c.headerEncrypted {
922                 var rw io.ReadWriter
923                 rw, c.cryptoMethod, err = mse.InitiateHandshake(
924                         struct {
925                                 io.Reader
926                                 io.Writer
927                         }{c.r, c.w},
928                         t.canonicalShortInfohash().Bytes(),
929                         nil,
930                         cl.config.CryptoProvides,
931                 )
932                 c.setRW(rw)
933                 if err != nil {
934                         return fmt.Errorf("header obfuscation handshake: %w", err)
935                 }
936         }
937         localReservedBits := cl.config.Extensions
938         handshakeIh := *t.canonicalShortInfohash()
939         // If we're sending the v1 infohash, and we know the v2 infohash, set the v2 upgrade bit. This
940         // means the peer can send the v2 infohash in the handshake to upgrade the connection.
941         localReservedBits.SetBit(pp.ExtensionBitV2Upgrade, g.Some(handshakeIh) == t.infoHash && t.infoHashV2.Ok)
942         ih, err := cl.connBtHandshake(c, &handshakeIh, localReservedBits)
943         if err != nil {
944                 return fmt.Errorf("bittorrent protocol handshake: %w", err)
945         }
946         if g.Some(ih) == t.infoHash {
947                 return nil
948         }
949         if t.infoHashV2.Ok && *t.infoHashV2.Value.ToShort() == ih {
950                 torrent.Add("initiated handshakes upgraded to v2", 1)
951                 c.v2 = true
952                 return nil
953         }
954         err = errors.New("bittorrent protocol handshake: peer infohash didn't match")
955         return
956 }
957
958 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
959 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
960 func (cl *Client) forSkeys(f func([]byte) bool) {
961         cl.rLock()
962         defer cl.rUnlock()
963         if false { // Emulate the bug from #114
964                 var firstIh InfoHash
965                 for ih := range cl.torrentsByShortHash {
966                         firstIh = ih
967                         break
968                 }
969                 for range cl.torrentsByShortHash {
970                         if !f(firstIh[:]) {
971                                 break
972                         }
973                 }
974                 return
975         }
976         for ih := range cl.torrentsByShortHash {
977                 if !f(ih[:]) {
978                         break
979                 }
980         }
981 }
982
983 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
984         if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
985                 return ret
986         }
987         return cl.forSkeys
988 }
989
990 // Do encryption and bittorrent handshakes as receiver.
991 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
992         defer perf.ScopeTimerErr(&err)()
993         var rw io.ReadWriter
994         rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(
995                 c.rw(),
996                 cl.handshakeReceiverSecretKeys(),
997                 cl.config.HeaderObfuscationPolicy,
998                 cl.config.CryptoSelector,
999         )
1000         c.setRW(rw)
1001         if err == nil || err == mse.ErrNoSecretKeyMatch {
1002                 if c.headerEncrypted {
1003                         torrent.Add("handshakes received encrypted", 1)
1004                 } else {
1005                         torrent.Add("handshakes received unencrypted", 1)
1006                 }
1007         } else {
1008                 torrent.Add("handshakes received with error while handling encryption", 1)
1009         }
1010         if err != nil {
1011                 if err == mse.ErrNoSecretKeyMatch {
1012                         err = nil
1013                 }
1014                 return
1015         }
1016         if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
1017                 err = errors.New("connection does not have required header obfuscation")
1018                 return
1019         }
1020         ih, err := cl.connBtHandshake(c, nil, cl.config.Extensions)
1021         if err != nil {
1022                 return nil, fmt.Errorf("during bt handshake: %w", err)
1023         }
1024
1025         cl.lock()
1026         t = cl.torrentsByShortHash[ih]
1027         if t != nil && t.infoHashV2.Ok && *t.infoHashV2.Value.ToShort() == ih {
1028                 torrent.Add("v2 handshakes received", 1)
1029                 c.v2 = true
1030         }
1031         cl.unlock()
1032
1033         return
1034 }
1035
1036 var successfulPeerWireProtocolHandshakePeerReservedBytes expvar.Map
1037
1038 func init() {
1039         torrent.Set(
1040                 "successful_peer_wire_protocol_handshake_peer_reserved_bytes",
1041                 &successfulPeerWireProtocolHandshakePeerReservedBytes)
1042 }
1043
1044 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash, reservedBits PeerExtensionBits) (ret metainfo.Hash, err error) {
1045         res, err := pp.Handshake(c.rw(), ih, cl.peerID, reservedBits)
1046         if err != nil {
1047                 return
1048         }
1049         successfulPeerWireProtocolHandshakePeerReservedBytes.Add(
1050                 hex.EncodeToString(res.PeerExtensionBits[:]), 1)
1051         ret = res.Hash
1052         c.PeerExtensionBytes = res.PeerExtensionBits
1053         c.PeerID = res.PeerID
1054         c.completedHandshake = time.Now()
1055         if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
1056                 cb(c, res.Hash)
1057         }
1058         return
1059 }
1060
1061 func (cl *Client) runReceivedConn(c *PeerConn) {
1062         err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
1063         if err != nil {
1064                 panic(err)
1065         }
1066         t, err := cl.receiveHandshakes(c)
1067         if err != nil {
1068                 cl.logger.LazyLog(log.Debug, func() log.Msg {
1069                         return log.Fmsg(
1070                                 "error receiving handshakes on %v: %s", c, err,
1071                         ).Add(
1072                                 "network", c.Network,
1073                         )
1074                 })
1075                 torrent.Add("error receiving handshake", 1)
1076                 cl.lock()
1077                 cl.onBadAccept(c.RemoteAddr)
1078                 cl.unlock()
1079                 return
1080         }
1081         if t == nil {
1082                 torrent.Add("received handshake for unloaded torrent", 1)
1083                 cl.logger.LazyLog(log.Debug, func() log.Msg {
1084                         return log.Fmsg("received handshake for unloaded torrent")
1085                 })
1086                 cl.lock()
1087                 cl.onBadAccept(c.RemoteAddr)
1088                 cl.unlock()
1089                 return
1090         }
1091         torrent.Add("received handshake for loaded torrent", 1)
1092         c.conn.SetWriteDeadline(time.Time{})
1093         cl.lock()
1094         defer cl.unlock()
1095         t.runHandshookConnLoggingErr(c)
1096 }
1097
1098 // Client lock must be held before entering this.
1099 func (t *Torrent) runHandshookConn(pc *PeerConn) error {
1100         pc.setTorrent(t)
1101         cl := t.cl
1102         for i, b := range cl.config.MinPeerExtensions {
1103                 if pc.PeerExtensionBytes[i]&b != b {
1104                         return fmt.Errorf("peer did not meet minimum peer extensions: %x", pc.PeerExtensionBytes[:])
1105                 }
1106         }
1107         if pc.PeerID == cl.peerID {
1108                 if pc.outgoing {
1109                         connsToSelf.Add(1)
1110                         addr := pc.RemoteAddr.String()
1111                         cl.dopplegangerAddrs[addr] = struct{}{}
1112                 } /* else {
1113                         // Because the remote address is not necessarily the same as its client's torrent listen
1114                         // address, we won't record the remote address as a doppleganger. Instead, the initiator
1115                         // can record *us* as the doppleganger.
1116                 } */
1117                 t.logger.Levelf(log.Debug, "local and remote peer ids are the same")
1118                 return nil
1119         }
1120         pc.r = deadlineReader{pc.conn, pc.r}
1121         completedHandshakeConnectionFlags.Add(pc.connectionFlags(), 1)
1122         if connIsIpv6(pc.conn) {
1123                 torrent.Add("completed handshake over ipv6", 1)
1124         }
1125         if err := t.addPeerConn(pc); err != nil {
1126                 return fmt.Errorf("adding connection: %w", err)
1127         }
1128         defer t.dropConnection(pc)
1129         pc.addBuiltinLtepProtocols(!cl.config.DisablePEX)
1130         for _, cb := range pc.callbacks.PeerConnAdded {
1131                 cb(pc)
1132         }
1133         pc.startMessageWriter()
1134         pc.sendInitialMessages()
1135         pc.initUpdateRequestsTimer()
1136         err := pc.mainReadLoop()
1137         if err != nil {
1138                 return fmt.Errorf("main read loop: %w", err)
1139         }
1140         return nil
1141 }
1142
1143 func (p *Peer) initUpdateRequestsTimer() {
1144         if check.Enabled {
1145                 if p.updateRequestsTimer != nil {
1146                         panic(p.updateRequestsTimer)
1147                 }
1148         }
1149         if enableUpdateRequestsTimer {
1150                 p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
1151         }
1152 }
1153
1154 const peerUpdateRequestsTimerReason = "updateRequestsTimer"
1155
1156 func (c *Peer) updateRequestsTimerFunc() {
1157         c.locker().Lock()
1158         defer c.locker().Unlock()
1159         if c.closed.IsSet() {
1160                 return
1161         }
1162         if c.isLowOnRequests() {
1163                 // If there are no outstanding requests, then a request update should have already run.
1164                 return
1165         }
1166         if d := time.Since(c.lastRequestUpdate); d < updateRequestsTimerDuration {
1167                 // These should be benign, Timer.Stop doesn't guarantee that its function won't run if it's
1168                 // already been fired.
1169                 torrent.Add("spurious timer requests updates", 1)
1170                 return
1171         }
1172         c.updateRequests(peerUpdateRequestsTimerReason)
1173 }
1174
1175 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
1176 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
1177 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
1178 const localClientReqq = 1024
1179
1180 // See the order given in Transmission's tr_peerMsgsNew.
1181 func (pc *PeerConn) sendInitialMessages() {
1182         t := pc.t
1183         cl := t.cl
1184         if pc.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
1185                 pc.write(pp.Message{
1186                         Type:       pp.Extended,
1187                         ExtendedID: pp.HandshakeExtendedID,
1188                         ExtendedPayload: func() []byte {
1189                                 msg := pp.ExtendedHandshakeMessage{
1190                                         V:            cl.config.ExtendedHandshakeClientVersion,
1191                                         Reqq:         localClientReqq,
1192                                         YourIp:       pp.CompactIp(pc.remoteIp()),
1193                                         Encryption:   cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
1194                                         Port:         cl.incomingPeerPort(),
1195                                         MetadataSize: t.metadataSize(),
1196                                         // TODO: We can figure these out specific to the socket used.
1197                                         Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
1198                                         Ipv6: cl.config.PublicIp6.To16(),
1199                                 }
1200                                 msg.M = pc.LocalLtepProtocolMap.toSupportedExtensionDict()
1201                                 return bencode.MustMarshal(msg)
1202                         }(),
1203                 })
1204         }
1205         func() {
1206                 if pc.fastEnabled() {
1207                         if t.haveAllPieces() {
1208                                 pc.write(pp.Message{Type: pp.HaveAll})
1209                                 pc.sentHaves.AddRange(0, bitmap.BitRange(pc.t.NumPieces()))
1210                                 return
1211                         } else if !t.haveAnyPieces() {
1212                                 pc.write(pp.Message{Type: pp.HaveNone})
1213                                 pc.sentHaves.Clear()
1214                                 return
1215                         }
1216                 }
1217                 pc.postBitfield()
1218         }()
1219         if pc.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1220                 pc.write(pp.Message{
1221                         Type: pp.Port,
1222                         Port: cl.dhtPort(),
1223                 })
1224         }
1225 }
1226
1227 func (cl *Client) dhtPort() (ret uint16) {
1228         if len(cl.dhtServers) == 0 {
1229                 return
1230         }
1231         return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
1232 }
1233
1234 func (cl *Client) haveDhtServer() bool {
1235         return len(cl.dhtServers) > 0
1236 }
1237
1238 // Process incoming ut_metadata message.
1239 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1240         var d pp.ExtendedMetadataRequestMsg
1241         err := bencode.Unmarshal(payload, &d)
1242         if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1243         } else if err != nil {
1244                 return fmt.Errorf("error unmarshalling bencode: %s", err)
1245         }
1246         piece := d.Piece
1247         switch d.Type {
1248         case pp.DataMetadataExtensionMsgType:
1249                 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1250                 if !c.requestedMetadataPiece(piece) {
1251                         return fmt.Errorf("got unexpected piece %d", piece)
1252                 }
1253                 c.metadataRequests[piece] = false
1254                 begin := len(payload) - d.PieceSize()
1255                 if begin < 0 || begin >= len(payload) {
1256                         return fmt.Errorf("data has bad offset in payload: %d", begin)
1257                 }
1258                 t.saveMetadataPiece(piece, payload[begin:])
1259                 c.lastUsefulChunkReceived = time.Now()
1260                 err = t.maybeCompleteMetadata()
1261                 if err != nil {
1262                         // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1263                         // don't know who to blame. TODO: Also errors can be returned here that aren't related
1264                         // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1265                         // log consumers can filter for this message.
1266                         t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1267                 }
1268                 return err
1269         case pp.RequestMetadataExtensionMsgType:
1270                 if !t.haveMetadataPiece(piece) {
1271                         c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1272                         return nil
1273                 }
1274                 start := (1 << 14) * piece
1275                 c.protocolLogger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1276                 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1277                 return nil
1278         case pp.RejectMetadataExtensionMsgType:
1279                 return nil
1280         default:
1281                 return errors.New("unknown msg_type value")
1282         }
1283 }
1284
1285 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1286         if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1287                 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1288         }
1289         return false
1290 }
1291
1292 // Returns whether the IP address and port are considered "bad".
1293 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1294         if port == 0 || ip == nil {
1295                 return true
1296         }
1297         if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1298                 return true
1299         }
1300         if _, ok := cl.ipBlockRange(ip); ok {
1301                 return true
1302         }
1303         ipAddr, ok := netip.AddrFromSlice(ip)
1304         if !ok {
1305                 panic(ip)
1306         }
1307         if _, ok := cl.badPeerIPs[ipAddr]; ok {
1308                 return true
1309         }
1310         return false
1311 }
1312
1313 // Return a Torrent ready for insertion into a Client.
1314 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1315         return cl.newTorrentOpt(AddTorrentOpts{
1316                 InfoHash: ih,
1317                 Storage:  specStorage,
1318         })
1319 }
1320
1321 // Return a Torrent ready for insertion into a Client.
1322 func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
1323         var v1InfoHash g.Option[infohash.T]
1324         if !opts.InfoHash.IsZero() {
1325                 v1InfoHash.Set(opts.InfoHash)
1326         }
1327         if !v1InfoHash.Ok && !opts.InfoHashV2.Ok {
1328                 panic("v1 infohash must be nonzero or v2 infohash must be set")
1329         }
1330         // use provided storage, if provided
1331         storageClient := cl.defaultStorage
1332         if opts.Storage != nil {
1333                 storageClient = storage.NewClient(opts.Storage)
1334         }
1335
1336         t = &Torrent{
1337                 cl:         cl,
1338                 infoHash:   v1InfoHash,
1339                 infoHashV2: opts.InfoHashV2,
1340                 peers: prioritizedPeers{
1341                         om: gbtree.New(32),
1342                         getPrio: func(p PeerInfo) peerPriority {
1343                                 ipPort := p.addr()
1344                                 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1345                         },
1346                 },
1347                 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1348
1349                 storageOpener:       storageClient,
1350                 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1351
1352                 metadataChanged: sync.Cond{
1353                         L: cl.locker(),
1354                 },
1355                 webSeeds:     make(map[string]*Peer),
1356                 gotMetainfoC: make(chan struct{}),
1357         }
1358         var salt [8]byte
1359         rand.Read(salt[:])
1360         t.smartBanCache.Hash = func(b []byte) uint64 {
1361                 h := xxhash.New()
1362                 h.Write(salt[:])
1363                 h.Write(b)
1364                 return h.Sum64()
1365         }
1366         t.smartBanCache.Init()
1367         t.networkingEnabled.Set()
1368         t.logger = cl.logger.WithDefaultLevel(log.Debug)
1369         t.sourcesLogger = t.logger.WithNames("sources")
1370         if opts.ChunkSize == 0 {
1371                 opts.ChunkSize = defaultChunkSize
1372         }
1373         t.setChunkSize(opts.ChunkSize)
1374         return
1375 }
1376
1377 // A file-like handle to some torrent data resource.
1378 type Handle interface {
1379         io.Reader
1380         io.Seeker
1381         io.Closer
1382         io.ReaderAt
1383 }
1384
1385 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1386         return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1387 }
1388
1389 // Deprecated. Adds a torrent by InfoHash with a custom Storage implementation.
1390 // If the torrent already exists then this Storage is ignored and the
1391 // existing torrent returned with `new` set to `false`
1392 func (cl *Client) AddTorrentInfoHashWithStorage(
1393         infoHash metainfo.Hash,
1394         specStorage storage.ClientImpl,
1395 ) (t *Torrent, new bool) {
1396         cl.lock()
1397         defer cl.unlock()
1398         t, ok := cl.torrentsByShortHash[infoHash]
1399         if ok {
1400                 return
1401         }
1402         new = true
1403
1404         t = cl.newTorrent(infoHash, specStorage)
1405         cl.eachDhtServer(func(s DhtServer) {
1406                 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1407                         go t.dhtAnnouncer(s)
1408                 }
1409         })
1410         cl.torrentsByShortHash[infoHash] = t
1411         cl.torrents[t] = struct{}{}
1412         cl.clearAcceptLimits()
1413         t.updateWantPeersEvent()
1414         // Tickle Client.waitAccept, new torrent may want conns.
1415         cl.event.Broadcast()
1416         return
1417 }
1418
1419 // Adds a torrent by InfoHash with a custom Storage implementation. If the torrent already exists
1420 // then this Storage is ignored and the existing torrent returned with `new` set to `false`.
1421 func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) {
1422         infoHash := opts.InfoHash
1423         cl.lock()
1424         defer cl.unlock()
1425         t, ok := cl.torrentsByShortHash[infoHash]
1426         if ok {
1427                 return
1428         }
1429         if opts.InfoHashV2.Ok {
1430                 t, ok = cl.torrentsByShortHash[*opts.InfoHashV2.Value.ToShort()]
1431                 if ok {
1432                         return
1433                 }
1434         }
1435         new = true
1436
1437         t = cl.newTorrentOpt(opts)
1438         cl.eachDhtServer(func(s DhtServer) {
1439                 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1440                         go t.dhtAnnouncer(s)
1441                 }
1442         })
1443         cl.torrentsByShortHash[infoHash] = t
1444         cl.torrents[t] = struct{}{}
1445         t.setInfoBytesLocked(opts.InfoBytes)
1446         cl.clearAcceptLimits()
1447         t.updateWantPeersEvent()
1448         // Tickle Client.waitAccept, new torrent may want conns.
1449         cl.event.Broadcast()
1450         return
1451 }
1452
1453 type AddTorrentOpts struct {
1454         InfoHash   infohash.T
1455         InfoHashV2 g.Option[infohash_v2.T]
1456         Storage    storage.ClientImpl
1457         ChunkSize  pp.Integer
1458         InfoBytes  []byte
1459 }
1460
1461 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1462 // Torrent.MergeSpec.
1463 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1464         t, new = cl.AddTorrentOpt(AddTorrentOpts{
1465                 InfoHash:   spec.InfoHash,
1466                 InfoHashV2: spec.InfoHashV2,
1467                 Storage:    spec.Storage,
1468                 ChunkSize:  spec.ChunkSize,
1469         })
1470         modSpec := *spec
1471         if new {
1472                 // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing
1473                 // it.
1474                 modSpec.ChunkSize = 0
1475         }
1476         err = t.MergeSpec(&modSpec)
1477         if err != nil && new {
1478                 t.Drop()
1479         }
1480         return
1481 }
1482
1483 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1484 // spec.DisallowDataDownload/Upload will be read and applied
1485 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1486 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1487         if spec.DisplayName != "" {
1488                 t.SetDisplayName(spec.DisplayName)
1489         }
1490         if spec.InfoBytes != nil {
1491                 err := t.SetInfoBytes(spec.InfoBytes)
1492                 if err != nil {
1493                         return err
1494                 }
1495         }
1496         cl := t.cl
1497         cl.AddDhtNodes(spec.DhtNodes)
1498         t.UseSources(spec.Sources)
1499         cl.lock()
1500         defer cl.unlock()
1501         t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
1502         for _, url := range spec.Webseeds {
1503                 t.addWebSeed(url)
1504         }
1505         for _, peerAddr := range spec.PeerAddrs {
1506                 t.addPeer(PeerInfo{
1507                         Addr:    StringAddr(peerAddr),
1508                         Source:  PeerSourceDirect,
1509                         Trusted: true,
1510                 })
1511         }
1512         if spec.ChunkSize != 0 {
1513                 panic("chunk size cannot be changed for existing Torrent")
1514         }
1515         t.addTrackers(spec.Trackers)
1516         t.maybeNewConns()
1517         t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1518         t.dataUploadDisallowed = spec.DisallowDataUpload
1519         return errors.Join(t.addPieceLayersLocked(spec.PieceLayers)...)
1520 }
1521
1522 func (cl *Client) dropTorrent(t *Torrent, wg *sync.WaitGroup) (err error) {
1523         t.eachShortInfohash(func(short [20]byte) {
1524                 delete(cl.torrentsByShortHash, short)
1525         })
1526         err = t.close(wg)
1527         delete(cl.torrents, t)
1528         return
1529 }
1530
1531 func (cl *Client) allTorrentsCompleted() bool {
1532         for t := range cl.torrents {
1533                 if !t.haveInfo() {
1534                         return false
1535                 }
1536                 if !t.haveAllPieces() {
1537                         return false
1538                 }
1539         }
1540         return true
1541 }
1542
1543 // Returns true when all torrents are completely downloaded and false if the
1544 // client is stopped before that.
1545 func (cl *Client) WaitAll() bool {
1546         cl.lock()
1547         defer cl.unlock()
1548         for !cl.allTorrentsCompleted() {
1549                 if cl.closed.IsSet() {
1550                         return false
1551                 }
1552                 cl.event.Wait()
1553         }
1554         return true
1555 }
1556
1557 // Returns handles to all the torrents loaded in the Client.
1558 func (cl *Client) Torrents() []*Torrent {
1559         cl.rLock()
1560         defer cl.rUnlock()
1561         return cl.torrentsAsSlice()
1562 }
1563
1564 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1565         for t := range cl.torrents {
1566                 ret = append(ret, t)
1567         }
1568         return
1569 }
1570
1571 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1572         spec, err := TorrentSpecFromMagnetUri(uri)
1573         if err != nil {
1574                 return
1575         }
1576         T, _, err = cl.AddTorrentSpec(spec)
1577         return
1578 }
1579
1580 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1581         ts, err := TorrentSpecFromMetaInfoErr(mi)
1582         if err != nil {
1583                 return
1584         }
1585         T, _, err = cl.AddTorrentSpec(ts)
1586         return
1587 }
1588
1589 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1590         mi, err := metainfo.LoadFromFile(filename)
1591         if err != nil {
1592                 return
1593         }
1594         return cl.AddTorrent(mi)
1595 }
1596
1597 func (cl *Client) DhtServers() []DhtServer {
1598         return cl.dhtServers
1599 }
1600
1601 func (cl *Client) AddDhtNodes(nodes []string) {
1602         for _, n := range nodes {
1603                 hmp := missinggo.SplitHostMaybePort(n)
1604                 ip := net.ParseIP(hmp.Host)
1605                 if ip == nil {
1606                         cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1607                         continue
1608                 }
1609                 ni := krpc.NodeInfo{
1610                         Addr: krpc.NodeAddr{
1611                                 IP:   ip,
1612                                 Port: hmp.Port,
1613                         },
1614                 }
1615                 cl.eachDhtServer(func(s DhtServer) {
1616                         s.AddNode(ni)
1617                 })
1618         }
1619 }
1620
1621 func (cl *Client) banPeerIP(ip net.IP) {
1622         // We can't take this from string, because it will lose netip's v4on6. net.ParseIP parses v4
1623         // addresses directly to v4on6, which doesn't compare equal with v4.
1624         ipAddr, ok := netip.AddrFromSlice(ip)
1625         if !ok {
1626                 panic(ip)
1627         }
1628         g.MakeMapIfNilAndSet(&cl.badPeerIPs, ipAddr, struct{}{})
1629         for t := range cl.torrents {
1630                 t.iterPeers(func(p *Peer) {
1631                         if p.remoteIp().Equal(ip) {
1632                                 t.logger.Levelf(log.Warning, "dropping peer %v with banned ip %v", p, ip)
1633                                 // Should this be a close?
1634                                 p.drop()
1635                         }
1636                 })
1637         }
1638 }
1639
1640 type newConnectionOpts struct {
1641         outgoing        bool
1642         remoteAddr      PeerRemoteAddr
1643         localPublicAddr peerLocalPublicAddr
1644         network         string
1645         connString      string
1646 }
1647
1648 func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerConn) {
1649         if opts.network == "" {
1650                 panic(opts.remoteAddr)
1651         }
1652         c = &PeerConn{
1653                 Peer: Peer{
1654                         outgoing:        opts.outgoing,
1655                         choking:         true,
1656                         peerChoking:     true,
1657                         PeerMaxRequests: 250,
1658
1659                         RemoteAddr:      opts.remoteAddr,
1660                         localPublicAddr: opts.localPublicAddr,
1661                         Network:         opts.network,
1662                         callbacks:       &cl.config.Callbacks,
1663                 },
1664                 connString: opts.connString,
1665                 conn:       nc,
1666         }
1667         c.peerRequestDataAllocLimiter.Max = cl.config.MaxAllocPeerRequestDataPerConn
1668         c.initRequestState()
1669         // TODO: Need to be much more explicit about this, including allowing non-IP bannable addresses.
1670         if opts.remoteAddr != nil {
1671                 netipAddrPort, err := netip.ParseAddrPort(opts.remoteAddr.String())
1672                 if err == nil {
1673                         c.bannableAddr = Some(netipAddrPort.Addr())
1674                 }
1675         }
1676         c.peerImpl = c
1677         c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextText(fmt.Sprintf("%T %p", c, c))
1678         c.protocolLogger = c.logger.WithNames(protocolLoggingName)
1679         c.setRW(connStatsReadWriter{nc, c})
1680         c.r = &rateLimitedReader{
1681                 l: cl.config.DownloadRateLimiter,
1682                 r: c.r,
1683         }
1684         c.logger.Levelf(
1685                 log.Debug,
1686                 "inited with remoteAddr %v network %v outgoing %t",
1687                 opts.remoteAddr, opts.network, opts.outgoing,
1688         )
1689         for _, f := range cl.config.Callbacks.NewPeer {
1690                 f(&c.Peer)
1691         }
1692         return
1693 }
1694
1695 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1696         cl.lock()
1697         defer cl.unlock()
1698         t := cl.torrentsByShortHash[ih]
1699         if t == nil {
1700                 return
1701         }
1702         t.addPeers([]PeerInfo{{
1703                 Addr:   ipPortAddr{ip, port},
1704                 Source: PeerSourceDhtAnnouncePeer,
1705         }})
1706 }
1707
1708 func firstNotNil(ips ...net.IP) net.IP {
1709         for _, ip := range ips {
1710                 if ip != nil {
1711                         return ip
1712                 }
1713         }
1714         return nil
1715 }
1716
1717 func (cl *Client) eachListener(f func(Listener) bool) {
1718         for _, s := range cl.listeners {
1719                 if !f(s) {
1720                         break
1721                 }
1722         }
1723 }
1724
1725 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1726         for i := 0; i < len(cl.listeners); i += 1 {
1727                 if ret = cl.listeners[i]; f(ret) {
1728                         return
1729                 }
1730         }
1731         return nil
1732 }
1733
1734 func (cl *Client) publicIp(peer net.IP) net.IP {
1735         // TODO: Use BEP 10 to determine how peers are seeing us.
1736         if peer.To4() != nil {
1737                 return firstNotNil(
1738                         cl.config.PublicIp4,
1739                         cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1740                 )
1741         }
1742
1743         return firstNotNil(
1744                 cl.config.PublicIp6,
1745                 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1746         )
1747 }
1748
1749 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1750         l := cl.findListener(
1751                 func(l Listener) bool {
1752                         return f(addrIpOrNil(l.Addr()))
1753                 },
1754         )
1755         if l == nil {
1756                 return nil
1757         }
1758         return addrIpOrNil(l.Addr())
1759 }
1760
1761 // Our IP as a peer should see it.
1762 func (cl *Client) publicAddr(peer net.IP) IpPort {
1763         return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1764 }
1765
1766 // ListenAddrs addresses currently being listened to.
1767 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1768         cl.lock()
1769         ret = make([]net.Addr, len(cl.listeners))
1770         for i := 0; i < len(cl.listeners); i += 1 {
1771                 ret[i] = cl.listeners[i].Addr()
1772         }
1773         cl.unlock()
1774         return
1775 }
1776
1777 func (cl *Client) PublicIPs() (ips []net.IP) {
1778         if ip := cl.config.PublicIp4; ip != nil {
1779                 ips = append(ips, ip)
1780         }
1781         if ip := cl.config.PublicIp6; ip != nil {
1782                 ips = append(ips, ip)
1783         }
1784         return
1785 }
1786
1787 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1788         ipa, ok := tryIpPortFromNetAddr(addr)
1789         if !ok {
1790                 return
1791         }
1792         ip := maskIpForAcceptLimiting(ipa.IP)
1793         if cl.acceptLimiter == nil {
1794                 cl.acceptLimiter = make(map[ipStr]int)
1795         }
1796         cl.acceptLimiter[ipStr(ip.String())]++
1797 }
1798
1799 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1800         if ip4 := ip.To4(); ip4 != nil {
1801                 return ip4.Mask(net.CIDRMask(24, 32))
1802         }
1803         return ip
1804 }
1805
1806 func (cl *Client) clearAcceptLimits() {
1807         cl.acceptLimiter = nil
1808 }
1809
1810 func (cl *Client) acceptLimitClearer() {
1811         for {
1812                 select {
1813                 case <-cl.closed.Done():
1814                         return
1815                 case <-time.After(15 * time.Minute):
1816                         cl.lock()
1817                         cl.clearAcceptLimits()
1818                         cl.unlock()
1819                 }
1820         }
1821 }
1822
1823 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1824         if cl.config.DisableAcceptRateLimiting {
1825                 return false
1826         }
1827         return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1828 }
1829
1830 func (cl *Client) rLock() {
1831         cl._mu.RLock()
1832 }
1833
1834 func (cl *Client) rUnlock() {
1835         cl._mu.RUnlock()
1836 }
1837
1838 func (cl *Client) lock() {
1839         cl._mu.Lock()
1840 }
1841
1842 func (cl *Client) unlock() {
1843         cl._mu.Unlock()
1844 }
1845
1846 func (cl *Client) locker() *lockWithDeferreds {
1847         return &cl._mu
1848 }
1849
1850 func (cl *Client) String() string {
1851         return fmt.Sprintf("<%[1]T %[1]p>", cl)
1852 }
1853
1854 func (cl *Client) ICEServers() []webrtc.ICEServer {
1855         var ICEServers []webrtc.ICEServer
1856         if cl.config.ICEServerList != nil {
1857                 ICEServers = cl.config.ICEServerList
1858         } else if cl.config.ICEServers != nil {
1859                 ICEServers = []webrtc.ICEServer{{URLs: cl.config.ICEServers}}
1860         }
1861         return ICEServers
1862 }
1863
1864 // Returns connection-level aggregate connStats at the Client level. See the comment on
1865 // TorrentStats.ConnStats.
1866 func (cl *Client) ConnStats() ConnStats {
1867         return cl.connStats.Copy()
1868 }
1869
1870 func (cl *Client) Stats() ClientStats {
1871         cl.rLock()
1872         defer cl.rUnlock()
1873         return cl.statsLocked()
1874 }