]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
95c21aabb7576333b80cdbd9b4d3a2cef71b0460
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "context"
6         "crypto/rand"
7         "crypto/sha1"
8         "encoding/binary"
9         "encoding/hex"
10         "errors"
11         "expvar"
12         "fmt"
13         "io"
14         "math"
15         "net"
16         "net/http"
17         "net/netip"
18         "sort"
19         "strconv"
20         "time"
21
22         "github.com/anacrolix/chansync"
23         "github.com/anacrolix/chansync/events"
24         "github.com/anacrolix/dht/v2"
25         "github.com/anacrolix/dht/v2/krpc"
26         . "github.com/anacrolix/generics"
27         g "github.com/anacrolix/generics"
28         "github.com/anacrolix/log"
29         "github.com/anacrolix/missinggo/perf"
30         "github.com/anacrolix/missinggo/v2"
31         "github.com/anacrolix/missinggo/v2/bitmap"
32         "github.com/anacrolix/missinggo/v2/pproffd"
33         "github.com/anacrolix/sync"
34         "github.com/davecgh/go-spew/spew"
35         "github.com/dustin/go-humanize"
36         gbtree "github.com/google/btree"
37         "github.com/pion/datachannel"
38         "golang.org/x/time/rate"
39
40         "github.com/anacrolix/torrent/bencode"
41         "github.com/anacrolix/torrent/internal/check"
42         "github.com/anacrolix/torrent/internal/limiter"
43         "github.com/anacrolix/torrent/iplist"
44         "github.com/anacrolix/torrent/metainfo"
45         "github.com/anacrolix/torrent/mse"
46         pp "github.com/anacrolix/torrent/peer_protocol"
47         utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch"
48         request_strategy "github.com/anacrolix/torrent/request-strategy"
49         "github.com/anacrolix/torrent/storage"
50         "github.com/anacrolix/torrent/tracker"
51         "github.com/anacrolix/torrent/types/infohash"
52         "github.com/anacrolix/torrent/webtorrent"
53 )
54
55 // Clients contain zero or more Torrents. A Client manages a blocklist, the
56 // TCP/UDP protocol ports, and DHT as desired.
57 type Client struct {
58         // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
59         // fields. See #262.
60         connStats ConnStats
61
62         _mu    lockWithDeferreds
63         event  sync.Cond
64         closed chansync.SetOnce
65
66         config *ClientConfig
67         logger log.Logger
68
69         peerID         PeerID
70         defaultStorage *storage.Client
71         onClose        []func()
72         dialers        []Dialer
73         listeners      []Listener
74         dhtServers     []DhtServer
75         ipBlockList    iplist.Ranger
76
77         // Set of addresses that have our client ID. This intentionally will
78         // include ourselves if we end up trying to connect to our own address
79         // through legitimate channels.
80         dopplegangerAddrs map[string]struct{}
81         badPeerIPs        map[netip.Addr]struct{}
82         torrents          map[InfoHash]*Torrent
83         pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder
84
85         acceptLimiter   map[ipStr]int
86         dialRateLimiter *rate.Limiter
87         numHalfOpen     int
88
89         websocketTrackers websocketTrackers
90
91         activeAnnounceLimiter limiter.Instance
92         httpClient            *http.Client
93
94         undialableWithoutHolepunch                                   map[netip.AddrPort]struct{}
95         undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect map[netip.AddrPort]struct{}
96         dialableOnlyAfterHolepunch                                   map[netip.AddrPort]struct{}
97 }
98
99 type ipStr string
100
101 func (cl *Client) BadPeerIPs() (ips []string) {
102         cl.rLock()
103         ips = cl.badPeerIPsLocked()
104         cl.rUnlock()
105         return
106 }
107
108 func (cl *Client) badPeerIPsLocked() (ips []string) {
109         ips = make([]string, len(cl.badPeerIPs))
110         i := 0
111         for k := range cl.badPeerIPs {
112                 ips[i] = k.String()
113                 i += 1
114         }
115         return
116 }
117
118 func (cl *Client) PeerID() PeerID {
119         return cl.peerID
120 }
121
122 // Returns the port number for the first listener that has one. No longer assumes that all port
123 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
124 // found.
125 func (cl *Client) LocalPort() (port int) {
126         for i := 0; i < len(cl.listeners); i += 1 {
127                 if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
128                         return
129                 }
130         }
131         return
132 }
133
134 func writeDhtServerStatus(w io.Writer, s DhtServer) {
135         dhtStats := s.Stats()
136         fmt.Fprintf(w, " ID: %x\n", s.ID())
137         spew.Fdump(w, dhtStats)
138 }
139
140 // Writes out a human readable status of the client, such as for writing to a
141 // HTTP status page.
142 func (cl *Client) WriteStatus(_w io.Writer) {
143         cl.rLock()
144         defer cl.rUnlock()
145         w := bufio.NewWriter(_w)
146         defer w.Flush()
147         fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
148         fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
149         fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
150         fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
151         fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
152         cl.eachDhtServer(func(s DhtServer) {
153                 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
154                 writeDhtServerStatus(w, s)
155         })
156         dumpStats(w, cl.statsLocked())
157         torrentsSlice := cl.torrentsAsSlice()
158         fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
159         fmt.Fprintln(w)
160         sort.Slice(torrentsSlice, func(l, r int) bool {
161                 return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
162         })
163         for _, t := range torrentsSlice {
164                 if t.name() == "" {
165                         fmt.Fprint(w, "<unknown name>")
166                 } else {
167                         fmt.Fprint(w, t.name())
168                 }
169                 fmt.Fprint(w, "\n")
170                 if t.info != nil {
171                         fmt.Fprintf(
172                                 w,
173                                 "%f%% of %d bytes (%s)",
174                                 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
175                                 t.length(),
176                                 humanize.Bytes(uint64(t.length())))
177                 } else {
178                         w.WriteString("<missing metainfo>")
179                 }
180                 fmt.Fprint(w, "\n")
181                 t.writeStatus(w)
182                 fmt.Fprintln(w)
183         }
184 }
185
186 func (cl *Client) initLogger() {
187         logger := cl.config.Logger
188         if logger.IsZero() {
189                 logger = log.Default
190         }
191         if cl.config.Debug {
192                 logger = logger.FilterLevel(log.Debug)
193         }
194         cl.logger = logger.WithValues(cl)
195 }
196
197 func (cl *Client) announceKey() int32 {
198         return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
199 }
200
201 // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
202 func (cl *Client) init(cfg *ClientConfig) {
203         cl.config = cfg
204         g.MakeMap(&cl.dopplegangerAddrs)
205         cl.torrents = make(map[metainfo.Hash]*Torrent)
206         cl.dialRateLimiter = rate.NewLimiter(10, 10)
207         cl.activeAnnounceLimiter.SlotsPerKey = 2
208         cl.event.L = cl.locker()
209         cl.ipBlockList = cfg.IPBlocklist
210         cl.httpClient = &http.Client{
211                 Transport: &http.Transport{
212                         Proxy:       cfg.HTTPProxy,
213                         DialContext: cfg.HTTPDialContext,
214                         // I think this value was observed from some webseeds. It seems reasonable to extend it
215                         // to other uses of HTTP from the client.
216                         MaxConnsPerHost: 10,
217                 },
218         }
219 }
220
221 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
222         if cfg == nil {
223                 cfg = NewDefaultClientConfig()
224                 cfg.ListenPort = 0
225         }
226         var client Client
227         client.init(cfg)
228         cl = &client
229         go cl.acceptLimitClearer()
230         cl.initLogger()
231         defer func() {
232                 if err != nil {
233                         cl.Close()
234                         cl = nil
235                 }
236         }()
237
238         storageImpl := cfg.DefaultStorage
239         if storageImpl == nil {
240                 // We'd use mmap by default but HFS+ doesn't support sparse files.
241                 storageImplCloser := storage.NewFile(cfg.DataDir)
242                 cl.onClose = append(cl.onClose, func() {
243                         if err := storageImplCloser.Close(); err != nil {
244                                 cl.logger.Printf("error closing default storage: %s", err)
245                         }
246                 })
247                 storageImpl = storageImplCloser
248         }
249         cl.defaultStorage = storage.NewClient(storageImpl)
250
251         if cfg.PeerID != "" {
252                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
253         } else {
254                 o := copy(cl.peerID[:], cfg.Bep20)
255                 _, err = rand.Read(cl.peerID[o:])
256                 if err != nil {
257                         panic("error generating peer id")
258                 }
259         }
260
261         sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback, cl.logger)
262         if err != nil {
263                 return
264         }
265
266         // Check for panics.
267         cl.LocalPort()
268
269         for _, _s := range sockets {
270                 s := _s // Go is fucking retarded.
271                 cl.onClose = append(cl.onClose, func() { go s.Close() })
272                 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
273                         cl.dialers = append(cl.dialers, s)
274                         cl.listeners = append(cl.listeners, s)
275                         if cl.config.AcceptPeerConnections {
276                                 go cl.acceptConnections(s)
277                         }
278                 }
279         }
280
281         go cl.forwardPort()
282         if !cfg.NoDHT {
283                 for _, s := range sockets {
284                         if pc, ok := s.(net.PacketConn); ok {
285                                 ds, err := cl.NewAnacrolixDhtServer(pc)
286                                 if err != nil {
287                                         panic(err)
288                                 }
289                                 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
290                                 cl.onClose = append(cl.onClose, func() { ds.Close() })
291                         }
292                 }
293         }
294
295         cl.websocketTrackers = websocketTrackers{
296                 PeerId: cl.peerID,
297                 Logger: cl.logger,
298                 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
299                         cl.lock()
300                         defer cl.unlock()
301                         t, ok := cl.torrents[infoHash]
302                         if !ok {
303                                 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
304                         }
305                         return t.announceRequest(event), nil
306                 },
307                 Proxy:                      cl.config.HTTPProxy,
308                 WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader,
309                 DialContext:                cl.config.TrackerDialContext,
310                 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
311                         cl.lock()
312                         defer cl.unlock()
313                         t, ok := cl.torrents[dcc.InfoHash]
314                         if !ok {
315                                 cl.logger.WithDefaultLevel(log.Warning).Printf(
316                                         "got webrtc conn for unloaded torrent with infohash %x",
317                                         dcc.InfoHash,
318                                 )
319                                 dc.Close()
320                                 return
321                         }
322                         go t.onWebRtcConn(dc, dcc)
323                 },
324         }
325
326         return
327 }
328
329 func (cl *Client) AddDhtServer(d DhtServer) {
330         cl.dhtServers = append(cl.dhtServers, d)
331 }
332
333 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
334 // given address for any Torrent.
335 func (cl *Client) AddDialer(d Dialer) {
336         cl.lock()
337         defer cl.unlock()
338         cl.dialers = append(cl.dialers, d)
339         for _, t := range cl.torrents {
340                 t.openNewConns()
341         }
342 }
343
344 func (cl *Client) Listeners() []Listener {
345         return cl.listeners
346 }
347
348 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
349 // yourself.
350 func (cl *Client) AddListener(l Listener) {
351         cl.listeners = append(cl.listeners, l)
352         if cl.config.AcceptPeerConnections {
353                 go cl.acceptConnections(l)
354         }
355 }
356
357 func (cl *Client) firewallCallback(net.Addr) bool {
358         cl.rLock()
359         block := !cl.wantConns() || !cl.config.AcceptPeerConnections
360         cl.rUnlock()
361         if block {
362                 torrent.Add("connections firewalled", 1)
363         } else {
364                 torrent.Add("connections not firewalled", 1)
365         }
366         return block
367 }
368
369 func (cl *Client) listenOnNetwork(n network) bool {
370         if n.Ipv4 && cl.config.DisableIPv4 {
371                 return false
372         }
373         if n.Ipv6 && cl.config.DisableIPv6 {
374                 return false
375         }
376         if n.Tcp && cl.config.DisableTCP {
377                 return false
378         }
379         if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
380                 return false
381         }
382         return true
383 }
384
385 func (cl *Client) listenNetworks() (ns []network) {
386         for _, n := range allPeerNetworks {
387                 if cl.listenOnNetwork(n) {
388                         ns = append(ns, n)
389                 }
390         }
391         return
392 }
393
394 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
395 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
396         logger := cl.logger.WithNames("dht", conn.LocalAddr().String())
397         cfg := dht.ServerConfig{
398                 IPBlocklist:    cl.ipBlockList,
399                 Conn:           conn,
400                 OnAnnouncePeer: cl.onDHTAnnouncePeer,
401                 PublicIP: func() net.IP {
402                         if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
403                                 return cl.config.PublicIp6
404                         }
405                         return cl.config.PublicIp4
406                 }(),
407                 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
408                 OnQuery:       cl.config.DHTOnQuery,
409                 Logger:        logger,
410         }
411         if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
412                 f(&cfg)
413         }
414         s, err = dht.NewServer(&cfg)
415         if err == nil {
416                 go s.TableMaintainer()
417         }
418         return
419 }
420
421 func (cl *Client) Closed() events.Done {
422         return cl.closed.Done()
423 }
424
425 func (cl *Client) eachDhtServer(f func(DhtServer)) {
426         for _, ds := range cl.dhtServers {
427                 f(ds)
428         }
429 }
430
431 // Stops the client. All connections to peers are closed and all activity will come to a halt.
432 func (cl *Client) Close() (errs []error) {
433         var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
434         cl.lock()
435         for _, t := range cl.torrents {
436                 err := t.close(&closeGroup)
437                 if err != nil {
438                         errs = append(errs, err)
439                 }
440         }
441         for i := range cl.onClose {
442                 cl.onClose[len(cl.onClose)-1-i]()
443         }
444         cl.closed.Set()
445         cl.unlock()
446         cl.event.Broadcast()
447         closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
448         return
449 }
450
451 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
452         if cl.ipBlockList == nil {
453                 return
454         }
455         return cl.ipBlockList.Lookup(ip)
456 }
457
458 func (cl *Client) ipIsBlocked(ip net.IP) bool {
459         _, blocked := cl.ipBlockRange(ip)
460         return blocked
461 }
462
463 func (cl *Client) wantConns() bool {
464         if cl.config.AlwaysWantConns {
465                 return true
466         }
467         for _, t := range cl.torrents {
468                 if t.wantIncomingConns() {
469                         return true
470                 }
471         }
472         return false
473 }
474
475 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
476 func (cl *Client) rejectAccepted(conn net.Conn) error {
477         if !cl.wantConns() {
478                 return errors.New("don't want conns right now")
479         }
480         ra := conn.RemoteAddr()
481         if rip := addrIpOrNil(ra); rip != nil {
482                 if cl.config.DisableIPv4Peers && rip.To4() != nil {
483                         return errors.New("ipv4 peers disabled")
484                 }
485                 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
486                         return errors.New("ipv4 disabled")
487                 }
488                 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
489                         return errors.New("ipv6 disabled")
490                 }
491                 if cl.rateLimitAccept(rip) {
492                         return errors.New("source IP accepted rate limited")
493                 }
494                 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
495                         return errors.New("bad source addr")
496                 }
497         }
498         return nil
499 }
500
501 func (cl *Client) acceptConnections(l Listener) {
502         for {
503                 conn, err := l.Accept()
504                 torrent.Add("client listener accepts", 1)
505                 conn = pproffd.WrapNetConn(conn)
506                 cl.rLock()
507                 closed := cl.closed.IsSet()
508                 var reject error
509                 if !closed && conn != nil {
510                         reject = cl.rejectAccepted(conn)
511                 }
512                 cl.rUnlock()
513                 if closed {
514                         if conn != nil {
515                                 conn.Close()
516                         }
517                         return
518                 }
519                 if err != nil {
520                         log.Fmsg("error accepting connection: %s", err).LogLevel(log.Debug, cl.logger)
521                         continue
522                 }
523                 go func() {
524                         if reject != nil {
525                                 torrent.Add("rejected accepted connections", 1)
526                                 cl.logger.LazyLog(log.Debug, func() log.Msg {
527                                         return log.Fmsg("rejecting accepted conn: %v", reject)
528                                 })
529                                 conn.Close()
530                         } else {
531                                 go cl.incomingConnection(conn)
532                         }
533                         cl.logger.LazyLog(log.Debug, func() log.Msg {
534                                 return log.Fmsg("accepted %q connection at %q from %q",
535                                         l.Addr().Network(),
536                                         conn.LocalAddr(),
537                                         conn.RemoteAddr(),
538                                 )
539                         })
540                         torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
541                         torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
542                         torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
543                 }()
544         }
545 }
546
547 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
548 func regularNetConnPeerConnConnString(nc net.Conn) string {
549         return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
550 }
551
552 func (cl *Client) incomingConnection(nc net.Conn) {
553         defer nc.Close()
554         if tc, ok := nc.(*net.TCPConn); ok {
555                 tc.SetLinger(0)
556         }
557         remoteAddr, _ := tryIpPortFromNetAddr(nc.RemoteAddr())
558         c := cl.newConnection(
559                 nc,
560                 newConnectionOpts{
561                         outgoing:        false,
562                         remoteAddr:      nc.RemoteAddr(),
563                         localPublicAddr: cl.publicAddr(remoteAddr.IP),
564                         network:         nc.RemoteAddr().Network(),
565                         connString:      regularNetConnPeerConnConnString(nc),
566                 })
567         defer func() {
568                 cl.lock()
569                 defer cl.unlock()
570                 c.close()
571         }()
572         c.Discovery = PeerSourceIncoming
573         cl.runReceivedConn(c)
574 }
575
576 // Returns a handle to the given torrent, if it's present in the client.
577 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
578         cl.rLock()
579         defer cl.rUnlock()
580         t, ok = cl.torrents[ih]
581         return
582 }
583
584 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
585         return cl.torrents[ih]
586 }
587
588 type DialResult struct {
589         Conn   net.Conn
590         Dialer Dialer
591 }
592
593 func countDialResult(err error) {
594         if err == nil {
595                 torrent.Add("successful dials", 1)
596         } else {
597                 torrent.Add("unsuccessful dials", 1)
598         }
599 }
600
601 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit, pendingPeers int) (ret time.Duration) {
602         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
603         if ret < minDialTimeout {
604                 ret = minDialTimeout
605         }
606         return
607 }
608
609 // Returns whether an address is known to connect to a client with our own ID.
610 func (cl *Client) dopplegangerAddr(addr string) bool {
611         _, ok := cl.dopplegangerAddrs[addr]
612         return ok
613 }
614
615 // Returns a connection over UTP or TCP, whichever is first to connect.
616 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
617         return DialFirst(ctx, addr, cl.dialers)
618 }
619
620 // Returns a connection over UTP or TCP, whichever is first to connect.
621 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
622         pool := dialPool{
623                 addr: addr,
624         }
625         defer pool.startDrainer()
626         for _, _s := range dialers {
627                 pool.add(ctx, _s)
628         }
629         return pool.getFirst()
630 }
631
632 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
633         c, err := s.Dial(ctx, addr)
634         if err != nil {
635                 log.Levelf(log.Debug, "error dialing %q: %v", addr, err)
636         }
637         // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
638         // it now in case we close the connection forthwith. Note this is also done in the TCP dialer
639         // code to increase the chance it's done.
640         if tc, ok := c.(*net.TCPConn); ok {
641                 tc.SetLinger(0)
642         }
643         countDialResult(err)
644         return c
645 }
646
647 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string, attemptKey outgoingConnAttemptKey) {
648         path := t.getHalfOpenPath(addr, attemptKey)
649         if !path.Exists() {
650                 panic("should exist")
651         }
652         path.Delete()
653         cl.numHalfOpen--
654         if cl.numHalfOpen < 0 {
655                 panic("should not be possible")
656         }
657         for _, t := range cl.torrents {
658                 t.openNewConns()
659         }
660 }
661
662 func (cl *Client) countHalfOpenFromTorrents() (count int) {
663         for _, t := range cl.torrents {
664                 count += t.numHalfOpenAttempts()
665         }
666         return
667 }
668
669 // Performs initiator handshakes and returns a connection. Returns nil *PeerConn if no connection
670 // for valid reasons.
671 func (cl *Client) initiateProtocolHandshakes(
672         ctx context.Context,
673         nc net.Conn,
674         t *Torrent,
675         encryptHeader bool,
676         newConnOpts newConnectionOpts,
677 ) (
678         c *PeerConn, err error,
679 ) {
680         c = cl.newConnection(nc, newConnOpts)
681         c.headerEncrypted = encryptHeader
682         ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
683         defer cancel()
684         dl, ok := ctx.Deadline()
685         if !ok {
686                 panic(ctx)
687         }
688         err = nc.SetDeadline(dl)
689         if err != nil {
690                 panic(err)
691         }
692         err = cl.initiateHandshakes(c, t)
693         return
694 }
695
696 func doProtocolHandshakeOnDialResult(
697         t *Torrent,
698         obfuscatedHeader bool,
699         addr PeerRemoteAddr,
700         dr DialResult,
701 ) (
702         c *PeerConn, err error,
703 ) {
704         cl := t.cl
705         nc := dr.Conn
706         addrIpPort, _ := tryIpPortFromNetAddr(addr)
707         c, err = cl.initiateProtocolHandshakes(
708                 context.Background(), nc, t, obfuscatedHeader,
709                 newConnectionOpts{
710                         outgoing:   true,
711                         remoteAddr: addr,
712                         // It would be possible to retrieve a public IP from the dialer used here?
713                         localPublicAddr: cl.publicAddr(addrIpPort.IP),
714                         network:         dr.Dialer.DialerNetwork(),
715                         connString:      regularNetConnPeerConnConnString(nc),
716                 })
717         if err != nil {
718                 nc.Close()
719         }
720         return c, err
721 }
722
723 // Returns nil connection and nil error if no connection could be established for valid reasons.
724 func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn, err error) {
725         torrent.Add("establish outgoing connection", 1)
726         addr := opts.peerInfo.Addr
727         dialPool := dialPool{
728                 resCh: make(chan DialResult),
729                 addr:  addr.String(),
730         }
731         defer dialPool.startDrainer()
732         dialTimeout := opts.t.getDialTimeoutUnlocked()
733         {
734                 ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
735                 defer cancel()
736                 for _, d := range cl.dialers {
737                         dialPool.add(ctx, d)
738                 }
739         }
740         holepunchAddr, holepunchAddrErr := addrPortFromPeerRemoteAddr(addr)
741         if holepunchAddrErr == nil && g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) && opts.receivedHolepunchConnect {
742                 g.MakeMapIfNilAndSet(
743                         &cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect,
744                         holepunchAddr,
745                         struct{}{},
746                 )
747         }
748         headerObfuscationPolicy := opts.HeaderObfuscationPolicy
749         obfuscatedHeaderFirst := headerObfuscationPolicy.Preferred
750         firstDialResult := dialPool.getFirst()
751         if firstDialResult.Conn == nil {
752                 // No dialers worked. Try to initiate a holepunching rendezvous.
753                 if holepunchAddrErr == nil {
754                         if !opts.receivedHolepunchConnect {
755                                 cl.lock()
756                                 g.MakeMapIfNilAndSet(&cl.undialableWithoutHolepunch, holepunchAddr, struct{}{})
757                                 cl.unlock()
758                         }
759                         opts.t.startHolepunchRendezvous(holepunchAddr)
760                 }
761                 err = fmt.Errorf("all initial dials failed")
762                 return
763         }
764         if opts.receivedHolepunchConnect && holepunchAddrErr == nil && g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
765                 g.MakeMapIfNilAndSet(&cl.dialableOnlyAfterHolepunch, holepunchAddr, struct{}{})
766         }
767         c, err = doProtocolHandshakeOnDialResult(
768                 opts.t,
769                 obfuscatedHeaderFirst,
770                 addr,
771                 firstDialResult,
772         )
773         if err == nil {
774                 torrent.Add("initiated conn with preferred header obfuscation", 1)
775                 return
776         }
777         // We should have just tried with the preferred header obfuscation. If it was required, there's nothing else to try.
778         if headerObfuscationPolicy.RequirePreferred {
779                 return
780         }
781         // Reuse the dialer that returned already but failed to handshake.
782         {
783                 ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
784                 defer cancel()
785                 dialPool.add(ctx, firstDialResult.Dialer)
786         }
787         secondDialResult := dialPool.getFirst()
788         if secondDialResult.Conn == nil {
789                 return
790         }
791         c, err = doProtocolHandshakeOnDialResult(
792                 opts.t,
793                 !obfuscatedHeaderFirst,
794                 addr,
795                 secondDialResult,
796         )
797         if err == nil {
798                 torrent.Add("initiated conn with fallback header obfuscation", 1)
799                 return
800         }
801         return
802 }
803
804 type outgoingConnOpts struct {
805         peerInfo PeerInfo
806         t        *Torrent
807         // Don't attempt to connect unless a connect message is received after initiating a rendezvous.
808         requireRendezvous bool
809         // Don't send rendezvous requests to eligible relays.
810         skipHolepunchRendezvous bool
811         // Outgoing connection attempt is in response to holepunch connect message.
812         receivedHolepunchConnect bool
813         HeaderObfuscationPolicy  HeaderObfuscationPolicy
814 }
815
816 // Called to dial out and run a connection. The addr we're given is already
817 // considered half-open.
818 func (cl *Client) outgoingConnection(
819         opts outgoingConnOpts,
820         attemptKey outgoingConnAttemptKey,
821 ) {
822         cl.dialRateLimiter.Wait(context.Background())
823         c, err := cl.dialAndCompleteHandshake(opts)
824         if err == nil {
825                 c.conn.SetWriteDeadline(time.Time{})
826         }
827         cl.lock()
828         defer cl.unlock()
829         // Don't release lock between here and addPeerConn, unless it's for failure.
830         cl.noLongerHalfOpen(opts.t, opts.peerInfo.Addr.String(), attemptKey)
831         if err != nil {
832                 if cl.config.Debug {
833                         cl.logger.Levelf(
834                                 log.Debug,
835                                 "error establishing outgoing connection to %v: %v",
836                                 opts.peerInfo.Addr,
837                                 err,
838                         )
839                 }
840                 return
841         }
842         defer c.close()
843         c.Discovery = opts.peerInfo.Source
844         c.trusted = opts.peerInfo.Trusted
845         opts.t.runHandshookConnLoggingErr(c)
846 }
847
848 // The port number for incoming peer connections. 0 if the client isn't listening.
849 func (cl *Client) incomingPeerPort() int {
850         return cl.LocalPort()
851 }
852
853 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
854         if c.headerEncrypted {
855                 var rw io.ReadWriter
856                 var err error
857                 rw, c.cryptoMethod, err = mse.InitiateHandshake(
858                         struct {
859                                 io.Reader
860                                 io.Writer
861                         }{c.r, c.w},
862                         t.infoHash[:],
863                         nil,
864                         cl.config.CryptoProvides,
865                 )
866                 c.setRW(rw)
867                 if err != nil {
868                         return fmt.Errorf("header obfuscation handshake: %w", err)
869                 }
870         }
871         ih, err := cl.connBtHandshake(c, &t.infoHash)
872         if err != nil {
873                 return fmt.Errorf("bittorrent protocol handshake: %w", err)
874         }
875         if ih != t.infoHash {
876                 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
877         }
878         return nil
879 }
880
881 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
882 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
883 func (cl *Client) forSkeys(f func([]byte) bool) {
884         cl.rLock()
885         defer cl.rUnlock()
886         if false { // Emulate the bug from #114
887                 var firstIh InfoHash
888                 for ih := range cl.torrents {
889                         firstIh = ih
890                         break
891                 }
892                 for range cl.torrents {
893                         if !f(firstIh[:]) {
894                                 break
895                         }
896                 }
897                 return
898         }
899         for ih := range cl.torrents {
900                 if !f(ih[:]) {
901                         break
902                 }
903         }
904 }
905
906 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
907         if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
908                 return ret
909         }
910         return cl.forSkeys
911 }
912
913 // Do encryption and bittorrent handshakes as receiver.
914 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
915         defer perf.ScopeTimerErr(&err)()
916         var rw io.ReadWriter
917         rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
918         c.setRW(rw)
919         if err == nil || err == mse.ErrNoSecretKeyMatch {
920                 if c.headerEncrypted {
921                         torrent.Add("handshakes received encrypted", 1)
922                 } else {
923                         torrent.Add("handshakes received unencrypted", 1)
924                 }
925         } else {
926                 torrent.Add("handshakes received with error while handling encryption", 1)
927         }
928         if err != nil {
929                 if err == mse.ErrNoSecretKeyMatch {
930                         err = nil
931                 }
932                 return
933         }
934         if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
935                 err = errors.New("connection does not have required header obfuscation")
936                 return
937         }
938         ih, err := cl.connBtHandshake(c, nil)
939         if err != nil {
940                 return nil, fmt.Errorf("during bt handshake: %w", err)
941         }
942         cl.lock()
943         t = cl.torrents[ih]
944         cl.unlock()
945         return
946 }
947
948 var successfulPeerWireProtocolHandshakePeerReservedBytes expvar.Map
949
950 func init() {
951         torrent.Set(
952                 "successful_peer_wire_protocol_handshake_peer_reserved_bytes",
953                 &successfulPeerWireProtocolHandshakePeerReservedBytes)
954 }
955
956 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
957         res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
958         if err != nil {
959                 return
960         }
961         successfulPeerWireProtocolHandshakePeerReservedBytes.Add(
962                 hex.EncodeToString(res.PeerExtensionBits[:]), 1)
963         ret = res.Hash
964         c.PeerExtensionBytes = res.PeerExtensionBits
965         c.PeerID = res.PeerID
966         c.completedHandshake = time.Now()
967         if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
968                 cb(c, res.Hash)
969         }
970         return
971 }
972
973 func (cl *Client) runReceivedConn(c *PeerConn) {
974         err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
975         if err != nil {
976                 panic(err)
977         }
978         t, err := cl.receiveHandshakes(c)
979         if err != nil {
980                 cl.logger.LazyLog(log.Debug, func() log.Msg {
981                         return log.Fmsg(
982                                 "error receiving handshakes on %v: %s", c, err,
983                         ).Add(
984                                 "network", c.Network,
985                         )
986                 })
987                 torrent.Add("error receiving handshake", 1)
988                 cl.lock()
989                 cl.onBadAccept(c.RemoteAddr)
990                 cl.unlock()
991                 return
992         }
993         if t == nil {
994                 torrent.Add("received handshake for unloaded torrent", 1)
995                 cl.logger.LazyLog(log.Debug, func() log.Msg {
996                         return log.Fmsg("received handshake for unloaded torrent")
997                 })
998                 cl.lock()
999                 cl.onBadAccept(c.RemoteAddr)
1000                 cl.unlock()
1001                 return
1002         }
1003         torrent.Add("received handshake for loaded torrent", 1)
1004         c.conn.SetWriteDeadline(time.Time{})
1005         cl.lock()
1006         defer cl.unlock()
1007         t.runHandshookConnLoggingErr(c)
1008 }
1009
1010 // Client lock must be held before entering this.
1011 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
1012         c.setTorrent(t)
1013         for i, b := range cl.config.MinPeerExtensions {
1014                 if c.PeerExtensionBytes[i]&b != b {
1015                         return fmt.Errorf("peer did not meet minimum peer extensions: %x", c.PeerExtensionBytes[:])
1016                 }
1017         }
1018         if c.PeerID == cl.peerID {
1019                 if c.outgoing {
1020                         connsToSelf.Add(1)
1021                         addr := c.RemoteAddr.String()
1022                         cl.dopplegangerAddrs[addr] = struct{}{}
1023                 } /* else {
1024                         // Because the remote address is not necessarily the same as its client's torrent listen
1025                         // address, we won't record the remote address as a doppleganger. Instead, the initiator
1026                         // can record *us* as the doppleganger.
1027                 } */
1028                 t.logger.Levelf(log.Debug, "local and remote peer ids are the same")
1029                 return nil
1030         }
1031         c.r = deadlineReader{c.conn, c.r}
1032         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
1033         if connIsIpv6(c.conn) {
1034                 torrent.Add("completed handshake over ipv6", 1)
1035         }
1036         if err := t.addPeerConn(c); err != nil {
1037                 return fmt.Errorf("adding connection: %w", err)
1038         }
1039         defer t.dropConnection(c)
1040         c.startMessageWriter()
1041         cl.sendInitialMessages(c, t)
1042         c.initUpdateRequestsTimer()
1043         err := c.mainReadLoop()
1044         if err != nil {
1045                 return fmt.Errorf("main read loop: %w", err)
1046         }
1047         return nil
1048 }
1049
1050 func (p *Peer) initUpdateRequestsTimer() {
1051         if check.Enabled {
1052                 if p.updateRequestsTimer != nil {
1053                         panic(p.updateRequestsTimer)
1054                 }
1055         }
1056         if enableUpdateRequestsTimer {
1057                 p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
1058         }
1059 }
1060
1061 const peerUpdateRequestsTimerReason = "updateRequestsTimer"
1062
1063 func (c *Peer) updateRequestsTimerFunc() {
1064         c.locker().Lock()
1065         defer c.locker().Unlock()
1066         if c.closed.IsSet() {
1067                 return
1068         }
1069         if c.isLowOnRequests() {
1070                 // If there are no outstanding requests, then a request update should have already run.
1071                 return
1072         }
1073         if d := time.Since(c.lastRequestUpdate); d < updateRequestsTimerDuration {
1074                 // These should be benign, Timer.Stop doesn't guarantee that its function won't run if it's
1075                 // already been fired.
1076                 torrent.Add("spurious timer requests updates", 1)
1077                 return
1078         }
1079         c.updateRequests(peerUpdateRequestsTimerReason)
1080 }
1081
1082 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
1083 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
1084 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
1085 const localClientReqq = 1024
1086
1087 // See the order given in Transmission's tr_peerMsgsNew.
1088 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
1089         if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
1090                 conn.write(pp.Message{
1091                         Type:       pp.Extended,
1092                         ExtendedID: pp.HandshakeExtendedID,
1093                         ExtendedPayload: func() []byte {
1094                                 msg := pp.ExtendedHandshakeMessage{
1095                                         M: map[pp.ExtensionName]pp.ExtensionNumber{
1096                                                 pp.ExtensionNameMetadata:  metadataExtendedId,
1097                                                 utHolepunch.ExtensionName: utHolepunchExtendedId,
1098                                         },
1099                                         V:            cl.config.ExtendedHandshakeClientVersion,
1100                                         Reqq:         localClientReqq,
1101                                         YourIp:       pp.CompactIp(conn.remoteIp()),
1102                                         Encryption:   cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
1103                                         Port:         cl.incomingPeerPort(),
1104                                         MetadataSize: torrent.metadataSize(),
1105                                         // TODO: We can figure these out specific to the socket used.
1106                                         Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
1107                                         Ipv6: cl.config.PublicIp6.To16(),
1108                                 }
1109                                 if !cl.config.DisablePEX {
1110                                         msg.M[pp.ExtensionNamePex] = pexExtendedId
1111                                 }
1112                                 return bencode.MustMarshal(msg)
1113                         }(),
1114                 })
1115         }
1116         func() {
1117                 if conn.fastEnabled() {
1118                         if torrent.haveAllPieces() {
1119                                 conn.write(pp.Message{Type: pp.HaveAll})
1120                                 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
1121                                 return
1122                         } else if !torrent.haveAnyPieces() {
1123                                 conn.write(pp.Message{Type: pp.HaveNone})
1124                                 conn.sentHaves.Clear()
1125                                 return
1126                         }
1127                 }
1128                 conn.postBitfield()
1129         }()
1130         if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1131                 conn.write(pp.Message{
1132                         Type: pp.Port,
1133                         Port: cl.dhtPort(),
1134                 })
1135         }
1136 }
1137
1138 func (cl *Client) dhtPort() (ret uint16) {
1139         if len(cl.dhtServers) == 0 {
1140                 return
1141         }
1142         return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
1143 }
1144
1145 func (cl *Client) haveDhtServer() bool {
1146         return len(cl.dhtServers) > 0
1147 }
1148
1149 // Process incoming ut_metadata message.
1150 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1151         var d pp.ExtendedMetadataRequestMsg
1152         err := bencode.Unmarshal(payload, &d)
1153         if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1154         } else if err != nil {
1155                 return fmt.Errorf("error unmarshalling bencode: %s", err)
1156         }
1157         piece := d.Piece
1158         switch d.Type {
1159         case pp.DataMetadataExtensionMsgType:
1160                 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1161                 if !c.requestedMetadataPiece(piece) {
1162                         return fmt.Errorf("got unexpected piece %d", piece)
1163                 }
1164                 c.metadataRequests[piece] = false
1165                 begin := len(payload) - d.PieceSize()
1166                 if begin < 0 || begin >= len(payload) {
1167                         return fmt.Errorf("data has bad offset in payload: %d", begin)
1168                 }
1169                 t.saveMetadataPiece(piece, payload[begin:])
1170                 c.lastUsefulChunkReceived = time.Now()
1171                 err = t.maybeCompleteMetadata()
1172                 if err != nil {
1173                         // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1174                         // don't know who to blame. TODO: Also errors can be returned here that aren't related
1175                         // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1176                         // log consumers can filter for this message.
1177                         t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1178                 }
1179                 return err
1180         case pp.RequestMetadataExtensionMsgType:
1181                 if !t.haveMetadataPiece(piece) {
1182                         c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1183                         return nil
1184                 }
1185                 start := (1 << 14) * piece
1186                 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1187                 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1188                 return nil
1189         case pp.RejectMetadataExtensionMsgType:
1190                 return nil
1191         default:
1192                 return errors.New("unknown msg_type value")
1193         }
1194 }
1195
1196 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1197         if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1198                 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1199         }
1200         return false
1201 }
1202
1203 // Returns whether the IP address and port are considered "bad".
1204 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1205         if port == 0 || ip == nil {
1206                 return true
1207         }
1208         if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1209                 return true
1210         }
1211         if _, ok := cl.ipBlockRange(ip); ok {
1212                 return true
1213         }
1214         ipAddr, ok := netip.AddrFromSlice(ip)
1215         if !ok {
1216                 panic(ip)
1217         }
1218         if _, ok := cl.badPeerIPs[ipAddr]; ok {
1219                 return true
1220         }
1221         return false
1222 }
1223
1224 // Return a Torrent ready for insertion into a Client.
1225 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1226         return cl.newTorrentOpt(AddTorrentOpts{
1227                 InfoHash: ih,
1228                 Storage:  specStorage,
1229         })
1230 }
1231
1232 // Return a Torrent ready for insertion into a Client.
1233 func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
1234         // use provided storage, if provided
1235         storageClient := cl.defaultStorage
1236         if opts.Storage != nil {
1237                 storageClient = storage.NewClient(opts.Storage)
1238         }
1239
1240         t = &Torrent{
1241                 cl:       cl,
1242                 infoHash: opts.InfoHash,
1243                 peers: prioritizedPeers{
1244                         om: gbtree.New(32),
1245                         getPrio: func(p PeerInfo) peerPriority {
1246                                 ipPort := p.addr()
1247                                 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1248                         },
1249                 },
1250                 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1251
1252                 storageOpener:       storageClient,
1253                 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1254
1255                 metadataChanged: sync.Cond{
1256                         L: cl.locker(),
1257                 },
1258                 webSeeds:     make(map[string]*Peer),
1259                 gotMetainfoC: make(chan struct{}),
1260         }
1261         t.smartBanCache.Hash = sha1.Sum
1262         t.smartBanCache.Init()
1263         t.networkingEnabled.Set()
1264         t.logger = cl.logger.WithContextValue(t).WithNames("torrent", t.infoHash.HexString()).WithDefaultLevel(log.Debug)
1265         t.sourcesLogger = t.logger.WithNames("sources")
1266         if opts.ChunkSize == 0 {
1267                 opts.ChunkSize = defaultChunkSize
1268         }
1269         t.setChunkSize(opts.ChunkSize)
1270         return
1271 }
1272
1273 // A file-like handle to some torrent data resource.
1274 type Handle interface {
1275         io.Reader
1276         io.Seeker
1277         io.Closer
1278         io.ReaderAt
1279 }
1280
1281 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1282         return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1283 }
1284
1285 // Adds a torrent by InfoHash with a custom Storage implementation.
1286 // If the torrent already exists then this Storage is ignored and the
1287 // existing torrent returned with `new` set to `false`
1288 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1289         cl.lock()
1290         defer cl.unlock()
1291         t, ok := cl.torrents[infoHash]
1292         if ok {
1293                 return
1294         }
1295         new = true
1296
1297         t = cl.newTorrent(infoHash, specStorage)
1298         cl.eachDhtServer(func(s DhtServer) {
1299                 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1300                         go t.dhtAnnouncer(s)
1301                 }
1302         })
1303         cl.torrents[infoHash] = t
1304         cl.clearAcceptLimits()
1305         t.updateWantPeersEvent()
1306         // Tickle Client.waitAccept, new torrent may want conns.
1307         cl.event.Broadcast()
1308         return
1309 }
1310
1311 // Adds a torrent by InfoHash with a custom Storage implementation. If the torrent already exists
1312 // then this Storage is ignored and the existing torrent returned with `new` set to `false`.
1313 func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) {
1314         infoHash := opts.InfoHash
1315         cl.lock()
1316         defer cl.unlock()
1317         t, ok := cl.torrents[infoHash]
1318         if ok {
1319                 return
1320         }
1321         new = true
1322
1323         t = cl.newTorrentOpt(opts)
1324         cl.eachDhtServer(func(s DhtServer) {
1325                 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1326                         go t.dhtAnnouncer(s)
1327                 }
1328         })
1329         cl.torrents[infoHash] = t
1330         t.setInfoBytesLocked(opts.InfoBytes)
1331         cl.clearAcceptLimits()
1332         t.updateWantPeersEvent()
1333         // Tickle Client.waitAccept, new torrent may want conns.
1334         cl.event.Broadcast()
1335         return
1336 }
1337
1338 type AddTorrentOpts struct {
1339         InfoHash  infohash.T
1340         Storage   storage.ClientImpl
1341         ChunkSize pp.Integer
1342         InfoBytes []byte
1343 }
1344
1345 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1346 // Torrent.MergeSpec.
1347 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1348         t, new = cl.AddTorrentOpt(AddTorrentOpts{
1349                 InfoHash:  spec.InfoHash,
1350                 Storage:   spec.Storage,
1351                 ChunkSize: spec.ChunkSize,
1352         })
1353         modSpec := *spec
1354         if new {
1355                 // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing
1356                 // it.
1357                 modSpec.ChunkSize = 0
1358         }
1359         err = t.MergeSpec(&modSpec)
1360         if err != nil && new {
1361                 t.Drop()
1362         }
1363         return
1364 }
1365
1366 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1367 // spec.DisallowDataDownload/Upload will be read and applied
1368 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1369 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1370         if spec.DisplayName != "" {
1371                 t.SetDisplayName(spec.DisplayName)
1372         }
1373         if spec.InfoBytes != nil {
1374                 err := t.SetInfoBytes(spec.InfoBytes)
1375                 if err != nil {
1376                         return err
1377                 }
1378         }
1379         cl := t.cl
1380         cl.AddDhtNodes(spec.DhtNodes)
1381         t.UseSources(spec.Sources)
1382         cl.lock()
1383         defer cl.unlock()
1384         t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
1385         for _, url := range spec.Webseeds {
1386                 t.addWebSeed(url)
1387         }
1388         for _, peerAddr := range spec.PeerAddrs {
1389                 t.addPeer(PeerInfo{
1390                         Addr:    StringAddr(peerAddr),
1391                         Source:  PeerSourceDirect,
1392                         Trusted: true,
1393                 })
1394         }
1395         if spec.ChunkSize != 0 {
1396                 panic("chunk size cannot be changed for existing Torrent")
1397         }
1398         t.addTrackers(spec.Trackers)
1399         t.maybeNewConns()
1400         t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1401         t.dataUploadDisallowed = spec.DisallowDataUpload
1402         return nil
1403 }
1404
1405 func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
1406         t, ok := cl.torrents[infoHash]
1407         if !ok {
1408                 err = fmt.Errorf("no such torrent")
1409                 return
1410         }
1411         err = t.close(wg)
1412         delete(cl.torrents, infoHash)
1413         return
1414 }
1415
1416 func (cl *Client) allTorrentsCompleted() bool {
1417         for _, t := range cl.torrents {
1418                 if !t.haveInfo() {
1419                         return false
1420                 }
1421                 if !t.haveAllPieces() {
1422                         return false
1423                 }
1424         }
1425         return true
1426 }
1427
1428 // Returns true when all torrents are completely downloaded and false if the
1429 // client is stopped before that.
1430 func (cl *Client) WaitAll() bool {
1431         cl.lock()
1432         defer cl.unlock()
1433         for !cl.allTorrentsCompleted() {
1434                 if cl.closed.IsSet() {
1435                         return false
1436                 }
1437                 cl.event.Wait()
1438         }
1439         return true
1440 }
1441
1442 // Returns handles to all the torrents loaded in the Client.
1443 func (cl *Client) Torrents() []*Torrent {
1444         cl.rLock()
1445         defer cl.rUnlock()
1446         return cl.torrentsAsSlice()
1447 }
1448
1449 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1450         for _, t := range cl.torrents {
1451                 ret = append(ret, t)
1452         }
1453         return
1454 }
1455
1456 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1457         spec, err := TorrentSpecFromMagnetUri(uri)
1458         if err != nil {
1459                 return
1460         }
1461         T, _, err = cl.AddTorrentSpec(spec)
1462         return
1463 }
1464
1465 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1466         ts, err := TorrentSpecFromMetaInfoErr(mi)
1467         if err != nil {
1468                 return
1469         }
1470         T, _, err = cl.AddTorrentSpec(ts)
1471         return
1472 }
1473
1474 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1475         mi, err := metainfo.LoadFromFile(filename)
1476         if err != nil {
1477                 return
1478         }
1479         return cl.AddTorrent(mi)
1480 }
1481
1482 func (cl *Client) DhtServers() []DhtServer {
1483         return cl.dhtServers
1484 }
1485
1486 func (cl *Client) AddDhtNodes(nodes []string) {
1487         for _, n := range nodes {
1488                 hmp := missinggo.SplitHostMaybePort(n)
1489                 ip := net.ParseIP(hmp.Host)
1490                 if ip == nil {
1491                         cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1492                         continue
1493                 }
1494                 ni := krpc.NodeInfo{
1495                         Addr: krpc.NodeAddr{
1496                                 IP:   ip,
1497                                 Port: hmp.Port,
1498                         },
1499                 }
1500                 cl.eachDhtServer(func(s DhtServer) {
1501                         s.AddNode(ni)
1502                 })
1503         }
1504 }
1505
1506 func (cl *Client) banPeerIP(ip net.IP) {
1507         // We can't take this from string, because it will lose netip's v4on6. net.ParseIP parses v4
1508         // addresses directly to v4on6, which doesn't compare equal with v4.
1509         ipAddr, ok := netip.AddrFromSlice(ip)
1510         if !ok {
1511                 panic(ip)
1512         }
1513         g.MakeMapIfNilAndSet(&cl.badPeerIPs, ipAddr, struct{}{})
1514         for _, t := range cl.torrents {
1515                 t.iterPeers(func(p *Peer) {
1516                         if p.remoteIp().Equal(ip) {
1517                                 t.logger.Levelf(log.Warning, "dropping peer %v with banned ip %v", p, ip)
1518                                 // Should this be a close?
1519                                 p.drop()
1520                         }
1521                 })
1522         }
1523 }
1524
1525 type newConnectionOpts struct {
1526         outgoing        bool
1527         remoteAddr      PeerRemoteAddr
1528         localPublicAddr peerLocalPublicAddr
1529         network         string
1530         connString      string
1531 }
1532
1533 func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerConn) {
1534         if opts.network == "" {
1535                 panic(opts.remoteAddr)
1536         }
1537         c = &PeerConn{
1538                 Peer: Peer{
1539                         outgoing:        opts.outgoing,
1540                         choking:         true,
1541                         peerChoking:     true,
1542                         PeerMaxRequests: 250,
1543
1544                         RemoteAddr:      opts.remoteAddr,
1545                         localPublicAddr: opts.localPublicAddr,
1546                         Network:         opts.network,
1547                         callbacks:       &cl.config.Callbacks,
1548                 },
1549                 connString: opts.connString,
1550                 conn:       nc,
1551         }
1552         c.peerRequestDataAllocLimiter.Max = cl.config.MaxAllocPeerRequestDataPerConn
1553         c.initRequestState()
1554         // TODO: Need to be much more explicit about this, including allowing non-IP bannable addresses.
1555         if opts.remoteAddr != nil {
1556                 netipAddrPort, err := netip.ParseAddrPort(opts.remoteAddr.String())
1557                 if err == nil {
1558                         c.bannableAddr = Some(netipAddrPort.Addr())
1559                 }
1560         }
1561         c.peerImpl = c
1562         c.logger = cl.logger.WithDefaultLevel(log.Warning)
1563         c.setRW(connStatsReadWriter{nc, c})
1564         c.r = &rateLimitedReader{
1565                 l: cl.config.DownloadRateLimiter,
1566                 r: c.r,
1567         }
1568         c.logger.Levelf(
1569                 log.Debug,
1570                 "new PeerConn %p [Client %p remoteAddr %v network %v outgoing %t]",
1571                 c, cl, opts.remoteAddr, opts.network, opts.outgoing,
1572         )
1573         for _, f := range cl.config.Callbacks.NewPeer {
1574                 f(&c.Peer)
1575         }
1576         return
1577 }
1578
1579 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1580         cl.lock()
1581         defer cl.unlock()
1582         t := cl.torrent(ih)
1583         if t == nil {
1584                 return
1585         }
1586         t.addPeers([]PeerInfo{{
1587                 Addr:   ipPortAddr{ip, port},
1588                 Source: PeerSourceDhtAnnouncePeer,
1589         }})
1590 }
1591
1592 func firstNotNil(ips ...net.IP) net.IP {
1593         for _, ip := range ips {
1594                 if ip != nil {
1595                         return ip
1596                 }
1597         }
1598         return nil
1599 }
1600
1601 func (cl *Client) eachListener(f func(Listener) bool) {
1602         for _, s := range cl.listeners {
1603                 if !f(s) {
1604                         break
1605                 }
1606         }
1607 }
1608
1609 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1610         for i := 0; i < len(cl.listeners); i += 1 {
1611                 if ret = cl.listeners[i]; f(ret) {
1612                         return
1613                 }
1614         }
1615         return nil
1616 }
1617
1618 func (cl *Client) publicIp(peer net.IP) net.IP {
1619         // TODO: Use BEP 10 to determine how peers are seeing us.
1620         if peer.To4() != nil {
1621                 return firstNotNil(
1622                         cl.config.PublicIp4,
1623                         cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1624                 )
1625         }
1626
1627         return firstNotNil(
1628                 cl.config.PublicIp6,
1629                 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1630         )
1631 }
1632
1633 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1634         l := cl.findListener(
1635                 func(l Listener) bool {
1636                         return f(addrIpOrNil(l.Addr()))
1637                 },
1638         )
1639         if l == nil {
1640                 return nil
1641         }
1642         return addrIpOrNil(l.Addr())
1643 }
1644
1645 // Our IP as a peer should see it.
1646 func (cl *Client) publicAddr(peer net.IP) IpPort {
1647         return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1648 }
1649
1650 // ListenAddrs addresses currently being listened to.
1651 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1652         cl.lock()
1653         ret = make([]net.Addr, len(cl.listeners))
1654         for i := 0; i < len(cl.listeners); i += 1 {
1655                 ret[i] = cl.listeners[i].Addr()
1656         }
1657         cl.unlock()
1658         return
1659 }
1660
1661 func (cl *Client) PublicIPs() (ips []net.IP) {
1662         if ip := cl.config.PublicIp4; ip != nil {
1663                 ips = append(ips, ip)
1664         }
1665         if ip := cl.config.PublicIp6; ip != nil {
1666                 ips = append(ips, ip)
1667         }
1668         return
1669 }
1670
1671 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1672         ipa, ok := tryIpPortFromNetAddr(addr)
1673         if !ok {
1674                 return
1675         }
1676         ip := maskIpForAcceptLimiting(ipa.IP)
1677         if cl.acceptLimiter == nil {
1678                 cl.acceptLimiter = make(map[ipStr]int)
1679         }
1680         cl.acceptLimiter[ipStr(ip.String())]++
1681 }
1682
1683 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1684         if ip4 := ip.To4(); ip4 != nil {
1685                 return ip4.Mask(net.CIDRMask(24, 32))
1686         }
1687         return ip
1688 }
1689
1690 func (cl *Client) clearAcceptLimits() {
1691         cl.acceptLimiter = nil
1692 }
1693
1694 func (cl *Client) acceptLimitClearer() {
1695         for {
1696                 select {
1697                 case <-cl.closed.Done():
1698                         return
1699                 case <-time.After(15 * time.Minute):
1700                         cl.lock()
1701                         cl.clearAcceptLimits()
1702                         cl.unlock()
1703                 }
1704         }
1705 }
1706
1707 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1708         if cl.config.DisableAcceptRateLimiting {
1709                 return false
1710         }
1711         return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1712 }
1713
1714 func (cl *Client) rLock() {
1715         cl._mu.RLock()
1716 }
1717
1718 func (cl *Client) rUnlock() {
1719         cl._mu.RUnlock()
1720 }
1721
1722 func (cl *Client) lock() {
1723         cl._mu.Lock()
1724 }
1725
1726 func (cl *Client) unlock() {
1727         cl._mu.Unlock()
1728 }
1729
1730 func (cl *Client) locker() *lockWithDeferreds {
1731         return &cl._mu
1732 }
1733
1734 func (cl *Client) String() string {
1735         return fmt.Sprintf("<%[1]T %[1]p>", cl)
1736 }
1737
1738 // Returns connection-level aggregate connStats at the Client level. See the comment on
1739 // TorrentStats.ConnStats.
1740 func (cl *Client) ConnStats() ConnStats {
1741         return cl.connStats.Copy()
1742 }
1743
1744 func (cl *Client) Stats() ClientStats {
1745         cl.rLock()
1746         defer cl.rUnlock()
1747         return cl.statsLocked()
1748 }
1749
1750 func (cl *Client) statsLocked() (stats ClientStats) {
1751         stats.ConnStats = cl.connStats.Copy()
1752         stats.ActiveHalfOpenAttempts = cl.numHalfOpen
1753         stats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect =
1754                 len(cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect)
1755         stats.NumPeersDialableOnlyAfterHolepunch =
1756                 len(cl.dialableOnlyAfterHolepunch)
1757         return
1758 }