18 "github.com/anacrolix/dht/v2"
19 "github.com/anacrolix/dht/v2/krpc"
20 "github.com/anacrolix/log"
21 "github.com/anacrolix/missinggo/bitmap"
22 "github.com/anacrolix/missinggo/perf"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/slices"
25 "github.com/anacrolix/missinggo/v2/pproffd"
26 "github.com/anacrolix/sync"
27 "github.com/anacrolix/torrent/internal/limiter"
28 "github.com/anacrolix/torrent/tracker"
29 "github.com/anacrolix/torrent/webtorrent"
30 "github.com/davecgh/go-spew/spew"
31 "github.com/dustin/go-humanize"
32 "github.com/google/btree"
33 "github.com/pion/datachannel"
34 "golang.org/x/time/rate"
35 "golang.org/x/xerrors"
37 "github.com/anacrolix/missinggo/v2"
38 "github.com/anacrolix/missinggo/v2/conntrack"
40 "github.com/anacrolix/torrent/bencode"
41 "github.com/anacrolix/torrent/iplist"
42 "github.com/anacrolix/torrent/metainfo"
43 "github.com/anacrolix/torrent/mse"
44 pp "github.com/anacrolix/torrent/peer_protocol"
45 "github.com/anacrolix/torrent/storage"
48 // Clients contain zero or more Torrents. A Client manages a blocklist, the
49 // TCP/UDP protocol ports, and DHT as desired.
51 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
57 closed missinggo.Event
63 defaultStorage *storage.Client
67 dhtServers []DhtServer
68 ipBlockList iplist.Ranger
70 // Set of addresses that have our client ID. This intentionally will
71 // include ourselves if we end up trying to connect to our own address
72 // through legitimate channels.
73 dopplegangerAddrs map[string]struct{}
74 badPeerIPs map[string]struct{}
75 torrents map[InfoHash]*Torrent
77 acceptLimiter map[ipStr]int
78 dialRateLimiter *rate.Limiter
81 websocketTrackers websocketTrackers
83 activeAnnounceLimiter limiter.Instance
88 func (cl *Client) BadPeerIPs() []string {
91 return cl.badPeerIPsLocked()
94 func (cl *Client) badPeerIPsLocked() []string {
95 return slices.FromMapKeys(cl.badPeerIPs).([]string)
98 func (cl *Client) PeerID() PeerID {
102 // Returns the port number for the first listener that has one. No longer assumes that all port
103 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
105 func (cl *Client) LocalPort() (port int) {
106 cl.eachListener(func(l Listener) bool {
107 port = addrPortOrZero(l.Addr())
113 func writeDhtServerStatus(w io.Writer, s DhtServer) {
114 dhtStats := s.Stats()
115 fmt.Fprintf(w, " ID: %x\n", s.ID())
116 spew.Fdump(w, dhtStats)
119 // Writes out a human readable status of the client, such as for writing to a
121 func (cl *Client) WriteStatus(_w io.Writer) {
124 w := bufio.NewWriter(_w)
126 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
127 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
128 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
129 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
130 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
131 cl.eachDhtServer(func(s DhtServer) {
132 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
133 writeDhtServerStatus(w, s)
135 spew.Fdump(w, &cl.stats)
136 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
138 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
139 return l.InfoHash().AsString() < r.InfoHash().AsString()
142 fmt.Fprint(w, "<unknown name>")
144 fmt.Fprint(w, t.name())
150 "%f%% of %d bytes (%s)",
151 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
153 humanize.Bytes(uint64(*t.length)))
155 w.WriteString("<missing metainfo>")
163 // Filters things that are less than warning from UPnP discovery.
164 func upnpDiscoverLogFilter(m log.Msg) bool {
165 level, ok := m.GetLevel()
166 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
169 func (cl *Client) initLogger() {
170 logger := cl.config.Logger
173 if !cl.config.Debug {
174 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
177 cl.logger = logger.WithValues(cl)
180 func (cl *Client) announceKey() int32 {
181 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
184 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
186 cfg = NewDefaultClientConfig()
196 dopplegangerAddrs: make(map[string]struct{}),
197 torrents: make(map[metainfo.Hash]*Torrent),
198 dialRateLimiter: rate.NewLimiter(10, 10),
200 cl.activeAnnounceLimiter.SlotsPerKey = 2
201 go cl.acceptLimitClearer()
209 cl.event.L = cl.locker()
210 storageImpl := cfg.DefaultStorage
211 if storageImpl == nil {
212 // We'd use mmap by default but HFS+ doesn't support sparse files.
213 storageImplCloser := storage.NewFile(cfg.DataDir)
214 cl.onClose = append(cl.onClose, func() {
215 if err := storageImplCloser.Close(); err != nil {
216 cl.logger.Printf("error closing default storage: %s", err)
219 storageImpl = storageImplCloser
221 cl.defaultStorage = storage.NewClient(storageImpl)
222 if cfg.IPBlocklist != nil {
223 cl.ipBlockList = cfg.IPBlocklist
226 if cfg.PeerID != "" {
227 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
229 o := copy(cl.peerID[:], cfg.Bep20)
230 _, err = rand.Read(cl.peerID[o:])
232 panic("error generating peer id")
236 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
244 for _, _s := range sockets {
245 s := _s // Go is fucking retarded.
246 cl.onClose = append(cl.onClose, func() { s.Close() })
247 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
248 cl.dialers = append(cl.dialers, s)
249 cl.listeners = append(cl.listeners, s)
250 go cl.acceptConnections(s)
256 for _, s := range sockets {
257 if pc, ok := s.(net.PacketConn); ok {
258 ds, err := cl.newAnacrolixDhtServer(pc)
262 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
263 cl.onClose = append(cl.onClose, func() { ds.Close() })
268 cl.websocketTrackers = websocketTrackers{
271 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
274 t, ok := cl.torrents[infoHash]
276 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
278 return t.announceRequest(event), nil
280 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
283 t, ok := cl.torrents[dcc.InfoHash]
285 cl.logger.WithDefaultLevel(log.Warning).Printf(
286 "got webrtc conn for unloaded torrent with infohash %x",
292 go t.onWebRtcConn(dc, dcc)
299 func (cl *Client) AddDhtServer(d DhtServer) {
300 cl.dhtServers = append(cl.dhtServers, d)
303 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
304 // given address for any Torrent.
305 func (cl *Client) AddDialer(d Dialer) {
308 cl.dialers = append(cl.dialers, d)
309 for _, t := range cl.torrents {
314 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
316 func (cl *Client) AddListener(l Listener) {
317 cl.listeners = append(cl.listeners, l)
318 go cl.acceptConnections(l)
321 func (cl *Client) firewallCallback(net.Addr) bool {
323 block := !cl.wantConns()
326 torrent.Add("connections firewalled", 1)
328 torrent.Add("connections not firewalled", 1)
333 func (cl *Client) listenOnNetwork(n network) bool {
334 if n.Ipv4 && cl.config.DisableIPv4 {
337 if n.Ipv6 && cl.config.DisableIPv6 {
340 if n.Tcp && cl.config.DisableTCP {
343 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
349 func (cl *Client) listenNetworks() (ns []network) {
350 for _, n := range allPeerNetworks {
351 if cl.listenOnNetwork(n) {
358 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
359 cfg := dht.ServerConfig{
360 IPBlocklist: cl.ipBlockList,
362 OnAnnouncePeer: cl.onDHTAnnouncePeer,
363 PublicIP: func() net.IP {
364 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
365 return cl.config.PublicIp6
367 return cl.config.PublicIp4
369 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
370 ConnectionTracking: cl.config.ConnTracker,
371 OnQuery: cl.config.DHTOnQuery,
372 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
374 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
377 s, err = dht.NewServer(&cfg)
380 ts, err := s.Bootstrap()
382 cl.logger.Printf("error bootstrapping dht: %s", err)
384 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
390 func (cl *Client) Closed() <-chan struct{} {
396 func (cl *Client) eachDhtServer(f func(DhtServer)) {
397 for _, ds := range cl.dhtServers {
402 // Stops the client. All connections to peers are closed and all activity will
404 func (cl *Client) Close() {
408 for _, t := range cl.torrents {
411 for i := range cl.onClose {
412 cl.onClose[len(cl.onClose)-1-i]()
417 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
418 if cl.ipBlockList == nil {
421 return cl.ipBlockList.Lookup(ip)
424 func (cl *Client) ipIsBlocked(ip net.IP) bool {
425 _, blocked := cl.ipBlockRange(ip)
429 func (cl *Client) wantConns() bool {
430 for _, t := range cl.torrents {
438 func (cl *Client) waitAccept() {
440 if cl.closed.IsSet() {
450 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
451 func (cl *Client) rejectAccepted(conn net.Conn) error {
452 ra := conn.RemoteAddr()
453 if rip := addrIpOrNil(ra); rip != nil {
454 if cl.config.DisableIPv4Peers && rip.To4() != nil {
455 return errors.New("ipv4 peers disabled")
457 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
458 return errors.New("ipv4 disabled")
461 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
462 return errors.New("ipv6 disabled")
464 if cl.rateLimitAccept(rip) {
465 return errors.New("source IP accepted rate limited")
467 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
468 return errors.New("bad source addr")
474 func (cl *Client) acceptConnections(l Listener) {
476 conn, err := l.Accept()
477 torrent.Add("client listener accepts", 1)
478 conn = pproffd.WrapNetConn(conn)
480 closed := cl.closed.IsSet()
483 reject = cl.rejectAccepted(conn)
493 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
498 torrent.Add("rejected accepted connections", 1)
499 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
502 go cl.incomingConnection(conn)
504 log.Fmsg("accepted %q connection at %q from %q",
508 ).SetLevel(log.Debug).Log(cl.logger)
509 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
510 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
511 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
516 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
517 func regularNetConnPeerConnConnString(nc net.Conn) string {
518 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
521 func (cl *Client) incomingConnection(nc net.Conn) {
523 if tc, ok := nc.(*net.TCPConn); ok {
526 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
527 regularNetConnPeerConnConnString(nc))
529 c.Discovery = PeerSourceIncoming
530 cl.runReceivedConn(c)
533 // Returns a handle to the given torrent, if it's present in the client.
534 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
537 t, ok = cl.torrents[ih]
541 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
542 return cl.torrents[ih]
545 type dialResult struct {
550 func countDialResult(err error) {
552 torrent.Add("successful dials", 1)
554 torrent.Add("unsuccessful dials", 1)
558 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
559 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
560 if ret < minDialTimeout {
566 // Returns whether an address is known to connect to a client with our own ID.
567 func (cl *Client) dopplegangerAddr(addr string) bool {
568 _, ok := cl.dopplegangerAddrs[addr]
572 // Returns a connection over UTP or TCP, whichever is first to connect.
573 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
575 t := perf.NewTimer(perf.CallerName(0))
578 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
580 t.Mark("returned conn over " + res.Network)
584 ctx, cancel := context.WithCancel(ctx)
585 // As soon as we return one connection, cancel the others.
588 resCh := make(chan dialResult, left)
592 cl.eachDialer(func(s Dialer) bool {
595 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
598 cl.dialFromSocket(ctx, s, addr),
599 s.LocalAddr().Network(),
606 // Wait for a successful connection.
608 defer perf.ScopeTimer()()
609 for ; left > 0 && res.Conn == nil; left-- {
613 // There are still incompleted dials.
615 for ; left > 0; left-- {
616 conn := (<-resCh).Conn
623 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
625 //if res.Conn != nil {
626 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
628 // cl.logger.Printf("failed to dial %s", addr)
633 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
634 network := s.LocalAddr().Network()
635 cte := cl.config.ConnTracker.Wait(
637 conntrack.Entry{network, s.LocalAddr().String(), addr},
638 "dial torrent client",
641 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
642 // which dial errors allow us to forget the connection tracking entry handle.
643 if ctx.Err() != nil {
649 c, err := s.Dial(ctx, addr)
650 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
651 // it now in case we close the connection forthwith.
652 if tc, ok := c.(*net.TCPConn); ok {
657 if err != nil && forgettableDialError(err) {
664 return closeWrapper{c, func() error {
671 func forgettableDialError(err error) bool {
672 return strings.Contains(err.Error(), "no suitable address found")
675 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
676 if _, ok := t.halfOpen[addr]; !ok {
677 panic("invariant broken")
679 delete(t.halfOpen, addr)
681 for _, t := range cl.torrents {
686 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
687 // for valid reasons.
688 func (cl *Client) initiateProtocolHandshakes(
692 outgoing, encryptHeader bool,
693 remoteAddr PeerRemoteAddr,
694 network, connString string,
696 c *PeerConn, err error,
698 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
699 c.headerEncrypted = encryptHeader
700 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
702 dl, ok := ctx.Deadline()
706 err = nc.SetDeadline(dl)
710 err = cl.initiateHandshakes(c, t)
714 // Returns nil connection and nil error if no connection could be established for valid reasons.
715 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
716 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
719 return t.dialTimeout()
722 dr := cl.dialFirst(dialCtx, addr.String())
725 if dialCtx.Err() != nil {
726 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
728 return nil, errors.New("dial failed")
730 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Network, regularNetConnPeerConnConnString(nc))
737 // Returns nil connection and nil error if no connection could be established
738 // for valid reasons.
739 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
740 torrent.Add("establish outgoing connection", 1)
741 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
742 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
744 torrent.Add("initiated conn with preferred header obfuscation", 1)
747 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
748 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
749 // We should have just tried with the preferred header obfuscation. If it was required,
750 // there's nothing else to try.
753 // Try again with encryption if we didn't earlier, or without if we did.
754 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
756 torrent.Add("initiated conn with fallback header obfuscation", 1)
758 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
762 // Called to dial out and run a connection. The addr we're given is already
763 // considered half-open.
764 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
765 cl.dialRateLimiter.Wait(context.Background())
766 c, err := cl.establishOutgoingConn(t, addr)
769 // Don't release lock between here and addPeerConn, unless it's for
771 cl.noLongerHalfOpen(t, addr.String())
774 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
781 t.runHandshookConnLoggingErr(c)
784 // The port number for incoming peer connections. 0 if the client isn't listening.
785 func (cl *Client) incomingPeerPort() int {
786 return cl.LocalPort()
789 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
790 if c.headerEncrypted {
793 rw, c.cryptoMethod, err = mse.InitiateHandshake(
800 cl.config.CryptoProvides,
804 return xerrors.Errorf("header obfuscation handshake: %w", err)
807 ih, err := cl.connBtHandshake(c, &t.infoHash)
809 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
811 if ih != t.infoHash {
812 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
817 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
818 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
819 func (cl *Client) forSkeys(f func([]byte) bool) {
822 if false { // Emulate the bug from #114
824 for ih := range cl.torrents {
828 for range cl.torrents {
835 for ih := range cl.torrents {
842 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
843 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
849 // Do encryption and bittorrent handshakes as receiver.
850 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
851 defer perf.ScopeTimerErr(&err)()
853 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
855 if err == nil || err == mse.ErrNoSecretKeyMatch {
856 if c.headerEncrypted {
857 torrent.Add("handshakes received encrypted", 1)
859 torrent.Add("handshakes received unencrypted", 1)
862 torrent.Add("handshakes received with error while handling encryption", 1)
865 if err == mse.ErrNoSecretKeyMatch {
870 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
871 err = errors.New("connection does not have required header obfuscation")
874 ih, err := cl.connBtHandshake(c, nil)
876 err = xerrors.Errorf("during bt handshake: %w", err)
885 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
886 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
891 c.PeerExtensionBytes = res.PeerExtensionBits
892 c.PeerID = res.PeerID
893 c.completedHandshake = time.Now()
894 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
900 func (cl *Client) runReceivedConn(c *PeerConn) {
901 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
905 t, err := cl.receiveHandshakes(c)
908 "error receiving handshakes on %v: %s", c, err,
909 ).SetLevel(log.Debug).
911 "network", c.Network,
913 torrent.Add("error receiving handshake", 1)
915 cl.onBadAccept(c.RemoteAddr)
920 torrent.Add("received handshake for unloaded torrent", 1)
921 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
923 cl.onBadAccept(c.RemoteAddr)
927 torrent.Add("received handshake for loaded torrent", 1)
930 t.runHandshookConnLoggingErr(c)
933 // Client lock must be held before entering this.
934 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
936 if c.PeerID == cl.peerID {
939 addr := c.conn.RemoteAddr().String()
940 cl.dopplegangerAddrs[addr] = struct{}{}
942 // Because the remote address is not necessarily the same as its client's torrent listen
943 // address, we won't record the remote address as a doppleganger. Instead, the initiator
944 // can record *us* as the doppleganger.
946 return errors.New("local and remote peer ids are the same")
948 c.conn.SetWriteDeadline(time.Time{})
949 c.r = deadlineReader{c.conn, c.r}
950 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
951 if connIsIpv6(c.conn) {
952 torrent.Add("completed handshake over ipv6", 1)
954 if err := t.addPeerConn(c); err != nil {
955 return fmt.Errorf("adding connection: %w", err)
957 defer t.dropConnection(c)
958 go c.writer(time.Minute)
959 cl.sendInitialMessages(c, t)
960 err := c.mainReadLoop()
962 return fmt.Errorf("main read loop: %w", err)
967 // See the order given in Transmission's tr_peerMsgsNew.
968 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
969 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
970 conn.post(pp.Message{
972 ExtendedID: pp.HandshakeExtendedID,
973 ExtendedPayload: func() []byte {
974 msg := pp.ExtendedHandshakeMessage{
975 M: map[pp.ExtensionName]pp.ExtensionNumber{
976 pp.ExtensionNameMetadata: metadataExtendedId,
978 V: cl.config.ExtendedHandshakeClientVersion,
979 // If peer requests are buffered on read, this instructs the amount of memory
980 // that might be used to cache pending writes. Assuming 512KiB cached for
981 // sending, for 16KiB chunks.
983 YourIp: pp.CompactIp(conn.remoteIp()),
984 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
985 Port: cl.incomingPeerPort(),
986 MetadataSize: torrent.metadataSize(),
987 // TODO: We can figured these out specific to the socket
989 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
990 Ipv6: cl.config.PublicIp6.To16(),
992 if !cl.config.DisablePEX {
993 msg.M[pp.ExtensionNamePex] = pexExtendedId
995 return bencode.MustMarshal(msg)
1000 if conn.fastEnabled() {
1001 if torrent.haveAllPieces() {
1002 conn.post(pp.Message{Type: pp.HaveAll})
1003 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
1005 } else if !torrent.haveAnyPieces() {
1006 conn.post(pp.Message{Type: pp.HaveNone})
1007 conn.sentHaves.Clear()
1013 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1014 conn.post(pp.Message{
1021 func (cl *Client) dhtPort() (ret uint16) {
1022 cl.eachDhtServer(func(s DhtServer) {
1023 ret = uint16(missinggo.AddrPort(s.Addr()))
1028 func (cl *Client) haveDhtServer() (ret bool) {
1029 cl.eachDhtServer(func(_ DhtServer) {
1035 // Process incoming ut_metadata message.
1036 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1037 var d map[string]int
1038 err := bencode.Unmarshal(payload, &d)
1039 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1040 } else if err != nil {
1041 return fmt.Errorf("error unmarshalling bencode: %s", err)
1043 msgType, ok := d["msg_type"]
1045 return errors.New("missing msg_type field")
1049 case pp.DataMetadataExtensionMsgType:
1050 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1051 if !c.requestedMetadataPiece(piece) {
1052 return fmt.Errorf("got unexpected piece %d", piece)
1054 c.metadataRequests[piece] = false
1055 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1056 if begin < 0 || begin >= len(payload) {
1057 return fmt.Errorf("data has bad offset in payload: %d", begin)
1059 t.saveMetadataPiece(piece, payload[begin:])
1060 c.lastUsefulChunkReceived = time.Now()
1061 err = t.maybeCompleteMetadata()
1063 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1064 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1065 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1066 // log consumers can filter for this message.
1067 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1070 case pp.RequestMetadataExtensionMsgType:
1071 if !t.haveMetadataPiece(piece) {
1072 c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1075 start := (1 << 14) * piece
1076 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1077 c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1079 case pp.RejectMetadataExtensionMsgType:
1082 return errors.New("unknown msg_type value")
1086 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1087 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1088 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1093 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1097 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1100 if _, ok := cl.ipBlockRange(ip); ok {
1103 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1109 // Return a Torrent ready for insertion into a Client.
1110 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1111 // use provided storage, if provided
1112 storageClient := cl.defaultStorage
1113 if specStorage != nil {
1114 storageClient = storage.NewClient(specStorage)
1120 peers: prioritizedPeers{
1122 getPrio: func(p PeerInfo) peerPriority {
1124 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1127 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1129 halfOpen: make(map[string]PeerInfo),
1130 pieceStateChanges: pubsub.NewPubSub(),
1132 storageOpener: storageClient,
1133 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1135 networkingEnabled: true,
1136 metadataChanged: sync.Cond{
1139 webSeeds: make(map[string]*Peer),
1141 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1142 t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
1143 t.logger = cl.logger.WithContextValue(t)
1144 t.setChunkSize(defaultChunkSize)
1148 // A file-like handle to some torrent data resource.
1149 type Handle interface {
1156 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1157 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1160 // Adds a torrent by InfoHash with a custom Storage implementation.
1161 // If the torrent already exists then this Storage is ignored and the
1162 // existing torrent returned with `new` set to `false`
1163 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1166 t, ok := cl.torrents[infoHash]
1172 t = cl.newTorrent(infoHash, specStorage)
1173 cl.eachDhtServer(func(s DhtServer) {
1174 go t.dhtAnnouncer(s)
1176 cl.torrents[infoHash] = t
1177 cl.clearAcceptLimits()
1178 t.updateWantPeersEvent()
1179 // Tickle Client.waitAccept, new torrent may want conns.
1180 cl.event.Broadcast()
1184 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1185 // Torrent.MergeSpec.
1186 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1187 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1188 err = t.MergeSpec(spec)
1189 if err != nil && new {
1195 type stringAddr string
1197 var _ net.Addr = stringAddr("")
1199 func (stringAddr) Network() string { return "" }
1200 func (me stringAddr) String() string { return string(me) }
1202 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1203 // spec.DisallowDataDownload/Upload will be read and applied
1204 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1205 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1206 if spec.DisplayName != "" {
1207 t.SetDisplayName(spec.DisplayName)
1209 if spec.InfoBytes != nil {
1210 err := t.SetInfoBytes(spec.InfoBytes)
1216 cl.AddDhtNodes(spec.DhtNodes)
1219 useTorrentSources(spec.Sources, t)
1220 for _, url := range spec.Webseeds {
1223 for _, peerAddr := range spec.PeerAddrs {
1225 Addr: stringAddr(peerAddr),
1226 Source: PeerSourceDirect,
1230 if spec.ChunkSize != 0 {
1231 t.setChunkSize(pp.Integer(spec.ChunkSize))
1233 t.addTrackers(spec.Trackers)
1235 t.dataDownloadDisallowed = spec.DisallowDataDownload
1236 t.dataUploadDisallowed = spec.DisallowDataUpload
1240 func useTorrentSources(sources []string, t *Torrent) {
1241 for _, s := range sources {
1243 err := useTorrentSource(s, t)
1245 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1247 t.logger.Printf("successfully used source %q", s)
1253 func useTorrentSource(source string, t *Torrent) error {
1254 req, err := http.NewRequest(http.MethodGet, source, nil)
1258 ctx, cancel := context.WithCancel(context.Background())
1268 req = req.WithContext(ctx)
1269 resp, err := http.DefaultClient.Do(req)
1273 mi, err := metainfo.Load(resp.Body)
1275 if ctx.Err() != nil {
1280 return t.MergeSpec(TorrentSpecFromMetaInfo(mi))
1283 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1284 t, ok := cl.torrents[infoHash]
1286 err = fmt.Errorf("no such torrent")
1293 delete(cl.torrents, infoHash)
1297 func (cl *Client) allTorrentsCompleted() bool {
1298 for _, t := range cl.torrents {
1302 if !t.haveAllPieces() {
1309 // Returns true when all torrents are completely downloaded and false if the
1310 // client is stopped before that.
1311 func (cl *Client) WaitAll() bool {
1314 for !cl.allTorrentsCompleted() {
1315 if cl.closed.IsSet() {
1323 // Returns handles to all the torrents loaded in the Client.
1324 func (cl *Client) Torrents() []*Torrent {
1327 return cl.torrentsAsSlice()
1330 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1331 for _, t := range cl.torrents {
1332 ret = append(ret, t)
1337 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1338 spec, err := TorrentSpecFromMagnetUri(uri)
1342 T, _, err = cl.AddTorrentSpec(spec)
1346 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1347 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1351 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1352 mi, err := metainfo.LoadFromFile(filename)
1356 return cl.AddTorrent(mi)
1359 func (cl *Client) DhtServers() []DhtServer {
1360 return cl.dhtServers
1363 func (cl *Client) AddDhtNodes(nodes []string) {
1364 for _, n := range nodes {
1365 hmp := missinggo.SplitHostMaybePort(n)
1366 ip := net.ParseIP(hmp.Host)
1368 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1371 ni := krpc.NodeInfo{
1372 Addr: krpc.NodeAddr{
1377 cl.eachDhtServer(func(s DhtServer) {
1383 func (cl *Client) banPeerIP(ip net.IP) {
1384 cl.logger.Printf("banning ip %v", ip)
1385 if cl.badPeerIPs == nil {
1386 cl.badPeerIPs = make(map[string]struct{})
1388 cl.badPeerIPs[ip.String()] = struct{}{}
1391 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1400 PeerMaxRequests: 250,
1402 RemoteAddr: remoteAddr,
1404 callbacks: &cl.config.Callbacks,
1406 connString: connString,
1408 writeBuffer: new(bytes.Buffer),
1411 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1412 c.writerCond.L = cl.locker()
1413 c.setRW(connStatsReadWriter{nc, c})
1414 c.r = &rateLimitedReader{
1415 l: cl.config.DownloadRateLimiter,
1418 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1419 for _, f := range cl.config.Callbacks.NewPeer {
1425 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1432 t.addPeers([]PeerInfo{{
1433 Addr: ipPortAddr{ip, port},
1434 Source: PeerSourceDhtAnnouncePeer,
1438 func firstNotNil(ips ...net.IP) net.IP {
1439 for _, ip := range ips {
1447 func (cl *Client) eachDialer(f func(Dialer) bool) {
1448 for _, s := range cl.dialers {
1455 func (cl *Client) eachListener(f func(Listener) bool) {
1456 for _, s := range cl.listeners {
1463 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1464 cl.eachListener(func(l Listener) bool {
1471 func (cl *Client) publicIp(peer net.IP) net.IP {
1472 // TODO: Use BEP 10 to determine how peers are seeing us.
1473 if peer.To4() != nil {
1475 cl.config.PublicIp4,
1476 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1481 cl.config.PublicIp6,
1482 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1486 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1487 l := cl.findListener(
1488 func(l Listener) bool {
1489 return f(addrIpOrNil(l.Addr()))
1495 return addrIpOrNil(l.Addr())
1498 // Our IP as a peer should see it.
1499 func (cl *Client) publicAddr(peer net.IP) IpPort {
1500 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1503 // ListenAddrs addresses currently being listened to.
1504 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1507 cl.eachListener(func(l Listener) bool {
1508 ret = append(ret, l.Addr())
1514 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1515 ipa, ok := tryIpPortFromNetAddr(addr)
1519 ip := maskIpForAcceptLimiting(ipa.IP)
1520 if cl.acceptLimiter == nil {
1521 cl.acceptLimiter = make(map[ipStr]int)
1523 cl.acceptLimiter[ipStr(ip.String())]++
1526 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1527 if ip4 := ip.To4(); ip4 != nil {
1528 return ip4.Mask(net.CIDRMask(24, 32))
1533 func (cl *Client) clearAcceptLimits() {
1534 cl.acceptLimiter = nil
1537 func (cl *Client) acceptLimitClearer() {
1540 case <-cl.closed.LockedChan(cl.locker()):
1542 case <-time.After(15 * time.Minute):
1544 cl.clearAcceptLimits()
1550 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1551 if cl.config.DisableAcceptRateLimiting {
1554 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1557 func (cl *Client) rLock() {
1561 func (cl *Client) rUnlock() {
1565 func (cl *Client) lock() {
1569 func (cl *Client) unlock() {
1573 func (cl *Client) locker() *lockWithDeferreds {
1577 func (cl *Client) String() string {
1578 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1581 // Returns connection-level aggregate stats at the Client level. See the comment on
1582 // TorrentStats.ConnStats.
1583 func (cl *Client) ConnStats() ConnStats {
1584 return cl.stats.Copy()