17 "github.com/anacrolix/dht/v2"
18 "github.com/anacrolix/dht/v2/krpc"
19 "github.com/anacrolix/log"
20 "github.com/anacrolix/missinggo/perf"
21 "github.com/anacrolix/missinggo/pubsub"
22 "github.com/anacrolix/missinggo/slices"
23 "github.com/anacrolix/missinggo/v2"
24 "github.com/anacrolix/missinggo/v2/bitmap"
25 "github.com/anacrolix/missinggo/v2/pproffd"
26 "github.com/anacrolix/sync"
27 "github.com/davecgh/go-spew/spew"
28 "github.com/dustin/go-humanize"
29 "github.com/google/btree"
30 "github.com/pion/datachannel"
31 "golang.org/x/time/rate"
32 "golang.org/x/xerrors"
34 "github.com/anacrolix/chansync"
36 "github.com/anacrolix/torrent/bencode"
37 "github.com/anacrolix/torrent/internal/limiter"
38 "github.com/anacrolix/torrent/iplist"
39 "github.com/anacrolix/torrent/metainfo"
40 "github.com/anacrolix/torrent/mse"
41 pp "github.com/anacrolix/torrent/peer_protocol"
42 "github.com/anacrolix/torrent/storage"
43 "github.com/anacrolix/torrent/tracker"
44 "github.com/anacrolix/torrent/webtorrent"
47 // Clients contain zero or more Torrents. A Client manages a blocklist, the
48 // TCP/UDP protocol ports, and DHT as desired.
50 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
56 closed missinggo.Event
62 defaultStorage *storage.Client
66 dhtServers []DhtServer
67 ipBlockList iplist.Ranger
69 // Set of addresses that have our client ID. This intentionally will
70 // include ourselves if we end up trying to connect to our own address
71 // through legitimate channels.
72 dopplegangerAddrs map[string]struct{}
73 badPeerIPs map[string]struct{}
74 torrents map[InfoHash]*Torrent
76 acceptLimiter map[ipStr]int
77 dialRateLimiter *rate.Limiter
80 websocketTrackers websocketTrackers
82 activeAnnounceLimiter limiter.Instance
84 updateRequests chansync.BroadcastCond
89 func (cl *Client) BadPeerIPs() []string {
92 return cl.badPeerIPsLocked()
// badPeerIPsLocked returns the keys of cl.badPeerIPs as a []string, built with
// slices.FromMapKeys and a type assertion on its result.
// NOTE(review): the "Locked" suffix suggests the caller must already hold the
// Client lock — confirm against callers.
95 func (cl *Client) badPeerIPsLocked() []string {
96 return slices.FromMapKeys(cl.badPeerIPs).([]string)
99 func (cl *Client) PeerID() PeerID {
103 // Returns the port number for the first listener that has one. No longer assumes that all port
104 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
106 func (cl *Client) LocalPort() (port int) {
107 cl.eachListener(func(l Listener) bool {
108 port = addrPortOrZero(l.Addr())
// writeDhtServerStatus writes a status report for the DHT server s to w: its ID
// formatted as hexadecimal, followed by a spew dump of the value returned by
// s.Stats().
114 func writeDhtServerStatus(w io.Writer, s DhtServer) {
115 dhtStats := s.Stats()
116 fmt.Fprintf(w, " ID: %x\n", s.ID())
117 spew.Fdump(w, dhtStats)
120 // Writes out a human readable status of the client, such as for writing to a
122 func (cl *Client) WriteStatus(_w io.Writer) {
125 w := bufio.NewWriter(_w)
127 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
128 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
129 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
130 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
131 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
132 cl.eachDhtServer(func(s DhtServer) {
133 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
134 writeDhtServerStatus(w, s)
136 spew.Fdump(w, &cl.stats)
137 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
139 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
140 return l.InfoHash().AsString() < r.InfoHash().AsString()
143 fmt.Fprint(w, "<unknown name>")
145 fmt.Fprint(w, t.name())
151 "%f%% of %d bytes (%s)",
152 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
154 humanize.Bytes(uint64(*t.length)))
156 w.WriteString("<missing metainfo>")
164 // Filters things that are less than warning from UPnP discovery.
// The result is false only for messages that carry the UpnpDiscoverLogTag value
// and either have no level set or have a level below log.Warning; every other
// message yields true.
165 func upnpDiscoverLogFilter(m log.Msg) bool {
166 level, ok := m.GetLevel()
167 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
170 func (cl *Client) initLogger() {
171 logger := cl.config.Logger
174 if !cl.config.Debug {
175 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
178 cl.logger = logger.WithValues(cl)
// announceKey returns bytes 16 through 19 of the client's peer ID, read as a
// big-endian uint32 and converted to int32.
181 func (cl *Client) announceKey() int32 {
182 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
185 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
187 cfg = NewDefaultClientConfig()
197 dopplegangerAddrs: make(map[string]struct{}),
198 torrents: make(map[metainfo.Hash]*Torrent),
199 dialRateLimiter: rate.NewLimiter(10, 10),
201 cl.activeAnnounceLimiter.SlotsPerKey = 2
202 go cl.acceptLimitClearer()
210 cl.event.L = cl.locker()
211 storageImpl := cfg.DefaultStorage
212 if storageImpl == nil {
213 // We'd use mmap by default but HFS+ doesn't support sparse files.
214 storageImplCloser := storage.NewFile(cfg.DataDir)
215 cl.onClose = append(cl.onClose, func() {
216 if err := storageImplCloser.Close(); err != nil {
217 cl.logger.Printf("error closing default storage: %s", err)
220 storageImpl = storageImplCloser
222 cl.defaultStorage = storage.NewClient(storageImpl)
223 if cfg.IPBlocklist != nil {
224 cl.ipBlockList = cfg.IPBlocklist
227 if cfg.PeerID != "" {
228 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
230 o := copy(cl.peerID[:], cfg.Bep20)
231 _, err = rand.Read(cl.peerID[o:])
233 panic("error generating peer id")
237 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
245 for _, _s := range sockets {
246 s := _s // Copy the loop variable so each closure below captures its own socket.
247 cl.onClose = append(cl.onClose, func() { s.Close() })
248 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
249 cl.dialers = append(cl.dialers, s)
250 cl.listeners = append(cl.listeners, s)
251 if cl.config.AcceptPeerConnections {
252 go cl.acceptConnections(s)
259 for _, s := range sockets {
260 if pc, ok := s.(net.PacketConn); ok {
261 ds, err := cl.NewAnacrolixDhtServer(pc)
265 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
266 cl.onClose = append(cl.onClose, func() { ds.Close() })
271 cl.websocketTrackers = websocketTrackers{
274 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
277 t, ok := cl.torrents[infoHash]
279 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
281 return t.announceRequest(event), nil
283 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
286 t, ok := cl.torrents[dcc.InfoHash]
288 cl.logger.WithDefaultLevel(log.Warning).Printf(
289 "got webrtc conn for unloaded torrent with infohash %x",
295 go t.onWebRtcConn(dc, dcc)
// AddDhtServer appends d to the Client's list of DHT servers.
// NOTE(review): no locking is visible around the append in this listing —
// confirm whether callers are expected to synchronise.
304 func (cl *Client) AddDhtServer(d DhtServer) {
305 cl.dhtServers = append(cl.dhtServers, d)
308 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
309 // given address for any Torrent.
310 func (cl *Client) AddDialer(d Dialer) {
313 cl.dialers = append(cl.dialers, d)
314 for _, t := range cl.torrents {
319 func (cl *Client) Listeners() []Listener {
323 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
325 func (cl *Client) AddListener(l Listener) {
326 cl.listeners = append(cl.listeners, l)
327 if cl.config.AcceptPeerConnections {
328 go cl.acceptConnections(l)
332 func (cl *Client) firewallCallback(net.Addr) bool {
334 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
337 torrent.Add("connections firewalled", 1)
339 torrent.Add("connections not firewalled", 1)
344 func (cl *Client) listenOnNetwork(n network) bool {
345 if n.Ipv4 && cl.config.DisableIPv4 {
348 if n.Ipv6 && cl.config.DisableIPv6 {
351 if n.Tcp && cl.config.DisableTCP {
354 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
360 func (cl *Client) listenNetworks() (ns []network) {
361 for _, n := range allPeerNetworks {
362 if cl.listenOnNetwork(n) {
369 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
370 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
371 cfg := dht.ServerConfig{
372 IPBlocklist: cl.ipBlockList,
374 OnAnnouncePeer: cl.onDHTAnnouncePeer,
375 PublicIP: func() net.IP {
376 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
377 return cl.config.PublicIp6
379 return cl.config.PublicIp4
381 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
382 OnQuery: cl.config.DHTOnQuery,
383 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
385 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
388 s, err = dht.NewServer(&cfg)
391 ts, err := s.Bootstrap()
393 cl.logger.Printf("error bootstrapping dht: %s", err)
395 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
401 func (cl *Client) Closed() <-chan struct{} {
407 func (cl *Client) eachDhtServer(f func(DhtServer)) {
408 for _, ds := range cl.dhtServers {
413 // Stops the client. All connections to peers are closed and all activity will
415 func (cl *Client) Close() {
419 for _, t := range cl.torrents {
422 for i := range cl.onClose {
423 cl.onClose[len(cl.onClose)-1-i]()
428 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
429 if cl.ipBlockList == nil {
432 return cl.ipBlockList.Lookup(ip)
435 func (cl *Client) ipIsBlocked(ip net.IP) bool {
436 _, blocked := cl.ipBlockRange(ip)
440 func (cl *Client) wantConns() bool {
441 for _, t := range cl.torrents {
449 func (cl *Client) waitAccept() {
451 if cl.closed.IsSet() {
461 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
462 func (cl *Client) rejectAccepted(conn net.Conn) error {
463 ra := conn.RemoteAddr()
464 if rip := addrIpOrNil(ra); rip != nil {
465 if cl.config.DisableIPv4Peers && rip.To4() != nil {
466 return errors.New("ipv4 peers disabled")
468 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
469 return errors.New("ipv4 disabled")
472 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
473 return errors.New("ipv6 disabled")
475 if cl.rateLimitAccept(rip) {
476 return errors.New("source IP accepted rate limited")
478 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
479 return errors.New("bad source addr")
485 func (cl *Client) acceptConnections(l Listener) {
487 conn, err := l.Accept()
488 torrent.Add("client listener accepts", 1)
489 conn = pproffd.WrapNetConn(conn)
491 closed := cl.closed.IsSet()
494 reject = cl.rejectAccepted(conn)
504 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
509 torrent.Add("rejected accepted connections", 1)
510 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
513 go cl.incomingConnection(conn)
515 log.Fmsg("accepted %q connection at %q from %q",
519 ).SetLevel(log.Debug).Log(cl.logger)
520 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
521 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
522 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
527 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
// The result is nc's local address and remote address joined by a hyphen.
528 func regularNetConnPeerConnConnString(nc net.Conn) string {
529 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
532 func (cl *Client) incomingConnection(nc net.Conn) {
534 if tc, ok := nc.(*net.TCPConn); ok {
537 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
538 regularNetConnPeerConnConnString(nc))
540 c.Discovery = PeerSourceIncoming
541 cl.runReceivedConn(c)
544 // Returns a handle to the given torrent, if it's present in the client.
545 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
548 t, ok = cl.torrents[ih]
// torrent returns the Torrent stored under ih in cl.torrents, or nil if the map
// has no entry for ih.
// NOTE(review): no locking is done here; presumably the caller holds the Client
// lock — confirm.
552 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
553 return cl.torrents[ih]
556 type dialResult struct {
561 func countDialResult(err error) {
563 torrent.Add("successful dials", 1)
565 torrent.Add("unsuccessful dials", 1)
569 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
570 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
571 if ret < minDialTimeout {
577 // Returns whether an address is known to connect to a client with our own ID.
578 func (cl *Client) dopplegangerAddr(addr string) bool {
579 _, ok := cl.dopplegangerAddrs[addr]
583 // Returns a connection over UTP or TCP, whichever is first to connect.
584 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
586 t := perf.NewTimer(perf.CallerName(0))
589 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
591 t.Mark("returned conn over " + res.Network)
595 ctx, cancel := context.WithCancel(ctx)
596 // As soon as we return one connection, cancel the others.
599 resCh := make(chan dialResult, left)
603 cl.eachDialer(func(s Dialer) bool {
606 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
609 cl.dialFromSocket(ctx, s, addr),
610 s.LocalAddr().Network(),
617 // Wait for a successful connection.
619 defer perf.ScopeTimer()()
620 for ; left > 0 && res.Conn == nil; left-- {
624 // There are still incomplete dials.
626 for ; left > 0; left-- {
627 conn := (<-resCh).Conn
634 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
636 //if res.Conn != nil {
637 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
639 // cl.logger.Printf("failed to dial %s", addr)
644 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
645 c, err := s.Dial(ctx, addr)
646 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
647 // it now in case we close the connection forthwith.
648 if tc, ok := c.(*net.TCPConn); ok {
// forgettableDialError reports whether err's message contains the text
// "no suitable address found". err must be non-nil: err.Error() is called
// unconditionally.
655 func forgettableDialError(err error) bool {
656 return strings.Contains(err.Error(), "no suitable address found")
659 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
660 if _, ok := t.halfOpen[addr]; !ok {
661 panic("invariant broken")
663 delete(t.halfOpen, addr)
665 for _, t := range cl.torrents {
670 // Performs initiator handshakes and returns a connection. Returns a nil *PeerConn if no connection
671 // could be established for valid reasons.
672 func (cl *Client) initiateProtocolHandshakes(
676 outgoing, encryptHeader bool,
677 remoteAddr PeerRemoteAddr,
678 network, connString string,
680 c *PeerConn, err error,
682 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
683 c.headerEncrypted = encryptHeader
684 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
686 dl, ok := ctx.Deadline()
690 err = nc.SetDeadline(dl)
694 err = cl.initiateHandshakes(c, t)
698 // Returns nil connection and nil error if no connection could be established for valid reasons.
699 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
700 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
703 return t.dialTimeout()
706 dr := cl.dialFirst(dialCtx, addr.String())
709 if dialCtx.Err() != nil {
710 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
712 return nil, errors.New("dial failed")
714 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Network, regularNetConnPeerConnConnString(nc))
721 // Returns nil connection and nil error if no connection could be established
722 // for valid reasons.
723 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
724 torrent.Add("establish outgoing connection", 1)
725 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
726 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
728 torrent.Add("initiated conn with preferred header obfuscation", 1)
731 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
732 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
733 // We should have just tried with the preferred header obfuscation. If it was required,
734 // there's nothing else to try.
737 // Try again with encryption if we didn't earlier, or without if we did.
738 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
740 torrent.Add("initiated conn with fallback header obfuscation", 1)
742 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
746 // Called to dial out and run a connection. The addr we're given is already
747 // considered half-open.
748 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
749 cl.dialRateLimiter.Wait(context.Background())
750 c, err := cl.establishOutgoingConn(t, addr)
753 // Don't release lock between here and addPeerConn, unless it's for
755 cl.noLongerHalfOpen(t, addr.String())
758 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
765 t.runHandshookConnLoggingErr(c)
768 // The port number for incoming peer connections. 0 if the client isn't listening.
// This simply delegates to LocalPort.
769 func (cl *Client) incomingPeerPort() int {
770 return cl.LocalPort()
773 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
774 if c.headerEncrypted {
777 rw, c.cryptoMethod, err = mse.InitiateHandshake(
784 cl.config.CryptoProvides,
788 return xerrors.Errorf("header obfuscation handshake: %w", err)
791 ih, err := cl.connBtHandshake(c, &t.infoHash)
793 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
795 if ih != t.infoHash {
796 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
801 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
802 // that won't also try to take the lock. This saves us copying all the infohashes every time.
803 func (cl *Client) forSkeys(f func([]byte) bool) {
806 if false { // Emulate the bug from #114
808 for ih := range cl.torrents {
812 for range cl.torrents {
819 for ih := range cl.torrents {
826 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
827 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
833 // Do encryption and bittorrent handshakes as receiver.
834 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
835 defer perf.ScopeTimerErr(&err)()
837 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
839 if err == nil || err == mse.ErrNoSecretKeyMatch {
840 if c.headerEncrypted {
841 torrent.Add("handshakes received encrypted", 1)
843 torrent.Add("handshakes received unencrypted", 1)
846 torrent.Add("handshakes received with error while handling encryption", 1)
849 if err == mse.ErrNoSecretKeyMatch {
854 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
855 err = errors.New("connection does not have required header obfuscation")
858 ih, err := cl.connBtHandshake(c, nil)
860 err = xerrors.Errorf("during bt handshake: %w", err)
869 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
870 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
875 c.PeerExtensionBytes = res.PeerExtensionBits
876 c.PeerID = res.PeerID
877 c.completedHandshake = time.Now()
878 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
884 func (cl *Client) runReceivedConn(c *PeerConn) {
885 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
889 t, err := cl.receiveHandshakes(c)
892 "error receiving handshakes on %v: %s", c, err,
893 ).SetLevel(log.Debug).
895 "network", c.Network,
897 torrent.Add("error receiving handshake", 1)
899 cl.onBadAccept(c.RemoteAddr)
904 torrent.Add("received handshake for unloaded torrent", 1)
905 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
907 cl.onBadAccept(c.RemoteAddr)
911 torrent.Add("received handshake for loaded torrent", 1)
914 t.runHandshookConnLoggingErr(c)
917 // Client lock must be held before entering this.
918 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
920 if c.PeerID == cl.peerID {
923 addr := c.conn.RemoteAddr().String()
924 cl.dopplegangerAddrs[addr] = struct{}{}
926 // Because the remote address is not necessarily the same as its client's torrent listen
927 // address, we won't record the remote address as a doppleganger. Instead, the initiator
928 // can record *us* as the doppleganger.
930 return errors.New("local and remote peer ids are the same")
932 c.conn.SetWriteDeadline(time.Time{})
933 c.r = deadlineReader{c.conn, c.r}
934 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
935 if connIsIpv6(c.conn) {
936 torrent.Add("completed handshake over ipv6", 1)
938 if err := t.addPeerConn(c); err != nil {
939 return fmt.Errorf("adding connection: %w", err)
941 defer t.dropConnection(c)
943 cl.sendInitialMessages(c, t)
944 err := c.mainReadLoop()
946 return fmt.Errorf("main read loop: %w", err)
951 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
952 // determines the amount of memory that might be used to cache pending writes. Assuming 512KiB
953 // (1<<19) cached for sending, for 16KiB (1<<14) chunks, that is (1<<19)/(1<<14) = 1<<5 requests.
954 const localClientReqq = 1 << 5
956 // See the order given in Transmission's tr_peerMsgsNew.
957 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
958 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
959 conn.write(pp.Message{
961 ExtendedID: pp.HandshakeExtendedID,
962 ExtendedPayload: func() []byte {
963 msg := pp.ExtendedHandshakeMessage{
964 M: map[pp.ExtensionName]pp.ExtensionNumber{
965 pp.ExtensionNameMetadata: metadataExtendedId,
967 V: cl.config.ExtendedHandshakeClientVersion,
968 Reqq: localClientReqq,
969 YourIp: pp.CompactIp(conn.remoteIp()),
970 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
971 Port: cl.incomingPeerPort(),
972 MetadataSize: torrent.metadataSize(),
973 // TODO: We can figure these out specific to the socket
975 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
976 Ipv6: cl.config.PublicIp6.To16(),
978 if !cl.config.DisablePEX {
979 msg.M[pp.ExtensionNamePex] = pexExtendedId
981 return bencode.MustMarshal(msg)
986 if conn.fastEnabled() {
987 if torrent.haveAllPieces() {
988 conn.write(pp.Message{Type: pp.HaveAll})
989 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
991 } else if !torrent.haveAnyPieces() {
992 conn.write(pp.Message{Type: pp.HaveNone})
993 conn.sentHaves.Clear()
999 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1000 conn.write(pp.Message{
1007 func (cl *Client) dhtPort() (ret uint16) {
1008 cl.eachDhtServer(func(s DhtServer) {
1009 ret = uint16(missinggo.AddrPort(s.Addr()))
1014 func (cl *Client) haveDhtServer() (ret bool) {
1015 cl.eachDhtServer(func(_ DhtServer) {
1021 // Process incoming ut_metadata message.
1022 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1023 var d map[string]int
1024 err := bencode.Unmarshal(payload, &d)
1025 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1026 } else if err != nil {
1027 return fmt.Errorf("error unmarshalling bencode: %s", err)
1029 msgType, ok := d["msg_type"]
1031 return errors.New("missing msg_type field")
1035 case pp.DataMetadataExtensionMsgType:
1036 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1037 if !c.requestedMetadataPiece(piece) {
1038 return fmt.Errorf("got unexpected piece %d", piece)
1040 c.metadataRequests[piece] = false
1041 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1042 if begin < 0 || begin >= len(payload) {
1043 return fmt.Errorf("data has bad offset in payload: %d", begin)
1045 t.saveMetadataPiece(piece, payload[begin:])
1046 c.lastUsefulChunkReceived = time.Now()
1047 err = t.maybeCompleteMetadata()
1049 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1050 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1051 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1052 // log consumers can filter for this message.
1053 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1056 case pp.RequestMetadataExtensionMsgType:
1057 if !t.haveMetadataPiece(piece) {
1058 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1061 start := (1 << 14) * piece
1062 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1063 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1065 case pp.RejectMetadataExtensionMsgType:
1068 return errors.New("unknown msg_type value")
1072 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1073 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1074 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1079 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1083 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1086 if _, ok := cl.ipBlockRange(ip); ok {
1089 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1095 // Return a Torrent ready for insertion into a Client.
1096 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1097 // use provided storage, if provided
1098 storageClient := cl.defaultStorage
1099 if specStorage != nil {
1100 storageClient = storage.NewClient(specStorage)
1106 peers: prioritizedPeers{
1108 getPrio: func(p PeerInfo) peerPriority {
1110 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1113 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1115 halfOpen: make(map[string]PeerInfo),
1116 pieceStateChanges: pubsub.NewPubSub(),
1118 storageOpener: storageClient,
1119 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1121 networkingEnabled: true,
1122 metadataChanged: sync.Cond{
1125 webSeeds: make(map[string]*Peer),
1127 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1128 t.logger = cl.logger.WithContextValue(t)
1129 t.setChunkSize(defaultChunkSize)
1133 // A file-like handle to some torrent data resource.
1134 type Handle interface {
// AddTorrentInfoHash adds a torrent by info hash without a custom storage
// implementation: it calls AddTorrentInfoHashWithStorage with a nil storage and
// returns that call's results unchanged.
1141 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1142 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1145 // Adds a torrent by InfoHash with a custom Storage implementation.
1146 // If the torrent already exists then this Storage is ignored and the
1147 // existing torrent returned with `new` set to `false`
1148 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1151 t, ok := cl.torrents[infoHash]
1157 t = cl.newTorrent(infoHash, specStorage)
1158 cl.eachDhtServer(func(s DhtServer) {
1159 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1160 go t.dhtAnnouncer(s)
1163 cl.torrents[infoHash] = t
1164 cl.clearAcceptLimits()
1165 t.updateWantPeersEvent()
1166 // Tickle Client.waitAccept, new torrent may want conns.
1167 cl.event.Broadcast()
1171 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1172 // Torrent.MergeSpec.
1173 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1174 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1175 err = t.MergeSpec(spec)
1176 if err != nil && new {
// stringAddr is a net.Addr backed by a plain string.
1182 type stringAddr string
// Compile-time check that stringAddr satisfies net.Addr.
1184 var _ net.Addr = stringAddr("")
// Network always returns the empty string.
1186 func (stringAddr) Network() string { return "" }
// String returns the underlying string unchanged.
1187 func (me stringAddr) String() string { return string(me) }
1189 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1190 // spec.DisallowDataDownload/Upload will be read and applied
1191 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1192 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1193 if spec.DisplayName != "" {
1194 t.SetDisplayName(spec.DisplayName)
1196 if spec.InfoBytes != nil {
1197 err := t.SetInfoBytes(spec.InfoBytes)
1203 cl.AddDhtNodes(spec.DhtNodes)
1206 useTorrentSources(spec.Sources, t)
1207 for _, url := range spec.Webseeds {
1210 for _, peerAddr := range spec.PeerAddrs {
1212 Addr: stringAddr(peerAddr),
1213 Source: PeerSourceDirect,
1217 if spec.ChunkSize != 0 {
1218 t.setChunkSize(pp.Integer(spec.ChunkSize))
1220 t.addTrackers(spec.Trackers)
1222 t.dataDownloadDisallowed = spec.DisallowDataDownload
1223 t.dataUploadDisallowed = spec.DisallowDataUpload
1227 func useTorrentSources(sources []string, t *Torrent) {
1228 for _, s := range sources {
1230 err := useTorrentSource(s, t)
1232 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1234 t.logger.Printf("successfully used source %q", s)
1240 func useTorrentSource(source string, t *Torrent) error {
1241 req, err := http.NewRequest(http.MethodGet, source, nil)
1245 ctx, cancel := context.WithCancel(context.Background())
1255 req = req.WithContext(ctx)
1256 resp, err := http.DefaultClient.Do(req)
1260 mi, err := metainfo.Load(resp.Body)
1262 if ctx.Err() != nil {
1267 return t.MergeSpec(TorrentSpecFromMetaInfo(mi))
1270 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1271 t, ok := cl.torrents[infoHash]
1273 err = fmt.Errorf("no such torrent")
1280 delete(cl.torrents, infoHash)
1284 func (cl *Client) allTorrentsCompleted() bool {
1285 for _, t := range cl.torrents {
1289 if !t.haveAllPieces() {
1296 // Returns true when all torrents are completely downloaded and false if the
1297 // client is stopped before that.
1298 func (cl *Client) WaitAll() bool {
1301 for !cl.allTorrentsCompleted() {
1302 if cl.closed.IsSet() {
1310 // Returns handles to all the torrents loaded in the Client.
1311 func (cl *Client) Torrents() []*Torrent {
1314 return cl.torrentsAsSlice()
1317 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1318 for _, t := range cl.torrents {
1319 ret = append(ret, t)
1324 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1325 spec, err := TorrentSpecFromMagnetUri(uri)
1329 T, _, err = cl.AddTorrentSpec(spec)
1333 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1334 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1338 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1339 mi, err := metainfo.LoadFromFile(filename)
1343 return cl.AddTorrent(mi)
// DhtServers returns the Client's slice of DHT servers. The internal slice is
// returned directly, not a copy.
1346 func (cl *Client) DhtServers() []DhtServer {
1347 return cl.dhtServers
1350 func (cl *Client) AddDhtNodes(nodes []string) {
1351 for _, n := range nodes {
1352 hmp := missinggo.SplitHostMaybePort(n)
1353 ip := net.ParseIP(hmp.Host)
1355 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1358 ni := krpc.NodeInfo{
1359 Addr: krpc.NodeAddr{
1364 cl.eachDhtServer(func(s DhtServer) {
// banPeerIP logs the ban and records ip, keyed by its string form, in
// cl.badPeerIPs.
1370 func (cl *Client) banPeerIP(ip net.IP) {
1371 cl.logger.Printf("banning ip %v", ip)
// The map is allocated lazily on the first ban.
1372 if cl.badPeerIPs == nil {
1373 cl.badPeerIPs = make(map[string]struct{})
1375 cl.badPeerIPs[ip.String()] = struct{}{}
1378 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1387 PeerMaxRequests: 250,
1389 RemoteAddr: remoteAddr,
1391 callbacks: &cl.config.Callbacks,
1393 connString: connString,
1397 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1398 c.setRW(connStatsReadWriter{nc, c})
1399 c.r = &rateLimitedReader{
1400 l: cl.config.DownloadRateLimiter,
1403 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1404 for _, f := range cl.config.Callbacks.NewPeer {
1410 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1417 t.addPeers([]PeerInfo{{
1418 Addr: ipPortAddr{ip, port},
1419 Source: PeerSourceDhtAnnouncePeer,
1423 func firstNotNil(ips ...net.IP) net.IP {
1424 for _, ip := range ips {
1432 func (cl *Client) eachDialer(f func(Dialer) bool) {
1433 for _, s := range cl.dialers {
1440 func (cl *Client) eachListener(f func(Listener) bool) {
1441 for _, s := range cl.listeners {
1448 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1449 cl.eachListener(func(l Listener) bool {
1456 func (cl *Client) publicIp(peer net.IP) net.IP {
1457 // TODO: Use BEP 10 to determine how peers are seeing us.
1458 if peer.To4() != nil {
1460 cl.config.PublicIp4,
1461 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1466 cl.config.PublicIp6,
1467 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1471 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1472 l := cl.findListener(
1473 func(l Listener) bool {
1474 return f(addrIpOrNil(l.Addr()))
1480 return addrIpOrNil(l.Addr())
1483 // Our address as the given peer should see it: the IP chosen by publicIp for
// that peer, paired with the incoming peer port converted to uint16.
1484 func (cl *Client) publicAddr(peer net.IP) IpPort {
1485 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1488 // ListenAddrs returns the addresses currently being listened to.
1489 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1492 cl.eachListener(func(l Listener) bool {
1493 ret = append(ret, l.Addr())
1499 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1500 ipa, ok := tryIpPortFromNetAddr(addr)
1504 ip := maskIpForAcceptLimiting(ipa.IP)
1505 if cl.acceptLimiter == nil {
1506 cl.acceptLimiter = make(map[ipStr]int)
1508 cl.acceptLimiter[ipStr(ip.String())]++
1511 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1512 if ip4 := ip.To4(); ip4 != nil {
1513 return ip4.Mask(net.CIDRMask(24, 32))
// clearAcceptLimits discards all per-IP accept counts by setting
// cl.acceptLimiter to nil; onBadAccept re-creates the map when it is next needed.
1518 func (cl *Client) clearAcceptLimits() {
1519 cl.acceptLimiter = nil
1522 func (cl *Client) acceptLimitClearer() {
1525 case <-cl.closed.LockedChan(cl.locker()):
1527 case <-time.After(15 * time.Minute):
1529 cl.clearAcceptLimits()
1535 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1536 if cl.config.DisableAcceptRateLimiting {
1539 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1542 func (cl *Client) rLock() {
1546 func (cl *Client) rUnlock() {
1550 func (cl *Client) lock() {
1554 func (cl *Client) unlock() {
1558 func (cl *Client) locker() *lockWithDeferreds {
// String formats the Client as "<T p>", where T is its type (%T) and p its
// pointer value (%p); both verbs use explicit argument index 1.
1562 func (cl *Client) String() string {
1563 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1566 // Returns connection-level aggregate stats at the Client level. See the comment on
1567 // TorrentStats.ConnStats.
1568 func (cl *Client) ConnStats() ConnStats {
1569 return cl.stats.Copy()