]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
Add a timeout to fs tests in CI
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "context"
6         "crypto/rand"
7         "encoding/binary"
8         "encoding/hex"
9         "errors"
10         "expvar"
11         "fmt"
12         "io"
13         "math"
14         "net"
15         "net/http"
16         "net/netip"
17         "sort"
18         "strconv"
19         "time"
20
21         "github.com/anacrolix/chansync"
22         "github.com/anacrolix/chansync/events"
23         "github.com/anacrolix/dht/v2"
24         "github.com/anacrolix/dht/v2/krpc"
25         . "github.com/anacrolix/generics"
26         g "github.com/anacrolix/generics"
27         "github.com/anacrolix/log"
28         "github.com/anacrolix/missinggo/perf"
29         "github.com/anacrolix/missinggo/v2"
30         "github.com/anacrolix/missinggo/v2/bitmap"
31         "github.com/anacrolix/missinggo/v2/pproffd"
32         "github.com/anacrolix/sync"
33         "github.com/cespare/xxhash"
34         "github.com/davecgh/go-spew/spew"
35         "github.com/dustin/go-humanize"
36         gbtree "github.com/google/btree"
37         "github.com/pion/datachannel"
38
39         "github.com/anacrolix/torrent/bencode"
40         "github.com/anacrolix/torrent/internal/check"
41         "github.com/anacrolix/torrent/internal/limiter"
42         "github.com/anacrolix/torrent/iplist"
43         "github.com/anacrolix/torrent/metainfo"
44         "github.com/anacrolix/torrent/mse"
45         pp "github.com/anacrolix/torrent/peer_protocol"
46         request_strategy "github.com/anacrolix/torrent/request-strategy"
47         "github.com/anacrolix/torrent/storage"
48         "github.com/anacrolix/torrent/tracker"
49         "github.com/anacrolix/torrent/types/infohash"
50         "github.com/anacrolix/torrent/webtorrent"
51 )
52
53 // Clients contain zero or more Torrents. A Client manages a blocklist, the
54 // TCP/UDP protocol ports, and DHT as desired.
55 type Client struct {
56         // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
57         // fields. See #262.
58         connStats ConnStats
59
60         _mu    lockWithDeferreds
61         event  sync.Cond
62         closed chansync.SetOnce
63
64         config *ClientConfig
65         logger log.Logger
66
67         peerID         PeerID
68         defaultStorage *storage.Client
69         onClose        []func()
70         dialers        []Dialer
71         listeners      []Listener
72         dhtServers     []DhtServer
73         ipBlockList    iplist.Ranger
74
75         // Set of addresses that have our client ID. This intentionally will
76         // include ourselves if we end up trying to connect to our own address
77         // through legitimate channels.
78         dopplegangerAddrs map[string]struct{}
79         badPeerIPs        map[netip.Addr]struct{}
80         torrents          map[InfoHash]*Torrent
81         pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder
82
83         acceptLimiter map[ipStr]int
84         numHalfOpen   int
85
86         websocketTrackers websocketTrackers
87
88         activeAnnounceLimiter limiter.Instance
89         httpClient            *http.Client
90
91         clientHolepunchAddrSets
92
93         defaultLocalLtepProtocolMap LocalLtepProtocolMap
94 }
95
96 type ipStr string
97
98 func (cl *Client) BadPeerIPs() (ips []string) {
99         cl.rLock()
100         ips = cl.badPeerIPsLocked()
101         cl.rUnlock()
102         return
103 }
104
105 func (cl *Client) badPeerIPsLocked() (ips []string) {
106         ips = make([]string, len(cl.badPeerIPs))
107         i := 0
108         for k := range cl.badPeerIPs {
109                 ips[i] = k.String()
110                 i += 1
111         }
112         return
113 }
114
115 func (cl *Client) PeerID() PeerID {
116         return cl.peerID
117 }
118
119 // Returns the port number for the first listener that has one. No longer assumes that all port
120 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
121 // found.
122 func (cl *Client) LocalPort() (port int) {
123         for i := 0; i < len(cl.listeners); i += 1 {
124                 if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
125                         return
126                 }
127         }
128         return
129 }
130
131 func writeDhtServerStatus(w io.Writer, s DhtServer) {
132         dhtStats := s.Stats()
133         fmt.Fprintf(w, " ID: %x\n", s.ID())
134         spew.Fdump(w, dhtStats)
135 }
136
137 // Writes out a human readable status of the client, such as for writing to a
138 // HTTP status page.
139 func (cl *Client) WriteStatus(_w io.Writer) {
140         cl.rLock()
141         defer cl.rUnlock()
142         w := bufio.NewWriter(_w)
143         defer w.Flush()
144         fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
145         fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
146         fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
147         fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
148         fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
149         cl.eachDhtServer(func(s DhtServer) {
150                 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
151                 writeDhtServerStatus(w, s)
152         })
153         dumpStats(w, cl.statsLocked())
154         torrentsSlice := cl.torrentsAsSlice()
155         fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
156         fmt.Fprintln(w)
157         sort.Slice(torrentsSlice, func(l, r int) bool {
158                 return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
159         })
160         for _, t := range torrentsSlice {
161                 if t.name() == "" {
162                         fmt.Fprint(w, "<unknown name>")
163                 } else {
164                         fmt.Fprint(w, t.name())
165                 }
166                 fmt.Fprint(w, "\n")
167                 if t.info != nil {
168                         fmt.Fprintf(
169                                 w,
170                                 "%f%% of %d bytes (%s)",
171                                 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
172                                 t.length(),
173                                 humanize.Bytes(uint64(t.length())))
174                 } else {
175                         w.WriteString("<missing metainfo>")
176                 }
177                 fmt.Fprint(w, "\n")
178                 t.writeStatus(w)
179                 fmt.Fprintln(w)
180         }
181 }
182
183 func (cl *Client) initLogger() {
184         logger := cl.config.Logger
185         if logger.IsZero() {
186                 logger = log.Default
187         }
188         if cl.config.Debug {
189                 logger = logger.FilterLevel(log.Debug)
190         }
191         cl.logger = logger.WithValues(cl)
192 }
193
194 func (cl *Client) announceKey() int32 {
195         return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
196 }
197
198 // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
199 func (cl *Client) init(cfg *ClientConfig) {
200         cl.config = cfg
201         g.MakeMap(&cl.dopplegangerAddrs)
202         cl.torrents = make(map[metainfo.Hash]*Torrent)
203         cl.activeAnnounceLimiter.SlotsPerKey = 2
204         cl.event.L = cl.locker()
205         cl.ipBlockList = cfg.IPBlocklist
206         cl.httpClient = &http.Client{
207                 Transport: cfg.WebTransport,
208         }
209         if cl.httpClient.Transport == nil {
210                 cl.httpClient.Transport = &http.Transport{
211                         Proxy:       cfg.HTTPProxy,
212                         DialContext: cfg.HTTPDialContext,
213                         // I think this value was observed from some webseeds. It seems reasonable to extend it
214                         // to other uses of HTTP from the client.
215                         MaxConnsPerHost: 10,
216                 }
217         }
218         cl.defaultLocalLtepProtocolMap = makeBuiltinLtepProtocols(!cfg.DisablePEX)
219 }
220
221 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
222         if cfg == nil {
223                 cfg = NewDefaultClientConfig()
224                 cfg.ListenPort = 0
225         }
226         cl = &Client{}
227         cl.init(cfg)
228         go cl.acceptLimitClearer()
229         cl.initLogger()
230         defer func() {
231                 if err != nil {
232                         cl.Close()
233                         cl = nil
234                 }
235         }()
236
237         storageImpl := cfg.DefaultStorage
238         if storageImpl == nil {
239                 // We'd use mmap by default but HFS+ doesn't support sparse files.
240                 storageImplCloser := storage.NewFile(cfg.DataDir)
241                 cl.onClose = append(cl.onClose, func() {
242                         if err := storageImplCloser.Close(); err != nil {
243                                 cl.logger.Printf("error closing default storage: %s", err)
244                         }
245                 })
246                 storageImpl = storageImplCloser
247         }
248         cl.defaultStorage = storage.NewClient(storageImpl)
249
250         if cfg.PeerID != "" {
251                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
252         } else {
253                 o := copy(cl.peerID[:], cfg.Bep20)
254                 _, err = rand.Read(cl.peerID[o:])
255                 if err != nil {
256                         panic("error generating peer id")
257                 }
258         }
259
260         builtinListenNetworks := cl.listenNetworks()
261         sockets, err := listenAll(
262                 builtinListenNetworks,
263                 cl.config.ListenHost,
264                 cl.config.ListenPort,
265                 cl.firewallCallback,
266                 cl.logger,
267         )
268         if err != nil {
269                 return
270         }
271         if len(sockets) == 0 && len(builtinListenNetworks) != 0 {
272                 err = fmt.Errorf("no sockets created for networks %v", builtinListenNetworks)
273                 return
274         }
275
276         // Check for panics.
277         cl.LocalPort()
278
279         for _, _s := range sockets {
280                 s := _s // Go is fucking retarded.
281                 cl.onClose = append(cl.onClose, func() { go s.Close() })
282                 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
283                         cl.dialers = append(cl.dialers, s)
284                         cl.listeners = append(cl.listeners, s)
285                         if cl.config.AcceptPeerConnections {
286                                 go cl.acceptConnections(s)
287                         }
288                 }
289         }
290
291         go cl.forwardPort()
292         if !cfg.NoDHT {
293                 for _, s := range sockets {
294                         if pc, ok := s.(net.PacketConn); ok {
295                                 ds, err := cl.NewAnacrolixDhtServer(pc)
296                                 if err != nil {
297                                         panic(err)
298                                 }
299                                 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
300                                 cl.onClose = append(cl.onClose, func() { ds.Close() })
301                         }
302                 }
303         }
304
305         cl.websocketTrackers = websocketTrackers{
306                 PeerId: cl.peerID,
307                 Logger: cl.logger,
308                 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
309                         cl.lock()
310                         defer cl.unlock()
311                         t, ok := cl.torrents[infoHash]
312                         if !ok {
313                                 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
314                         }
315                         return t.announceRequest(event), nil
316                 },
317                 Proxy:                      cl.config.HTTPProxy,
318                 WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader,
319                 ICEServers:                 cl.config.ICEServers,
320                 DialContext:                cl.config.TrackerDialContext,
321                 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
322                         cl.lock()
323                         defer cl.unlock()
324                         t, ok := cl.torrents[dcc.InfoHash]
325                         if !ok {
326                                 cl.logger.WithDefaultLevel(log.Warning).Printf(
327                                         "got webrtc conn for unloaded torrent with infohash %x",
328                                         dcc.InfoHash,
329                                 )
330                                 dc.Close()
331                                 return
332                         }
333                         go t.onWebRtcConn(dc, dcc)
334                 },
335         }
336
337         return
338 }
339
340 func (cl *Client) AddDhtServer(d DhtServer) {
341         cl.dhtServers = append(cl.dhtServers, d)
342 }
343
344 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
345 // given address for any Torrent.
346 func (cl *Client) AddDialer(d Dialer) {
347         cl.lock()
348         defer cl.unlock()
349         cl.dialers = append(cl.dialers, d)
350         for _, t := range cl.torrents {
351                 t.openNewConns()
352         }
353 }
354
355 func (cl *Client) Listeners() []Listener {
356         return cl.listeners
357 }
358
359 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
360 // yourself.
361 func (cl *Client) AddListener(l Listener) {
362         cl.listeners = append(cl.listeners, l)
363         if cl.config.AcceptPeerConnections {
364                 go cl.acceptConnections(l)
365         }
366 }
367
368 func (cl *Client) firewallCallback(net.Addr) bool {
369         cl.rLock()
370         block := !cl.wantConns() || !cl.config.AcceptPeerConnections
371         cl.rUnlock()
372         if block {
373                 torrent.Add("connections firewalled", 1)
374         } else {
375                 torrent.Add("connections not firewalled", 1)
376         }
377         return block
378 }
379
380 func (cl *Client) listenOnNetwork(n network) bool {
381         if n.Ipv4 && cl.config.DisableIPv4 {
382                 return false
383         }
384         if n.Ipv6 && cl.config.DisableIPv6 {
385                 return false
386         }
387         if n.Tcp && cl.config.DisableTCP {
388                 return false
389         }
390         if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
391                 return false
392         }
393         return true
394 }
395
396 func (cl *Client) listenNetworks() (ns []network) {
397         for _, n := range allPeerNetworks {
398                 if cl.listenOnNetwork(n) {
399                         ns = append(ns, n)
400                 }
401         }
402         return
403 }
404
405 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
406 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
407         logger := cl.logger.WithNames("dht", conn.LocalAddr().String())
408         cfg := dht.ServerConfig{
409                 IPBlocklist:    cl.ipBlockList,
410                 Conn:           conn,
411                 OnAnnouncePeer: cl.onDHTAnnouncePeer,
412                 PublicIP: func() net.IP {
413                         if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
414                                 return cl.config.PublicIp6
415                         }
416                         return cl.config.PublicIp4
417                 }(),
418                 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
419                 OnQuery:       cl.config.DHTOnQuery,
420                 Logger:        logger,
421         }
422         if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
423                 f(&cfg)
424         }
425         s, err = dht.NewServer(&cfg)
426         if err == nil {
427                 go s.TableMaintainer()
428         }
429         return
430 }
431
432 func (cl *Client) Closed() events.Done {
433         return cl.closed.Done()
434 }
435
436 func (cl *Client) eachDhtServer(f func(DhtServer)) {
437         for _, ds := range cl.dhtServers {
438                 f(ds)
439         }
440 }
441
442 // Stops the client. All connections to peers are closed and all activity will come to a halt.
443 func (cl *Client) Close() (errs []error) {
444         var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
445         cl.lock()
446         for _, t := range cl.torrents {
447                 err := t.close(&closeGroup)
448                 if err != nil {
449                         errs = append(errs, err)
450                 }
451         }
452         for i := range cl.onClose {
453                 cl.onClose[len(cl.onClose)-1-i]()
454         }
455         cl.closed.Set()
456         cl.unlock()
457         cl.event.Broadcast()
458         closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
459         return
460 }
461
462 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
463         if cl.ipBlockList == nil {
464                 return
465         }
466         return cl.ipBlockList.Lookup(ip)
467 }
468
469 func (cl *Client) ipIsBlocked(ip net.IP) bool {
470         _, blocked := cl.ipBlockRange(ip)
471         return blocked
472 }
473
474 func (cl *Client) wantConns() bool {
475         if cl.config.AlwaysWantConns {
476                 return true
477         }
478         for _, t := range cl.torrents {
479                 if t.wantIncomingConns() {
480                         return true
481                 }
482         }
483         return false
484 }
485
486 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
487 func (cl *Client) rejectAccepted(conn net.Conn) error {
488         if !cl.wantConns() {
489                 return errors.New("don't want conns right now")
490         }
491         ra := conn.RemoteAddr()
492         if rip := addrIpOrNil(ra); rip != nil {
493                 if cl.config.DisableIPv4Peers && rip.To4() != nil {
494                         return errors.New("ipv4 peers disabled")
495                 }
496                 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
497                         return errors.New("ipv4 disabled")
498                 }
499                 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
500                         return errors.New("ipv6 disabled")
501                 }
502                 if cl.rateLimitAccept(rip) {
503                         return errors.New("source IP accepted rate limited")
504                 }
505                 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
506                         return errors.New("bad source addr")
507                 }
508         }
509         return nil
510 }
511
512 func (cl *Client) acceptConnections(l Listener) {
513         for {
514                 conn, err := l.Accept()
515                 torrent.Add("client listener accepts", 1)
516                 if err == nil {
517                         holepunchAddr, holepunchErr := addrPortFromPeerRemoteAddr(conn.RemoteAddr())
518                         if holepunchErr == nil {
519                                 cl.lock()
520                                 if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
521                                         setAdd(&cl.accepted, holepunchAddr)
522                                 }
523                                 if g.MapContains(
524                                         cl.undialableWithoutHolepunchDialedAfterHolepunchConnect,
525                                         holepunchAddr,
526                                 ) {
527                                         setAdd(&cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr)
528                                 }
529                                 cl.unlock()
530                         }
531                 }
532                 conn = pproffd.WrapNetConn(conn)
533                 cl.rLock()
534                 closed := cl.closed.IsSet()
535                 var reject error
536                 if !closed && conn != nil {
537                         reject = cl.rejectAccepted(conn)
538                 }
539                 cl.rUnlock()
540                 if closed {
541                         if conn != nil {
542                                 conn.Close()
543                         }
544                         return
545                 }
546                 if err != nil {
547                         log.Fmsg("error accepting connection: %s", err).LogLevel(log.Debug, cl.logger)
548                         continue
549                 }
550                 go func() {
551                         if reject != nil {
552                                 torrent.Add("rejected accepted connections", 1)
553                                 cl.logger.LazyLog(log.Debug, func() log.Msg {
554                                         return log.Fmsg("rejecting accepted conn: %v", reject)
555                                 })
556                                 conn.Close()
557                         } else {
558                                 go cl.incomingConnection(conn)
559                         }
560                         cl.logger.LazyLog(log.Debug, func() log.Msg {
561                                 return log.Fmsg("accepted %q connection at %q from %q",
562                                         l.Addr().Network(),
563                                         conn.LocalAddr(),
564                                         conn.RemoteAddr(),
565                                 )
566                         })
567                         torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
568                         torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
569                         torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
570                 }()
571         }
572 }
573
574 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
575 func regularNetConnPeerConnConnString(nc net.Conn) string {
576         return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
577 }
578
579 func (cl *Client) incomingConnection(nc net.Conn) {
580         defer nc.Close()
581         if tc, ok := nc.(*net.TCPConn); ok {
582                 tc.SetLinger(0)
583         }
584         remoteAddr, _ := tryIpPortFromNetAddr(nc.RemoteAddr())
585         c := cl.newConnection(
586                 nc,
587                 newConnectionOpts{
588                         outgoing:        false,
589                         remoteAddr:      nc.RemoteAddr(),
590                         localPublicAddr: cl.publicAddr(remoteAddr.IP),
591                         network:         nc.RemoteAddr().Network(),
592                         connString:      regularNetConnPeerConnConnString(nc),
593                 })
594         defer func() {
595                 cl.lock()
596                 defer cl.unlock()
597                 c.close()
598         }()
599         c.Discovery = PeerSourceIncoming
600         cl.runReceivedConn(c)
601 }
602
603 // Returns a handle to the given torrent, if it's present in the client.
604 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
605         cl.rLock()
606         defer cl.rUnlock()
607         t, ok = cl.torrents[ih]
608         return
609 }
610
611 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
612         return cl.torrents[ih]
613 }
614
615 type DialResult struct {
616         Conn   net.Conn
617         Dialer Dialer
618 }
619
620 func countDialResult(err error) {
621         if err == nil {
622                 torrent.Add("successful dials", 1)
623         } else {
624                 torrent.Add("unsuccessful dials", 1)
625         }
626 }
627
628 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit, pendingPeers int) (ret time.Duration) {
629         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
630         if ret < minDialTimeout {
631                 ret = minDialTimeout
632         }
633         return
634 }
635
636 // Returns whether an address is known to connect to a client with our own ID.
637 func (cl *Client) dopplegangerAddr(addr string) bool {
638         _, ok := cl.dopplegangerAddrs[addr]
639         return ok
640 }
641
642 // Returns a connection over UTP or TCP, whichever is first to connect.
643 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
644         return DialFirst(ctx, addr, cl.dialers)
645 }
646
647 // Returns a connection over UTP or TCP, whichever is first to connect.
648 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
649         pool := dialPool{
650                 addr: addr,
651         }
652         defer pool.startDrainer()
653         for _, _s := range dialers {
654                 pool.add(ctx, _s)
655         }
656         return pool.getFirst()
657 }
658
659 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
660         c, err := s.Dial(ctx, addr)
661         if err != nil {
662                 log.Levelf(log.Debug, "error dialing %q: %v", addr, err)
663         }
664         // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
665         // it now in case we close the connection forthwith. Note this is also done in the TCP dialer
666         // code to increase the chance it's done.
667         if tc, ok := c.(*net.TCPConn); ok {
668                 tc.SetLinger(0)
669         }
670         countDialResult(err)
671         return c
672 }
673
674 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string, attemptKey outgoingConnAttemptKey) {
675         path := t.getHalfOpenPath(addr, attemptKey)
676         if !path.Exists() {
677                 panic("should exist")
678         }
679         path.Delete()
680         cl.numHalfOpen--
681         if cl.numHalfOpen < 0 {
682                 panic("should not be possible")
683         }
684         for _, t := range cl.torrents {
685                 t.openNewConns()
686         }
687 }
688
689 func (cl *Client) countHalfOpenFromTorrents() (count int) {
690         for _, t := range cl.torrents {
691                 count += t.numHalfOpenAttempts()
692         }
693         return
694 }
695
696 // Performs initiator handshakes and returns a connection. Returns nil *PeerConn if no connection
697 // for valid reasons.
698 func (cl *Client) initiateProtocolHandshakes(
699         ctx context.Context,
700         nc net.Conn,
701         t *Torrent,
702         encryptHeader bool,
703         newConnOpts newConnectionOpts,
704 ) (
705         c *PeerConn, err error,
706 ) {
707         c = cl.newConnection(nc, newConnOpts)
708         c.headerEncrypted = encryptHeader
709         ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
710         defer cancel()
711         dl, ok := ctx.Deadline()
712         if !ok {
713                 panic(ctx)
714         }
715         err = nc.SetDeadline(dl)
716         if err != nil {
717                 panic(err)
718         }
719         err = cl.initiateHandshakes(c, t)
720         return
721 }
722
723 func doProtocolHandshakeOnDialResult(
724         t *Torrent,
725         obfuscatedHeader bool,
726         addr PeerRemoteAddr,
727         dr DialResult,
728 ) (
729         c *PeerConn, err error,
730 ) {
731         cl := t.cl
732         nc := dr.Conn
733         addrIpPort, _ := tryIpPortFromNetAddr(addr)
734         c, err = cl.initiateProtocolHandshakes(
735                 context.Background(), nc, t, obfuscatedHeader,
736                 newConnectionOpts{
737                         outgoing:   true,
738                         remoteAddr: addr,
739                         // It would be possible to retrieve a public IP from the dialer used here?
740                         localPublicAddr: cl.publicAddr(addrIpPort.IP),
741                         network:         dr.Dialer.DialerNetwork(),
742                         connString:      regularNetConnPeerConnConnString(nc),
743                 })
744         if err != nil {
745                 nc.Close()
746         }
747         return c, err
748 }
749
750 // Returns nil connection and nil error if no connection could be established for valid reasons.
751 func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn, err error) {
752         // It would be better if dial rate limiting could be tested when considering to open connections
753         // instead. Doing it here means if the limit is low, and the half-open limit is high, we could
754         // end up with lots of outgoing connection attempts pending that were initiated on stale data.
755         {
756                 dialReservation := cl.config.DialRateLimiter.Reserve()
757                 if !opts.receivedHolepunchConnect {
758                         if !dialReservation.OK() {
759                                 err = errors.New("can't make dial limit reservation")
760                                 return
761                         }
762                         time.Sleep(dialReservation.Delay())
763                 }
764         }
765         torrent.Add("establish outgoing connection", 1)
766         addr := opts.peerInfo.Addr
767         dialPool := dialPool{
768                 resCh: make(chan DialResult),
769                 addr:  addr.String(),
770         }
771         defer dialPool.startDrainer()
772         dialTimeout := opts.t.getDialTimeoutUnlocked()
773         {
774                 ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
775                 defer cancel()
776                 for _, d := range cl.dialers {
777                         dialPool.add(ctx, d)
778                 }
779         }
780         holepunchAddr, holepunchAddrErr := addrPortFromPeerRemoteAddr(addr)
781         headerObfuscationPolicy := opts.HeaderObfuscationPolicy
782         obfuscatedHeaderFirst := headerObfuscationPolicy.Preferred
783         firstDialResult := dialPool.getFirst()
784         if firstDialResult.Conn == nil {
785                 // No dialers worked. Try to initiate a holepunching rendezvous.
786                 if holepunchAddrErr == nil {
787                         cl.lock()
788                         if !opts.receivedHolepunchConnect {
789                                 g.MakeMapIfNilAndSet(&cl.undialableWithoutHolepunch, holepunchAddr, struct{}{})
790                         }
791                         if !opts.skipHolepunchRendezvous {
792                                 opts.t.trySendHolepunchRendezvous(holepunchAddr)
793                         }
794                         cl.unlock()
795                 }
796                 err = fmt.Errorf("all initial dials failed")
797                 return
798         }
799         if opts.receivedHolepunchConnect && holepunchAddrErr == nil {
800                 cl.lock()
801                 if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
802                         g.MakeMapIfNilAndSet(&cl.dialableOnlyAfterHolepunch, holepunchAddr, struct{}{})
803                 }
804                 g.MakeMapIfNil(&cl.dialedSuccessfullyAfterHolepunchConnect)
805                 g.MapInsert(cl.dialedSuccessfullyAfterHolepunchConnect, holepunchAddr, struct{}{})
806                 cl.unlock()
807         }
808         c, err = doProtocolHandshakeOnDialResult(
809                 opts.t,
810                 obfuscatedHeaderFirst,
811                 addr,
812                 firstDialResult,
813         )
814         if err == nil {
815                 torrent.Add("initiated conn with preferred header obfuscation", 1)
816                 return
817         }
818         c.logger.Levelf(
819                 log.Debug,
820                 "error doing protocol handshake with header obfuscation %v",
821                 obfuscatedHeaderFirst,
822         )
823         firstDialResult.Conn.Close()
824         // We should have just tried with the preferred header obfuscation. If it was required, there's nothing else to try.
825         if headerObfuscationPolicy.RequirePreferred {
826                 return
827         }
828         // Reuse the dialer that returned already but failed to handshake.
829         {
830                 ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
831                 defer cancel()
832                 dialPool.add(ctx, firstDialResult.Dialer)
833         }
834         secondDialResult := dialPool.getFirst()
835         if secondDialResult.Conn == nil {
836                 return
837         }
838         c, err = doProtocolHandshakeOnDialResult(
839                 opts.t,
840                 !obfuscatedHeaderFirst,
841                 addr,
842                 secondDialResult,
843         )
844         if err == nil {
845                 torrent.Add("initiated conn with fallback header obfuscation", 1)
846                 return
847         }
848         c.logger.Levelf(
849                 log.Debug,
850                 "error doing protocol handshake with header obfuscation %v",
851                 !obfuscatedHeaderFirst,
852         )
853         secondDialResult.Conn.Close()
854         return
855 }
856
857 type outgoingConnOpts struct {
858         peerInfo PeerInfo
859         t        *Torrent
860         // Don't attempt to connect unless a connect message is received after initiating a rendezvous.
861         requireRendezvous bool
862         // Don't send rendezvous requests to eligible relays.
863         skipHolepunchRendezvous bool
864         // Outgoing connection attempt is in response to holepunch connect message.
865         receivedHolepunchConnect bool
866         HeaderObfuscationPolicy  HeaderObfuscationPolicy
867 }
868
869 // Called to dial out and run a connection. The addr we're given is already
870 // considered half-open.
871 func (cl *Client) outgoingConnection(
872         opts outgoingConnOpts,
873         attemptKey outgoingConnAttemptKey,
874 ) {
875         c, err := cl.dialAndCompleteHandshake(opts)
876         if err == nil {
877                 c.conn.SetWriteDeadline(time.Time{})
878         }
879         cl.lock()
880         defer cl.unlock()
881         // Don't release lock between here and addPeerConn, unless it's for failure.
882         cl.noLongerHalfOpen(opts.t, opts.peerInfo.Addr.String(), attemptKey)
883         if err != nil {
884                 if cl.config.Debug {
885                         cl.logger.Levelf(
886                                 log.Debug,
887                                 "error establishing outgoing connection to %v: %v",
888                                 opts.peerInfo.Addr,
889                                 err,
890                         )
891                 }
892                 return
893         }
894         defer c.close()
895         c.Discovery = opts.peerInfo.Source
896         c.trusted = opts.peerInfo.Trusted
897         opts.t.runHandshookConnLoggingErr(c)
898 }
899
900 // The port number for incoming peer connections. 0 if the client isn't listening.
901 func (cl *Client) incomingPeerPort() int {
902         return cl.LocalPort()
903 }
904
905 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
906         if c.headerEncrypted {
907                 var rw io.ReadWriter
908                 var err error
909                 rw, c.cryptoMethod, err = mse.InitiateHandshake(
910                         struct {
911                                 io.Reader
912                                 io.Writer
913                         }{c.r, c.w},
914                         t.infoHash[:],
915                         nil,
916                         cl.config.CryptoProvides,
917                 )
918                 c.setRW(rw)
919                 if err != nil {
920                         return fmt.Errorf("header obfuscation handshake: %w", err)
921                 }
922         }
923         ih, err := cl.connBtHandshake(c, &t.infoHash)
924         if err != nil {
925                 return fmt.Errorf("bittorrent protocol handshake: %w", err)
926         }
927         if ih != t.infoHash {
928                 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
929         }
930         return nil
931 }
932
933 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
934 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
935 func (cl *Client) forSkeys(f func([]byte) bool) {
936         cl.rLock()
937         defer cl.rUnlock()
938         if false { // Emulate the bug from #114
939                 var firstIh InfoHash
940                 for ih := range cl.torrents {
941                         firstIh = ih
942                         break
943                 }
944                 for range cl.torrents {
945                         if !f(firstIh[:]) {
946                                 break
947                         }
948                 }
949                 return
950         }
951         for ih := range cl.torrents {
952                 if !f(ih[:]) {
953                         break
954                 }
955         }
956 }
957
958 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
959         if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
960                 return ret
961         }
962         return cl.forSkeys
963 }
964
965 // Do encryption and bittorrent handshakes as receiver.
966 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
967         defer perf.ScopeTimerErr(&err)()
968         var rw io.ReadWriter
969         rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
970         c.setRW(rw)
971         if err == nil || err == mse.ErrNoSecretKeyMatch {
972                 if c.headerEncrypted {
973                         torrent.Add("handshakes received encrypted", 1)
974                 } else {
975                         torrent.Add("handshakes received unencrypted", 1)
976                 }
977         } else {
978                 torrent.Add("handshakes received with error while handling encryption", 1)
979         }
980         if err != nil {
981                 if err == mse.ErrNoSecretKeyMatch {
982                         err = nil
983                 }
984                 return
985         }
986         if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
987                 err = errors.New("connection does not have required header obfuscation")
988                 return
989         }
990         ih, err := cl.connBtHandshake(c, nil)
991         if err != nil {
992                 return nil, fmt.Errorf("during bt handshake: %w", err)
993         }
994         cl.lock()
995         t = cl.torrents[ih]
996         cl.unlock()
997         return
998 }
999
1000 var successfulPeerWireProtocolHandshakePeerReservedBytes expvar.Map
1001
1002 func init() {
1003         torrent.Set(
1004                 "successful_peer_wire_protocol_handshake_peer_reserved_bytes",
1005                 &successfulPeerWireProtocolHandshakePeerReservedBytes)
1006 }
1007
1008 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
1009         res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
1010         if err != nil {
1011                 return
1012         }
1013         successfulPeerWireProtocolHandshakePeerReservedBytes.Add(
1014                 hex.EncodeToString(res.PeerExtensionBits[:]), 1)
1015         ret = res.Hash
1016         c.PeerExtensionBytes = res.PeerExtensionBits
1017         c.PeerID = res.PeerID
1018         c.completedHandshake = time.Now()
1019         if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
1020                 cb(c, res.Hash)
1021         }
1022         return
1023 }
1024
1025 func (cl *Client) runReceivedConn(c *PeerConn) {
1026         err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
1027         if err != nil {
1028                 panic(err)
1029         }
1030         t, err := cl.receiveHandshakes(c)
1031         if err != nil {
1032                 cl.logger.LazyLog(log.Debug, func() log.Msg {
1033                         return log.Fmsg(
1034                                 "error receiving handshakes on %v: %s", c, err,
1035                         ).Add(
1036                                 "network", c.Network,
1037                         )
1038                 })
1039                 torrent.Add("error receiving handshake", 1)
1040                 cl.lock()
1041                 cl.onBadAccept(c.RemoteAddr)
1042                 cl.unlock()
1043                 return
1044         }
1045         if t == nil {
1046                 torrent.Add("received handshake for unloaded torrent", 1)
1047                 cl.logger.LazyLog(log.Debug, func() log.Msg {
1048                         return log.Fmsg("received handshake for unloaded torrent")
1049                 })
1050                 cl.lock()
1051                 cl.onBadAccept(c.RemoteAddr)
1052                 cl.unlock()
1053                 return
1054         }
1055         torrent.Add("received handshake for loaded torrent", 1)
1056         c.conn.SetWriteDeadline(time.Time{})
1057         cl.lock()
1058         defer cl.unlock()
1059         t.runHandshookConnLoggingErr(c)
1060 }
1061
1062 // Client lock must be held before entering this.
1063 func (t *Torrent) runHandshookConn(pc *PeerConn) error {
1064         pc.setTorrent(t)
1065         cl := t.cl
1066         for i, b := range cl.config.MinPeerExtensions {
1067                 if pc.PeerExtensionBytes[i]&b != b {
1068                         return fmt.Errorf("peer did not meet minimum peer extensions: %x", pc.PeerExtensionBytes[:])
1069                 }
1070         }
1071         if pc.PeerID == cl.peerID {
1072                 if pc.outgoing {
1073                         connsToSelf.Add(1)
1074                         addr := pc.RemoteAddr.String()
1075                         cl.dopplegangerAddrs[addr] = struct{}{}
1076                 } /* else {
1077                         // Because the remote address is not necessarily the same as its client's torrent listen
1078                         // address, we won't record the remote address as a doppleganger. Instead, the initiator
1079                         // can record *us* as the doppleganger.
1080                 } */
1081                 t.logger.Levelf(log.Debug, "local and remote peer ids are the same")
1082                 return nil
1083         }
1084         pc.r = deadlineReader{pc.conn, pc.r}
1085         completedHandshakeConnectionFlags.Add(pc.connectionFlags(), 1)
1086         if connIsIpv6(pc.conn) {
1087                 torrent.Add("completed handshake over ipv6", 1)
1088         }
1089         if err := t.addPeerConn(pc); err != nil {
1090                 return fmt.Errorf("adding connection: %w", err)
1091         }
1092         defer t.dropConnection(pc)
1093         pc.addBuiltinLtepProtocols(!cl.config.DisablePEX)
1094         for _, cb := range pc.callbacks.PeerConnAdded {
1095                 cb(pc)
1096         }
1097         pc.startMessageWriter()
1098         pc.sendInitialMessages()
1099         pc.initUpdateRequestsTimer()
1100         err := pc.mainReadLoop()
1101         if err != nil {
1102                 return fmt.Errorf("main read loop: %w", err)
1103         }
1104         return nil
1105 }
1106
1107 func (p *Peer) initUpdateRequestsTimer() {
1108         if check.Enabled {
1109                 if p.updateRequestsTimer != nil {
1110                         panic(p.updateRequestsTimer)
1111                 }
1112         }
1113         if enableUpdateRequestsTimer {
1114                 p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
1115         }
1116 }
1117
1118 const peerUpdateRequestsTimerReason = "updateRequestsTimer"
1119
1120 func (c *Peer) updateRequestsTimerFunc() {
1121         c.locker().Lock()
1122         defer c.locker().Unlock()
1123         if c.closed.IsSet() {
1124                 return
1125         }
1126         if c.isLowOnRequests() {
1127                 // If there are no outstanding requests, then a request update should have already run.
1128                 return
1129         }
1130         if d := time.Since(c.lastRequestUpdate); d < updateRequestsTimerDuration {
1131                 // These should be benign, Timer.Stop doesn't guarantee that its function won't run if it's
1132                 // already been fired.
1133                 torrent.Add("spurious timer requests updates", 1)
1134                 return
1135         }
1136         c.updateRequests(peerUpdateRequestsTimerReason)
1137 }
1138
1139 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
1140 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
1141 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
1142 const localClientReqq = 1024
1143
1144 // See the order given in Transmission's tr_peerMsgsNew.
1145 func (pc *PeerConn) sendInitialMessages() {
1146         t := pc.t
1147         cl := t.cl
1148         if pc.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
1149                 pc.write(pp.Message{
1150                         Type:       pp.Extended,
1151                         ExtendedID: pp.HandshakeExtendedID,
1152                         ExtendedPayload: func() []byte {
1153                                 msg := pp.ExtendedHandshakeMessage{
1154                                         V:            cl.config.ExtendedHandshakeClientVersion,
1155                                         Reqq:         localClientReqq,
1156                                         YourIp:       pp.CompactIp(pc.remoteIp()),
1157                                         Encryption:   cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
1158                                         Port:         cl.incomingPeerPort(),
1159                                         MetadataSize: t.metadataSize(),
1160                                         // TODO: We can figure these out specific to the socket used.
1161                                         Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
1162                                         Ipv6: cl.config.PublicIp6.To16(),
1163                                 }
1164                                 msg.M = pc.LocalLtepProtocolMap.toSupportedExtensionDict()
1165                                 return bencode.MustMarshal(msg)
1166                         }(),
1167                 })
1168         }
1169         func() {
1170                 if pc.fastEnabled() {
1171                         if t.haveAllPieces() {
1172                                 pc.write(pp.Message{Type: pp.HaveAll})
1173                                 pc.sentHaves.AddRange(0, bitmap.BitRange(pc.t.NumPieces()))
1174                                 return
1175                         } else if !t.haveAnyPieces() {
1176                                 pc.write(pp.Message{Type: pp.HaveNone})
1177                                 pc.sentHaves.Clear()
1178                                 return
1179                         }
1180                 }
1181                 pc.postBitfield()
1182         }()
1183         if pc.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1184                 pc.write(pp.Message{
1185                         Type: pp.Port,
1186                         Port: cl.dhtPort(),
1187                 })
1188         }
1189 }
1190
1191 func (cl *Client) dhtPort() (ret uint16) {
1192         if len(cl.dhtServers) == 0 {
1193                 return
1194         }
1195         return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
1196 }
1197
1198 func (cl *Client) haveDhtServer() bool {
1199         return len(cl.dhtServers) > 0
1200 }
1201
1202 // Process incoming ut_metadata message.
1203 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1204         var d pp.ExtendedMetadataRequestMsg
1205         err := bencode.Unmarshal(payload, &d)
1206         if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1207         } else if err != nil {
1208                 return fmt.Errorf("error unmarshalling bencode: %s", err)
1209         }
1210         piece := d.Piece
1211         switch d.Type {
1212         case pp.DataMetadataExtensionMsgType:
1213                 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1214                 if !c.requestedMetadataPiece(piece) {
1215                         return fmt.Errorf("got unexpected piece %d", piece)
1216                 }
1217                 c.metadataRequests[piece] = false
1218                 begin := len(payload) - d.PieceSize()
1219                 if begin < 0 || begin >= len(payload) {
1220                         return fmt.Errorf("data has bad offset in payload: %d", begin)
1221                 }
1222                 t.saveMetadataPiece(piece, payload[begin:])
1223                 c.lastUsefulChunkReceived = time.Now()
1224                 err = t.maybeCompleteMetadata()
1225                 if err != nil {
1226                         // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1227                         // don't know who to blame. TODO: Also errors can be returned here that aren't related
1228                         // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1229                         // log consumers can filter for this message.
1230                         t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1231                 }
1232                 return err
1233         case pp.RequestMetadataExtensionMsgType:
1234                 if !t.haveMetadataPiece(piece) {
1235                         c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1236                         return nil
1237                 }
1238                 start := (1 << 14) * piece
1239                 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1240                 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1241                 return nil
1242         case pp.RejectMetadataExtensionMsgType:
1243                 return nil
1244         default:
1245                 return errors.New("unknown msg_type value")
1246         }
1247 }
1248
1249 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1250         if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1251                 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1252         }
1253         return false
1254 }
1255
1256 // Returns whether the IP address and port are considered "bad".
1257 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1258         if port == 0 || ip == nil {
1259                 return true
1260         }
1261         if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1262                 return true
1263         }
1264         if _, ok := cl.ipBlockRange(ip); ok {
1265                 return true
1266         }
1267         ipAddr, ok := netip.AddrFromSlice(ip)
1268         if !ok {
1269                 panic(ip)
1270         }
1271         if _, ok := cl.badPeerIPs[ipAddr]; ok {
1272                 return true
1273         }
1274         return false
1275 }
1276
1277 // Return a Torrent ready for insertion into a Client.
1278 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1279         return cl.newTorrentOpt(AddTorrentOpts{
1280                 InfoHash: ih,
1281                 Storage:  specStorage,
1282         })
1283 }
1284
1285 // Return a Torrent ready for insertion into a Client.
1286 func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
1287         // use provided storage, if provided
1288         storageClient := cl.defaultStorage
1289         if opts.Storage != nil {
1290                 storageClient = storage.NewClient(opts.Storage)
1291         }
1292
1293         t = &Torrent{
1294                 cl:       cl,
1295                 infoHash: opts.InfoHash,
1296                 peers: prioritizedPeers{
1297                         om: gbtree.New(32),
1298                         getPrio: func(p PeerInfo) peerPriority {
1299                                 ipPort := p.addr()
1300                                 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1301                         },
1302                 },
1303                 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1304
1305                 storageOpener:       storageClient,
1306                 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1307
1308                 metadataChanged: sync.Cond{
1309                         L: cl.locker(),
1310                 },
1311                 webSeeds:     make(map[string]*Peer),
1312                 gotMetainfoC: make(chan struct{}),
1313         }
1314         var salt [8]byte
1315         rand.Read(salt[:])
1316         t.smartBanCache.Hash = func(b []byte) uint64 {
1317                 h := xxhash.New()
1318                 h.Write(salt[:])
1319                 h.Write(b)
1320                 return h.Sum64()
1321         }
1322         t.smartBanCache.Init()
1323         t.networkingEnabled.Set()
1324         t.logger = cl.logger.WithDefaultLevel(log.Debug)
1325         t.sourcesLogger = t.logger.WithNames("sources")
1326         if opts.ChunkSize == 0 {
1327                 opts.ChunkSize = defaultChunkSize
1328         }
1329         t.setChunkSize(opts.ChunkSize)
1330         return
1331 }
1332
1333 // A file-like handle to some torrent data resource.
1334 type Handle interface {
1335         io.Reader
1336         io.Seeker
1337         io.Closer
1338         io.ReaderAt
1339 }
1340
1341 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1342         return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1343 }
1344
1345 // Adds a torrent by InfoHash with a custom Storage implementation.
1346 // If the torrent already exists then this Storage is ignored and the
1347 // existing torrent returned with `new` set to `false`
1348 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1349         cl.lock()
1350         defer cl.unlock()
1351         t, ok := cl.torrents[infoHash]
1352         if ok {
1353                 return
1354         }
1355         new = true
1356
1357         t = cl.newTorrent(infoHash, specStorage)
1358         cl.eachDhtServer(func(s DhtServer) {
1359                 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1360                         go t.dhtAnnouncer(s)
1361                 }
1362         })
1363         cl.torrents[infoHash] = t
1364         cl.clearAcceptLimits()
1365         t.updateWantPeersEvent()
1366         // Tickle Client.waitAccept, new torrent may want conns.
1367         cl.event.Broadcast()
1368         return
1369 }
1370
1371 // Adds a torrent by InfoHash with a custom Storage implementation. If the torrent already exists
1372 // then this Storage is ignored and the existing torrent returned with `new` set to `false`.
1373 func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) {
1374         infoHash := opts.InfoHash
1375         cl.lock()
1376         defer cl.unlock()
1377         t, ok := cl.torrents[infoHash]
1378         if ok {
1379                 return
1380         }
1381         new = true
1382
1383         t = cl.newTorrentOpt(opts)
1384         cl.eachDhtServer(func(s DhtServer) {
1385                 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1386                         go t.dhtAnnouncer(s)
1387                 }
1388         })
1389         cl.torrents[infoHash] = t
1390         t.setInfoBytesLocked(opts.InfoBytes)
1391         cl.clearAcceptLimits()
1392         t.updateWantPeersEvent()
1393         // Tickle Client.waitAccept, new torrent may want conns.
1394         cl.event.Broadcast()
1395         return
1396 }
1397
1398 type AddTorrentOpts struct {
1399         InfoHash  infohash.T
1400         Storage   storage.ClientImpl
1401         ChunkSize pp.Integer
1402         InfoBytes []byte
1403 }
1404
1405 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1406 // Torrent.MergeSpec.
1407 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1408         t, new = cl.AddTorrentOpt(AddTorrentOpts{
1409                 InfoHash:  spec.InfoHash,
1410                 Storage:   spec.Storage,
1411                 ChunkSize: spec.ChunkSize,
1412         })
1413         modSpec := *spec
1414         if new {
1415                 // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing
1416                 // it.
1417                 modSpec.ChunkSize = 0
1418         }
1419         err = t.MergeSpec(&modSpec)
1420         if err != nil && new {
1421                 t.Drop()
1422         }
1423         return
1424 }
1425
1426 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1427 // spec.DisallowDataDownload/Upload will be read and applied
1428 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1429 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1430         if spec.DisplayName != "" {
1431                 t.SetDisplayName(spec.DisplayName)
1432         }
1433         if spec.InfoBytes != nil {
1434                 err := t.SetInfoBytes(spec.InfoBytes)
1435                 if err != nil {
1436                         return err
1437                 }
1438         }
1439         cl := t.cl
1440         cl.AddDhtNodes(spec.DhtNodes)
1441         t.UseSources(spec.Sources)
1442         cl.lock()
1443         defer cl.unlock()
1444         t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
1445         for _, url := range spec.Webseeds {
1446                 t.addWebSeed(url)
1447         }
1448         for _, peerAddr := range spec.PeerAddrs {
1449                 t.addPeer(PeerInfo{
1450                         Addr:    StringAddr(peerAddr),
1451                         Source:  PeerSourceDirect,
1452                         Trusted: true,
1453                 })
1454         }
1455         if spec.ChunkSize != 0 {
1456                 panic("chunk size cannot be changed for existing Torrent")
1457         }
1458         t.addTrackers(spec.Trackers)
1459         t.maybeNewConns()
1460         t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1461         t.dataUploadDisallowed = spec.DisallowDataUpload
1462         return nil
1463 }
1464
1465 func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
1466         t, ok := cl.torrents[infoHash]
1467         if !ok {
1468                 err = fmt.Errorf("no such torrent")
1469                 return
1470         }
1471         err = t.close(wg)
1472         delete(cl.torrents, infoHash)
1473         return
1474 }
1475
1476 func (cl *Client) allTorrentsCompleted() bool {
1477         for _, t := range cl.torrents {
1478                 if !t.haveInfo() {
1479                         return false
1480                 }
1481                 if !t.haveAllPieces() {
1482                         return false
1483                 }
1484         }
1485         return true
1486 }
1487
1488 // Returns true when all torrents are completely downloaded and false if the
1489 // client is stopped before that.
1490 func (cl *Client) WaitAll() bool {
1491         cl.lock()
1492         defer cl.unlock()
1493         for !cl.allTorrentsCompleted() {
1494                 if cl.closed.IsSet() {
1495                         return false
1496                 }
1497                 cl.event.Wait()
1498         }
1499         return true
1500 }
1501
1502 // Returns handles to all the torrents loaded in the Client.
1503 func (cl *Client) Torrents() []*Torrent {
1504         cl.rLock()
1505         defer cl.rUnlock()
1506         return cl.torrentsAsSlice()
1507 }
1508
1509 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1510         for _, t := range cl.torrents {
1511                 ret = append(ret, t)
1512         }
1513         return
1514 }
1515
1516 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1517         spec, err := TorrentSpecFromMagnetUri(uri)
1518         if err != nil {
1519                 return
1520         }
1521         T, _, err = cl.AddTorrentSpec(spec)
1522         return
1523 }
1524
1525 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1526         ts, err := TorrentSpecFromMetaInfoErr(mi)
1527         if err != nil {
1528                 return
1529         }
1530         T, _, err = cl.AddTorrentSpec(ts)
1531         return
1532 }
1533
1534 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1535         mi, err := metainfo.LoadFromFile(filename)
1536         if err != nil {
1537                 return
1538         }
1539         return cl.AddTorrent(mi)
1540 }
1541
1542 func (cl *Client) DhtServers() []DhtServer {
1543         return cl.dhtServers
1544 }
1545
1546 func (cl *Client) AddDhtNodes(nodes []string) {
1547         for _, n := range nodes {
1548                 hmp := missinggo.SplitHostMaybePort(n)
1549                 ip := net.ParseIP(hmp.Host)
1550                 if ip == nil {
1551                         cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1552                         continue
1553                 }
1554                 ni := krpc.NodeInfo{
1555                         Addr: krpc.NodeAddr{
1556                                 IP:   ip,
1557                                 Port: hmp.Port,
1558                         },
1559                 }
1560                 cl.eachDhtServer(func(s DhtServer) {
1561                         s.AddNode(ni)
1562                 })
1563         }
1564 }
1565
1566 func (cl *Client) banPeerIP(ip net.IP) {
1567         // We can't take this from string, because it will lose netip's v4on6. net.ParseIP parses v4
1568         // addresses directly to v4on6, which doesn't compare equal with v4.
1569         ipAddr, ok := netip.AddrFromSlice(ip)
1570         if !ok {
1571                 panic(ip)
1572         }
1573         g.MakeMapIfNilAndSet(&cl.badPeerIPs, ipAddr, struct{}{})
1574         for _, t := range cl.torrents {
1575                 t.iterPeers(func(p *Peer) {
1576                         if p.remoteIp().Equal(ip) {
1577                                 t.logger.Levelf(log.Warning, "dropping peer %v with banned ip %v", p, ip)
1578                                 // Should this be a close?
1579                                 p.drop()
1580                         }
1581                 })
1582         }
1583 }
1584
1585 type newConnectionOpts struct {
1586         outgoing        bool
1587         remoteAddr      PeerRemoteAddr
1588         localPublicAddr peerLocalPublicAddr
1589         network         string
1590         connString      string
1591 }
1592
1593 func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerConn) {
1594         if opts.network == "" {
1595                 panic(opts.remoteAddr)
1596         }
1597         c = &PeerConn{
1598                 Peer: Peer{
1599                         outgoing:        opts.outgoing,
1600                         choking:         true,
1601                         peerChoking:     true,
1602                         PeerMaxRequests: 250,
1603
1604                         RemoteAddr:      opts.remoteAddr,
1605                         localPublicAddr: opts.localPublicAddr,
1606                         Network:         opts.network,
1607                         callbacks:       &cl.config.Callbacks,
1608                 },
1609                 connString: opts.connString,
1610                 conn:       nc,
1611         }
1612         c.peerRequestDataAllocLimiter.Max = cl.config.MaxAllocPeerRequestDataPerConn
1613         c.initRequestState()
1614         // TODO: Need to be much more explicit about this, including allowing non-IP bannable addresses.
1615         if opts.remoteAddr != nil {
1616                 netipAddrPort, err := netip.ParseAddrPort(opts.remoteAddr.String())
1617                 if err == nil {
1618                         c.bannableAddr = Some(netipAddrPort.Addr())
1619                 }
1620         }
1621         c.peerImpl = c
1622         c.logger = cl.logger.WithDefaultLevel(log.Warning)
1623         c.logger = c.logger.WithContextText(fmt.Sprintf("%T %p", c, c))
1624         c.setRW(connStatsReadWriter{nc, c})
1625         c.r = &rateLimitedReader{
1626                 l: cl.config.DownloadRateLimiter,
1627                 r: c.r,
1628         }
1629         c.logger.Levelf(
1630                 log.Debug,
1631                 "inited with remoteAddr %v network %v outgoing %t",
1632                 opts.remoteAddr, opts.network, opts.outgoing,
1633         )
1634         for _, f := range cl.config.Callbacks.NewPeer {
1635                 f(&c.Peer)
1636         }
1637         return
1638 }
1639
1640 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1641         cl.lock()
1642         defer cl.unlock()
1643         t := cl.torrent(ih)
1644         if t == nil {
1645                 return
1646         }
1647         t.addPeers([]PeerInfo{{
1648                 Addr:   ipPortAddr{ip, port},
1649                 Source: PeerSourceDhtAnnouncePeer,
1650         }})
1651 }
1652
1653 func firstNotNil(ips ...net.IP) net.IP {
1654         for _, ip := range ips {
1655                 if ip != nil {
1656                         return ip
1657                 }
1658         }
1659         return nil
1660 }
1661
1662 func (cl *Client) eachListener(f func(Listener) bool) {
1663         for _, s := range cl.listeners {
1664                 if !f(s) {
1665                         break
1666                 }
1667         }
1668 }
1669
1670 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1671         for i := 0; i < len(cl.listeners); i += 1 {
1672                 if ret = cl.listeners[i]; f(ret) {
1673                         return
1674                 }
1675         }
1676         return nil
1677 }
1678
1679 func (cl *Client) publicIp(peer net.IP) net.IP {
1680         // TODO: Use BEP 10 to determine how peers are seeing us.
1681         if peer.To4() != nil {
1682                 return firstNotNil(
1683                         cl.config.PublicIp4,
1684                         cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1685                 )
1686         }
1687
1688         return firstNotNil(
1689                 cl.config.PublicIp6,
1690                 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1691         )
1692 }
1693
1694 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1695         l := cl.findListener(
1696                 func(l Listener) bool {
1697                         return f(addrIpOrNil(l.Addr()))
1698                 },
1699         )
1700         if l == nil {
1701                 return nil
1702         }
1703         return addrIpOrNil(l.Addr())
1704 }
1705
1706 // Our IP as a peer should see it.
1707 func (cl *Client) publicAddr(peer net.IP) IpPort {
1708         return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1709 }
1710
1711 // ListenAddrs addresses currently being listened to.
1712 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1713         cl.lock()
1714         ret = make([]net.Addr, len(cl.listeners))
1715         for i := 0; i < len(cl.listeners); i += 1 {
1716                 ret[i] = cl.listeners[i].Addr()
1717         }
1718         cl.unlock()
1719         return
1720 }
1721
1722 func (cl *Client) PublicIPs() (ips []net.IP) {
1723         if ip := cl.config.PublicIp4; ip != nil {
1724                 ips = append(ips, ip)
1725         }
1726         if ip := cl.config.PublicIp6; ip != nil {
1727                 ips = append(ips, ip)
1728         }
1729         return
1730 }
1731
1732 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1733         ipa, ok := tryIpPortFromNetAddr(addr)
1734         if !ok {
1735                 return
1736         }
1737         ip := maskIpForAcceptLimiting(ipa.IP)
1738         if cl.acceptLimiter == nil {
1739                 cl.acceptLimiter = make(map[ipStr]int)
1740         }
1741         cl.acceptLimiter[ipStr(ip.String())]++
1742 }
1743
1744 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1745         if ip4 := ip.To4(); ip4 != nil {
1746                 return ip4.Mask(net.CIDRMask(24, 32))
1747         }
1748         return ip
1749 }
1750
1751 func (cl *Client) clearAcceptLimits() {
1752         cl.acceptLimiter = nil
1753 }
1754
1755 func (cl *Client) acceptLimitClearer() {
1756         for {
1757                 select {
1758                 case <-cl.closed.Done():
1759                         return
1760                 case <-time.After(15 * time.Minute):
1761                         cl.lock()
1762                         cl.clearAcceptLimits()
1763                         cl.unlock()
1764                 }
1765         }
1766 }
1767
1768 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1769         if cl.config.DisableAcceptRateLimiting {
1770                 return false
1771         }
1772         return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1773 }
1774
1775 func (cl *Client) rLock() {
1776         cl._mu.RLock()
1777 }
1778
1779 func (cl *Client) rUnlock() {
1780         cl._mu.RUnlock()
1781 }
1782
1783 func (cl *Client) lock() {
1784         cl._mu.Lock()
1785 }
1786
1787 func (cl *Client) unlock() {
1788         cl._mu.Unlock()
1789 }
1790
1791 func (cl *Client) locker() *lockWithDeferreds {
1792         return &cl._mu
1793 }
1794
1795 func (cl *Client) String() string {
1796         return fmt.Sprintf("<%[1]T %[1]p>", cl)
1797 }
1798
1799 // Returns connection-level aggregate connStats at the Client level. See the comment on
1800 // TorrentStats.ConnStats.
1801 func (cl *Client) ConnStats() ConnStats {
1802         return cl.connStats.Copy()
1803 }
1804
1805 func (cl *Client) Stats() ClientStats {
1806         cl.rLock()
1807         defer cl.rUnlock()
1808         return cl.statsLocked()
1809 }