17 "github.com/anacrolix/dht/v2"
18 "github.com/anacrolix/dht/v2/krpc"
19 "github.com/anacrolix/log"
20 "github.com/anacrolix/missinggo/perf"
21 "github.com/anacrolix/missinggo/pubsub"
22 "github.com/anacrolix/missinggo/slices"
23 "github.com/anacrolix/missinggo/v2"
24 "github.com/anacrolix/missinggo/v2/bitmap"
25 "github.com/anacrolix/missinggo/v2/conntrack"
26 "github.com/anacrolix/missinggo/v2/pproffd"
27 "github.com/anacrolix/sync"
28 "github.com/davecgh/go-spew/spew"
29 "github.com/dustin/go-humanize"
30 "github.com/google/btree"
31 "github.com/pion/datachannel"
32 "golang.org/x/time/rate"
33 "golang.org/x/xerrors"
35 "github.com/anacrolix/torrent/bencode"
36 "github.com/anacrolix/torrent/internal/chansync"
37 "github.com/anacrolix/torrent/internal/limiter"
38 "github.com/anacrolix/torrent/iplist"
39 "github.com/anacrolix/torrent/metainfo"
40 "github.com/anacrolix/torrent/mse"
41 pp "github.com/anacrolix/torrent/peer_protocol"
42 "github.com/anacrolix/torrent/storage"
43 "github.com/anacrolix/torrent/tracker"
44 "github.com/anacrolix/torrent/webtorrent"
47 // Clients contain zero or more Torrents. A Client manages a blocklist, the
48 // TCP/UDP protocol ports, and DHT as desired.
50 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
56 closed missinggo.Event
62 defaultStorage *storage.Client
66 dhtServers []DhtServer
67 ipBlockList iplist.Ranger
69 // Set of addresses that have our client ID. This intentionally will
70 // include ourselves if we end up trying to connect to our own address
71 // through legitimate channels.
72 dopplegangerAddrs map[string]struct{}
73 badPeerIPs map[string]struct{}
74 torrents map[InfoHash]*Torrent
76 acceptLimiter map[ipStr]int
77 dialRateLimiter *rate.Limiter
80 websocketTrackers websocketTrackers
82 activeAnnounceLimiter limiter.Instance
84 updateRequests chansync.BroadcastCond
89 func (cl *Client) BadPeerIPs() []string {
92 return cl.badPeerIPsLocked()
95 func (cl *Client) badPeerIPsLocked() []string {
96 return slices.FromMapKeys(cl.badPeerIPs).([]string)
99 func (cl *Client) PeerID() PeerID {
103 // Returns the port number for the first listener that has one. No longer assumes that all port
104 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
106 func (cl *Client) LocalPort() (port int) {
107 cl.eachListener(func(l Listener) bool {
108 port = addrPortOrZero(l.Addr())
114 func writeDhtServerStatus(w io.Writer, s DhtServer) {
115 dhtStats := s.Stats()
116 fmt.Fprintf(w, " ID: %x\n", s.ID())
117 spew.Fdump(w, dhtStats)
120 // Writes out a human readable status of the client, such as for writing to a
122 func (cl *Client) WriteStatus(_w io.Writer) {
125 w := bufio.NewWriter(_w)
127 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
128 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
129 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
130 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
131 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
132 cl.eachDhtServer(func(s DhtServer) {
133 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
134 writeDhtServerStatus(w, s)
136 spew.Fdump(w, &cl.stats)
137 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
139 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
140 return l.InfoHash().AsString() < r.InfoHash().AsString()
143 fmt.Fprint(w, "<unknown name>")
145 fmt.Fprint(w, t.name())
151 "%f%% of %d bytes (%s)",
152 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
154 humanize.Bytes(uint64(*t.length)))
156 w.WriteString("<missing metainfo>")
164 // Filters things that are less than warning from UPnP discovery.
165 func upnpDiscoverLogFilter(m log.Msg) bool {
166 level, ok := m.GetLevel()
167 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
170 func (cl *Client) initLogger() {
171 logger := cl.config.Logger
174 if !cl.config.Debug {
175 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
178 cl.logger = logger.WithValues(cl)
181 func (cl *Client) announceKey() int32 {
182 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
185 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
187 cfg = NewDefaultClientConfig()
197 dopplegangerAddrs: make(map[string]struct{}),
198 torrents: make(map[metainfo.Hash]*Torrent),
199 dialRateLimiter: rate.NewLimiter(10, 10),
201 cl.activeAnnounceLimiter.SlotsPerKey = 2
202 go cl.acceptLimitClearer()
210 cl.event.L = cl.locker()
211 storageImpl := cfg.DefaultStorage
212 if storageImpl == nil {
213 // We'd use mmap by default but HFS+ doesn't support sparse files.
214 storageImplCloser := storage.NewFile(cfg.DataDir)
215 cl.onClose = append(cl.onClose, func() {
216 if err := storageImplCloser.Close(); err != nil {
217 cl.logger.Printf("error closing default storage: %s", err)
220 storageImpl = storageImplCloser
222 cl.defaultStorage = storage.NewClient(storageImpl)
223 if cfg.IPBlocklist != nil {
224 cl.ipBlockList = cfg.IPBlocklist
227 if cfg.PeerID != "" {
228 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
230 o := copy(cl.peerID[:], cfg.Bep20)
231 _, err = rand.Read(cl.peerID[o:])
233 panic("error generating peer id")
237 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
245 for _, _s := range sockets {
246 s := _s // Go is fucking retarded.
247 cl.onClose = append(cl.onClose, func() { s.Close() })
248 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
249 cl.dialers = append(cl.dialers, s)
250 cl.listeners = append(cl.listeners, s)
251 go cl.acceptConnections(s)
257 for _, s := range sockets {
258 if pc, ok := s.(net.PacketConn); ok {
259 ds, err := cl.newAnacrolixDhtServer(pc)
263 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
264 cl.onClose = append(cl.onClose, func() { ds.Close() })
269 cl.websocketTrackers = websocketTrackers{
272 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
275 t, ok := cl.torrents[infoHash]
277 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
279 return t.announceRequest(event), nil
281 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
284 t, ok := cl.torrents[dcc.InfoHash]
286 cl.logger.WithDefaultLevel(log.Warning).Printf(
287 "got webrtc conn for unloaded torrent with infohash %x",
293 go t.onWebRtcConn(dc, dcc)
302 func (cl *Client) AddDhtServer(d DhtServer) {
303 cl.dhtServers = append(cl.dhtServers, d)
306 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
307 // given address for any Torrent.
308 func (cl *Client) AddDialer(d Dialer) {
311 cl.dialers = append(cl.dialers, d)
312 for _, t := range cl.torrents {
317 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
319 func (cl *Client) AddListener(l Listener) {
320 cl.listeners = append(cl.listeners, l)
321 go cl.acceptConnections(l)
324 func (cl *Client) firewallCallback(net.Addr) bool {
326 block := !cl.wantConns()
329 torrent.Add("connections firewalled", 1)
331 torrent.Add("connections not firewalled", 1)
336 func (cl *Client) listenOnNetwork(n network) bool {
337 if n.Ipv4 && cl.config.DisableIPv4 {
340 if n.Ipv6 && cl.config.DisableIPv6 {
343 if n.Tcp && cl.config.DisableTCP {
346 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
352 func (cl *Client) listenNetworks() (ns []network) {
353 for _, n := range allPeerNetworks {
354 if cl.listenOnNetwork(n) {
361 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
362 cfg := dht.ServerConfig{
363 IPBlocklist: cl.ipBlockList,
365 OnAnnouncePeer: cl.onDHTAnnouncePeer,
366 PublicIP: func() net.IP {
367 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
368 return cl.config.PublicIp6
370 return cl.config.PublicIp4
372 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
373 ConnectionTracking: cl.config.ConnTracker,
374 OnQuery: cl.config.DHTOnQuery,
375 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
377 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
380 s, err = dht.NewServer(&cfg)
383 ts, err := s.Bootstrap()
385 cl.logger.Printf("error bootstrapping dht: %s", err)
387 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
393 func (cl *Client) Closed() <-chan struct{} {
399 func (cl *Client) eachDhtServer(f func(DhtServer)) {
400 for _, ds := range cl.dhtServers {
405 // Stops the client. All connections to peers are closed and all activity will
407 func (cl *Client) Close() {
411 for _, t := range cl.torrents {
414 for i := range cl.onClose {
415 cl.onClose[len(cl.onClose)-1-i]()
420 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
421 if cl.ipBlockList == nil {
424 return cl.ipBlockList.Lookup(ip)
427 func (cl *Client) ipIsBlocked(ip net.IP) bool {
428 _, blocked := cl.ipBlockRange(ip)
432 func (cl *Client) wantConns() bool {
433 for _, t := range cl.torrents {
441 func (cl *Client) waitAccept() {
443 if cl.closed.IsSet() {
453 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
454 func (cl *Client) rejectAccepted(conn net.Conn) error {
455 ra := conn.RemoteAddr()
456 if rip := addrIpOrNil(ra); rip != nil {
457 if cl.config.DisableIPv4Peers && rip.To4() != nil {
458 return errors.New("ipv4 peers disabled")
460 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
461 return errors.New("ipv4 disabled")
464 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
465 return errors.New("ipv6 disabled")
467 if cl.rateLimitAccept(rip) {
468 return errors.New("source IP accepted rate limited")
470 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
471 return errors.New("bad source addr")
477 func (cl *Client) acceptConnections(l Listener) {
479 conn, err := l.Accept()
480 torrent.Add("client listener accepts", 1)
481 conn = pproffd.WrapNetConn(conn)
483 closed := cl.closed.IsSet()
486 reject = cl.rejectAccepted(conn)
496 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
501 torrent.Add("rejected accepted connections", 1)
502 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
505 go cl.incomingConnection(conn)
507 log.Fmsg("accepted %q connection at %q from %q",
511 ).SetLevel(log.Debug).Log(cl.logger)
512 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
513 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
514 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
519 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
520 func regularNetConnPeerConnConnString(nc net.Conn) string {
521 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
524 func (cl *Client) incomingConnection(nc net.Conn) {
526 if tc, ok := nc.(*net.TCPConn); ok {
529 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
530 regularNetConnPeerConnConnString(nc))
532 c.Discovery = PeerSourceIncoming
533 cl.runReceivedConn(c)
536 // Returns a handle to the given torrent, if it's present in the client.
537 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
540 t, ok = cl.torrents[ih]
544 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
545 return cl.torrents[ih]
548 type dialResult struct {
553 func countDialResult(err error) {
555 torrent.Add("successful dials", 1)
557 torrent.Add("unsuccessful dials", 1)
561 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
562 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
563 if ret < minDialTimeout {
569 // Returns whether an address is known to connect to a client with our own ID.
570 func (cl *Client) dopplegangerAddr(addr string) bool {
571 _, ok := cl.dopplegangerAddrs[addr]
575 // Returns a connection over UTP or TCP, whichever is first to connect.
576 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
578 t := perf.NewTimer(perf.CallerName(0))
581 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
583 t.Mark("returned conn over " + res.Network)
587 ctx, cancel := context.WithCancel(ctx)
588 // As soon as we return one connection, cancel the others.
591 resCh := make(chan dialResult, left)
595 cl.eachDialer(func(s Dialer) bool {
598 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
601 cl.dialFromSocket(ctx, s, addr),
602 s.LocalAddr().Network(),
609 // Wait for a successful connection.
611 defer perf.ScopeTimer()()
612 for ; left > 0 && res.Conn == nil; left-- {
616 // There are still incompleted dials.
618 for ; left > 0; left-- {
619 conn := (<-resCh).Conn
626 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
628 //if res.Conn != nil {
629 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
631 // cl.logger.Printf("failed to dial %s", addr)
636 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
637 network := s.LocalAddr().Network()
638 cte := cl.config.ConnTracker.Wait(
640 conntrack.Entry{network, s.LocalAddr().String(), addr},
641 "dial torrent client",
644 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
645 // which dial errors allow us to forget the connection tracking entry handle.
646 if ctx.Err() != nil {
652 c, err := s.Dial(ctx, addr)
653 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
654 // it now in case we close the connection forthwith.
655 if tc, ok := c.(*net.TCPConn); ok {
660 if err != nil && forgettableDialError(err) {
667 return closeWrapper{c, func() error {
674 func forgettableDialError(err error) bool {
675 return strings.Contains(err.Error(), "no suitable address found")
678 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
679 if _, ok := t.halfOpen[addr]; !ok {
680 panic("invariant broken")
682 delete(t.halfOpen, addr)
684 for _, t := range cl.torrents {
689 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
690 // for valid reasons.
691 func (cl *Client) initiateProtocolHandshakes(
695 outgoing, encryptHeader bool,
696 remoteAddr PeerRemoteAddr,
697 network, connString string,
699 c *PeerConn, err error,
701 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
702 c.headerEncrypted = encryptHeader
703 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
705 dl, ok := ctx.Deadline()
709 err = nc.SetDeadline(dl)
713 err = cl.initiateHandshakes(c, t)
717 // Returns nil connection and nil error if no connection could be established for valid reasons.
718 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
719 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
722 return t.dialTimeout()
725 dr := cl.dialFirst(dialCtx, addr.String())
728 if dialCtx.Err() != nil {
729 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
731 return nil, errors.New("dial failed")
733 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Network, regularNetConnPeerConnConnString(nc))
740 // Returns nil connection and nil error if no connection could be established
741 // for valid reasons.
742 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
743 torrent.Add("establish outgoing connection", 1)
744 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
745 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
747 torrent.Add("initiated conn with preferred header obfuscation", 1)
750 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
751 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
752 // We should have just tried with the preferred header obfuscation. If it was required,
753 // there's nothing else to try.
756 // Try again with encryption if we didn't earlier, or without if we did.
757 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
759 torrent.Add("initiated conn with fallback header obfuscation", 1)
761 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
765 // Called to dial out and run a connection. The addr we're given is already
766 // considered half-open.
767 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
768 cl.dialRateLimiter.Wait(context.Background())
769 c, err := cl.establishOutgoingConn(t, addr)
772 // Don't release lock between here and addPeerConn, unless it's for
774 cl.noLongerHalfOpen(t, addr.String())
777 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
784 t.runHandshookConnLoggingErr(c)
787 // The port number for incoming peer connections. 0 if the client isn't listening.
788 func (cl *Client) incomingPeerPort() int {
789 return cl.LocalPort()
792 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
793 if c.headerEncrypted {
796 rw, c.cryptoMethod, err = mse.InitiateHandshake(
803 cl.config.CryptoProvides,
807 return xerrors.Errorf("header obfuscation handshake: %w", err)
810 ih, err := cl.connBtHandshake(c, &t.infoHash)
812 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
814 if ih != t.infoHash {
815 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
820 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
821 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
822 func (cl *Client) forSkeys(f func([]byte) bool) {
825 if false { // Emulate the bug from #114
827 for ih := range cl.torrents {
831 for range cl.torrents {
838 for ih := range cl.torrents {
845 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
846 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
852 // Do encryption and bittorrent handshakes as receiver.
853 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
854 defer perf.ScopeTimerErr(&err)()
856 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
858 if err == nil || err == mse.ErrNoSecretKeyMatch {
859 if c.headerEncrypted {
860 torrent.Add("handshakes received encrypted", 1)
862 torrent.Add("handshakes received unencrypted", 1)
865 torrent.Add("handshakes received with error while handling encryption", 1)
868 if err == mse.ErrNoSecretKeyMatch {
873 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
874 err = errors.New("connection does not have required header obfuscation")
877 ih, err := cl.connBtHandshake(c, nil)
879 err = xerrors.Errorf("during bt handshake: %w", err)
888 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
889 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
894 c.PeerExtensionBytes = res.PeerExtensionBits
895 c.PeerID = res.PeerID
896 c.completedHandshake = time.Now()
897 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
903 func (cl *Client) runReceivedConn(c *PeerConn) {
904 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
908 t, err := cl.receiveHandshakes(c)
911 "error receiving handshakes on %v: %s", c, err,
912 ).SetLevel(log.Debug).
914 "network", c.Network,
916 torrent.Add("error receiving handshake", 1)
918 cl.onBadAccept(c.RemoteAddr)
923 torrent.Add("received handshake for unloaded torrent", 1)
924 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
926 cl.onBadAccept(c.RemoteAddr)
930 torrent.Add("received handshake for loaded torrent", 1)
933 t.runHandshookConnLoggingErr(c)
936 // Client lock must be held before entering this.
937 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
939 if c.PeerID == cl.peerID {
942 addr := c.conn.RemoteAddr().String()
943 cl.dopplegangerAddrs[addr] = struct{}{}
945 // Because the remote address is not necessarily the same as its client's torrent listen
946 // address, we won't record the remote address as a doppleganger. Instead, the initiator
947 // can record *us* as the doppleganger.
949 return errors.New("local and remote peer ids are the same")
951 c.conn.SetWriteDeadline(time.Time{})
952 c.r = deadlineReader{c.conn, c.r}
953 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
954 if connIsIpv6(c.conn) {
955 torrent.Add("completed handshake over ipv6", 1)
957 if err := t.addPeerConn(c); err != nil {
958 return fmt.Errorf("adding connection: %w", err)
960 defer t.dropConnection(c)
962 cl.sendInitialMessages(c, t)
963 err := c.mainReadLoop()
965 return fmt.Errorf("main read loop: %w", err)
970 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
971 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
972 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
973 const localClientReqq = 1 << 5
975 // See the order given in Transmission's tr_peerMsgsNew.
976 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
977 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
978 conn.write(pp.Message{
980 ExtendedID: pp.HandshakeExtendedID,
981 ExtendedPayload: func() []byte {
982 msg := pp.ExtendedHandshakeMessage{
983 M: map[pp.ExtensionName]pp.ExtensionNumber{
984 pp.ExtensionNameMetadata: metadataExtendedId,
986 V: cl.config.ExtendedHandshakeClientVersion,
987 Reqq: localClientReqq,
988 YourIp: pp.CompactIp(conn.remoteIp()),
989 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
990 Port: cl.incomingPeerPort(),
991 MetadataSize: torrent.metadataSize(),
992 // TODO: We can figured these out specific to the socket
994 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
995 Ipv6: cl.config.PublicIp6.To16(),
997 if !cl.config.DisablePEX {
998 msg.M[pp.ExtensionNamePex] = pexExtendedId
1000 return bencode.MustMarshal(msg)
1005 if conn.fastEnabled() {
1006 if torrent.haveAllPieces() {
1007 conn.write(pp.Message{Type: pp.HaveAll})
1008 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
1010 } else if !torrent.haveAnyPieces() {
1011 conn.write(pp.Message{Type: pp.HaveNone})
1012 conn.sentHaves.Clear()
1018 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1019 conn.write(pp.Message{
1026 func (cl *Client) dhtPort() (ret uint16) {
1027 cl.eachDhtServer(func(s DhtServer) {
1028 ret = uint16(missinggo.AddrPort(s.Addr()))
1033 func (cl *Client) haveDhtServer() (ret bool) {
1034 cl.eachDhtServer(func(_ DhtServer) {
1040 // Process incoming ut_metadata message.
1041 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1042 var d map[string]int
1043 err := bencode.Unmarshal(payload, &d)
1044 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1045 } else if err != nil {
1046 return fmt.Errorf("error unmarshalling bencode: %s", err)
1048 msgType, ok := d["msg_type"]
1050 return errors.New("missing msg_type field")
1054 case pp.DataMetadataExtensionMsgType:
1055 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1056 if !c.requestedMetadataPiece(piece) {
1057 return fmt.Errorf("got unexpected piece %d", piece)
1059 c.metadataRequests[piece] = false
1060 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1061 if begin < 0 || begin >= len(payload) {
1062 return fmt.Errorf("data has bad offset in payload: %d", begin)
1064 t.saveMetadataPiece(piece, payload[begin:])
1065 c.lastUsefulChunkReceived = time.Now()
1066 err = t.maybeCompleteMetadata()
1068 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1069 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1070 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1071 // log consumers can filter for this message.
1072 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1075 case pp.RequestMetadataExtensionMsgType:
1076 if !t.haveMetadataPiece(piece) {
1077 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1080 start := (1 << 14) * piece
1081 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1082 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1084 case pp.RejectMetadataExtensionMsgType:
1087 return errors.New("unknown msg_type value")
1091 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1092 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1093 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1098 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1102 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1105 if _, ok := cl.ipBlockRange(ip); ok {
1108 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1114 // Return a Torrent ready for insertion into a Client.
1115 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1116 // use provided storage, if provided
1117 storageClient := cl.defaultStorage
1118 if specStorage != nil {
1119 storageClient = storage.NewClient(specStorage)
1125 peers: prioritizedPeers{
1127 getPrio: func(p PeerInfo) peerPriority {
1129 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1132 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1134 halfOpen: make(map[string]PeerInfo),
1135 pieceStateChanges: pubsub.NewPubSub(),
1137 storageOpener: storageClient,
1138 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1140 networkingEnabled: true,
1141 metadataChanged: sync.Cond{
1144 webSeeds: make(map[string]*Peer),
1146 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1147 t.logger = cl.logger.WithContextValue(t)
1148 t.setChunkSize(defaultChunkSize)
1152 // A file-like handle to some torrent data resource.
1153 type Handle interface {
1160 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1161 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1164 // Adds a torrent by InfoHash with a custom Storage implementation.
1165 // If the torrent already exists then this Storage is ignored and the
1166 // existing torrent returned with `new` set to `false`
1167 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1170 t, ok := cl.torrents[infoHash]
1176 t = cl.newTorrent(infoHash, specStorage)
1177 cl.eachDhtServer(func(s DhtServer) {
1178 go t.dhtAnnouncer(s)
1180 cl.torrents[infoHash] = t
1181 cl.clearAcceptLimits()
1182 t.updateWantPeersEvent()
1183 // Tickle Client.waitAccept, new torrent may want conns.
1184 cl.event.Broadcast()
1188 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1189 // Torrent.MergeSpec.
1190 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1191 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1192 err = t.MergeSpec(spec)
1193 if err != nil && new {
1199 type stringAddr string
1201 var _ net.Addr = stringAddr("")
1203 func (stringAddr) Network() string { return "" }
1204 func (me stringAddr) String() string { return string(me) }
1206 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1207 // spec.DisallowDataDownload/Upload will be read and applied
1208 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1209 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1210 if spec.DisplayName != "" {
1211 t.SetDisplayName(spec.DisplayName)
1213 if spec.InfoBytes != nil {
1214 err := t.SetInfoBytes(spec.InfoBytes)
1220 cl.AddDhtNodes(spec.DhtNodes)
1223 useTorrentSources(spec.Sources, t)
1224 for _, url := range spec.Webseeds {
1227 for _, peerAddr := range spec.PeerAddrs {
1229 Addr: stringAddr(peerAddr),
1230 Source: PeerSourceDirect,
1234 if spec.ChunkSize != 0 {
1235 t.setChunkSize(pp.Integer(spec.ChunkSize))
1237 t.addTrackers(spec.Trackers)
1239 t.dataDownloadDisallowed = spec.DisallowDataDownload
1240 t.dataUploadDisallowed = spec.DisallowDataUpload
1244 func useTorrentSources(sources []string, t *Torrent) {
1245 for _, s := range sources {
1247 err := useTorrentSource(s, t)
1249 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1251 t.logger.Printf("successfully used source %q", s)
1257 func useTorrentSource(source string, t *Torrent) error {
1258 req, err := http.NewRequest(http.MethodGet, source, nil)
1262 ctx, cancel := context.WithCancel(context.Background())
1272 req = req.WithContext(ctx)
1273 resp, err := http.DefaultClient.Do(req)
1277 mi, err := metainfo.Load(resp.Body)
1279 if ctx.Err() != nil {
1284 return t.MergeSpec(TorrentSpecFromMetaInfo(mi))
1287 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1288 t, ok := cl.torrents[infoHash]
1290 err = fmt.Errorf("no such torrent")
1297 delete(cl.torrents, infoHash)
1301 func (cl *Client) allTorrentsCompleted() bool {
1302 for _, t := range cl.torrents {
1306 if !t.haveAllPieces() {
1313 // Returns true when all torrents are completely downloaded and false if the
1314 // client is stopped before that.
1315 func (cl *Client) WaitAll() bool {
1318 for !cl.allTorrentsCompleted() {
1319 if cl.closed.IsSet() {
1327 // Returns handles to all the torrents loaded in the Client.
1328 func (cl *Client) Torrents() []*Torrent {
1331 return cl.torrentsAsSlice()
1334 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1335 for _, t := range cl.torrents {
1336 ret = append(ret, t)
1341 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1342 spec, err := TorrentSpecFromMagnetUri(uri)
1346 T, _, err = cl.AddTorrentSpec(spec)
1350 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1351 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1355 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1356 mi, err := metainfo.LoadFromFile(filename)
1360 return cl.AddTorrent(mi)
1363 func (cl *Client) DhtServers() []DhtServer {
1364 return cl.dhtServers
1367 func (cl *Client) AddDhtNodes(nodes []string) {
1368 for _, n := range nodes {
1369 hmp := missinggo.SplitHostMaybePort(n)
1370 ip := net.ParseIP(hmp.Host)
1372 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1375 ni := krpc.NodeInfo{
1376 Addr: krpc.NodeAddr{
1381 cl.eachDhtServer(func(s DhtServer) {
1387 func (cl *Client) banPeerIP(ip net.IP) {
1388 cl.logger.Printf("banning ip %v", ip)
1389 if cl.badPeerIPs == nil {
1390 cl.badPeerIPs = make(map[string]struct{})
1392 cl.badPeerIPs[ip.String()] = struct{}{}
1395 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1404 PeerMaxRequests: 250,
1406 RemoteAddr: remoteAddr,
1408 callbacks: &cl.config.Callbacks,
1410 connString: connString,
1414 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1415 c.setRW(connStatsReadWriter{nc, c})
1416 c.r = &rateLimitedReader{
1417 l: cl.config.DownloadRateLimiter,
1420 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1421 for _, f := range cl.config.Callbacks.NewPeer {
1427 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1434 t.addPeers([]PeerInfo{{
1435 Addr: ipPortAddr{ip, port},
1436 Source: PeerSourceDhtAnnouncePeer,
1440 func firstNotNil(ips ...net.IP) net.IP {
1441 for _, ip := range ips {
1449 func (cl *Client) eachDialer(f func(Dialer) bool) {
1450 for _, s := range cl.dialers {
1457 func (cl *Client) eachListener(f func(Listener) bool) {
1458 for _, s := range cl.listeners {
1465 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1466 cl.eachListener(func(l Listener) bool {
1473 func (cl *Client) publicIp(peer net.IP) net.IP {
1474 // TODO: Use BEP 10 to determine how peers are seeing us.
1475 if peer.To4() != nil {
1477 cl.config.PublicIp4,
1478 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1483 cl.config.PublicIp6,
1484 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1488 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1489 l := cl.findListener(
1490 func(l Listener) bool {
1491 return f(addrIpOrNil(l.Addr()))
1497 return addrIpOrNil(l.Addr())
1500 // Our IP as a peer should see it.
1501 func (cl *Client) publicAddr(peer net.IP) IpPort {
1502 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1505 // ListenAddrs addresses currently being listened to.
1506 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1509 cl.eachListener(func(l Listener) bool {
1510 ret = append(ret, l.Addr())
1516 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1517 ipa, ok := tryIpPortFromNetAddr(addr)
1521 ip := maskIpForAcceptLimiting(ipa.IP)
1522 if cl.acceptLimiter == nil {
1523 cl.acceptLimiter = make(map[ipStr]int)
1525 cl.acceptLimiter[ipStr(ip.String())]++
1528 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1529 if ip4 := ip.To4(); ip4 != nil {
1530 return ip4.Mask(net.CIDRMask(24, 32))
1535 func (cl *Client) clearAcceptLimits() {
1536 cl.acceptLimiter = nil
1539 func (cl *Client) acceptLimitClearer() {
1542 case <-cl.closed.LockedChan(cl.locker()):
1544 case <-time.After(15 * time.Minute):
1546 cl.clearAcceptLimits()
1552 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1553 if cl.config.DisableAcceptRateLimiting {
1556 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1559 func (cl *Client) rLock() {
1563 func (cl *Client) rUnlock() {
1567 func (cl *Client) lock() {
1571 func (cl *Client) unlock() {
1575 func (cl *Client) locker() *lockWithDeferreds {
1579 func (cl *Client) String() string {
1580 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1583 // Returns connection-level aggregate stats at the Client level. See the comment on
1584 // TorrentStats.ConnStats.
1585 func (cl *Client) ConnStats() ConnStats {
1586 return cl.stats.Copy()