17 "github.com/anacrolix/dht/v2"
18 "github.com/anacrolix/dht/v2/krpc"
19 "github.com/anacrolix/log"
20 "github.com/anacrolix/missinggo/perf"
21 "github.com/anacrolix/missinggo/pubsub"
22 "github.com/anacrolix/missinggo/slices"
23 "github.com/anacrolix/missinggo/v2"
24 "github.com/anacrolix/missinggo/v2/bitmap"
25 "github.com/anacrolix/missinggo/v2/pproffd"
26 "github.com/anacrolix/sync"
27 "github.com/davecgh/go-spew/spew"
28 "github.com/dustin/go-humanize"
29 "github.com/google/btree"
30 "github.com/pion/datachannel"
31 "golang.org/x/time/rate"
32 "golang.org/x/xerrors"
34 "github.com/anacrolix/chansync"
36 "github.com/anacrolix/torrent/bencode"
37 "github.com/anacrolix/torrent/internal/limiter"
38 "github.com/anacrolix/torrent/iplist"
39 "github.com/anacrolix/torrent/metainfo"
40 "github.com/anacrolix/torrent/mse"
41 pp "github.com/anacrolix/torrent/peer_protocol"
42 "github.com/anacrolix/torrent/storage"
43 "github.com/anacrolix/torrent/tracker"
44 "github.com/anacrolix/torrent/webtorrent"
47 // Clients contain zero or more Torrents. A Client manages a blocklist, the
48 // TCP/UDP protocol ports, and DHT as desired.
50 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
56 closed chansync.SetOnce
62 defaultStorage *storage.Client
66 dhtServers []DhtServer
67 ipBlockList iplist.Ranger
69 // Set of addresses that have our client ID. This intentionally will
70 // include ourselves if we end up trying to connect to our own address
71 // through legitimate channels.
72 dopplegangerAddrs map[string]struct{}
73 badPeerIPs map[string]struct{}
74 torrents map[InfoHash]*Torrent
76 acceptLimiter map[ipStr]int
77 dialRateLimiter *rate.Limiter
80 websocketTrackers websocketTrackers
82 activeAnnounceLimiter limiter.Instance
84 updateRequests chansync.BroadcastCond
89 func (cl *Client) BadPeerIPs() []string {
92 return cl.badPeerIPsLocked()
95 func (cl *Client) badPeerIPsLocked() []string {
96 return slices.FromMapKeys(cl.badPeerIPs).([]string)
99 func (cl *Client) PeerID() PeerID {
103 // Returns the port number for the first listener that has one. No longer assumes that all port
104 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
106 func (cl *Client) LocalPort() (port int) {
107 cl.eachListener(func(l Listener) bool {
108 port = addrPortOrZero(l.Addr())
114 func writeDhtServerStatus(w io.Writer, s DhtServer) {
115 dhtStats := s.Stats()
116 fmt.Fprintf(w, " ID: %x\n", s.ID())
117 spew.Fdump(w, dhtStats)
120 // Writes out a human readable status of the client, such as for writing to a
122 func (cl *Client) WriteStatus(_w io.Writer) {
125 w := bufio.NewWriter(_w)
127 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
128 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
129 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
130 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
131 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
132 cl.eachDhtServer(func(s DhtServer) {
133 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
134 writeDhtServerStatus(w, s)
136 spew.Fdump(w, &cl.stats)
137 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
139 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
140 return l.InfoHash().AsString() < r.InfoHash().AsString()
143 fmt.Fprint(w, "<unknown name>")
145 fmt.Fprint(w, t.name())
151 "%f%% of %d bytes (%s)",
152 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
154 humanize.Bytes(uint64(*t.length)))
156 w.WriteString("<missing metainfo>")
164 // Filters things that are less than warning from UPnP discovery.
165 func upnpDiscoverLogFilter(m log.Msg) bool {
166 level, ok := m.GetLevel()
167 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
170 func (cl *Client) initLogger() {
171 logger := cl.config.Logger
174 if !cl.config.Debug {
175 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
178 cl.logger = logger.WithValues(cl)
181 func (cl *Client) announceKey() int32 {
182 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
185 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
187 cfg = NewDefaultClientConfig()
197 dopplegangerAddrs: make(map[string]struct{}),
198 torrents: make(map[metainfo.Hash]*Torrent),
199 dialRateLimiter: rate.NewLimiter(10, 10),
201 cl.activeAnnounceLimiter.SlotsPerKey = 2
202 go cl.acceptLimitClearer()
210 cl.event.L = cl.locker()
211 storageImpl := cfg.DefaultStorage
212 if storageImpl == nil {
213 // We'd use mmap by default but HFS+ doesn't support sparse files.
214 storageImplCloser := storage.NewFile(cfg.DataDir)
215 cl.onClose = append(cl.onClose, func() {
216 if err := storageImplCloser.Close(); err != nil {
217 cl.logger.Printf("error closing default storage: %s", err)
220 storageImpl = storageImplCloser
222 cl.defaultStorage = storage.NewClient(storageImpl)
223 if cfg.IPBlocklist != nil {
224 cl.ipBlockList = cfg.IPBlocklist
227 if cfg.PeerID != "" {
228 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
230 o := copy(cl.peerID[:], cfg.Bep20)
231 _, err = rand.Read(cl.peerID[o:])
233 panic("error generating peer id")
237 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
245 for _, _s := range sockets {
246 s := _s // Copy the loop variable so each closure below captures its own socket.
247 cl.onClose = append(cl.onClose, func() { s.Close() })
248 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
249 cl.dialers = append(cl.dialers, s)
250 cl.listeners = append(cl.listeners, s)
251 if cl.config.AcceptPeerConnections {
252 go cl.acceptConnections(s)
259 for _, s := range sockets {
260 if pc, ok := s.(net.PacketConn); ok {
261 ds, err := cl.NewAnacrolixDhtServer(pc)
265 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
266 cl.onClose = append(cl.onClose, func() { ds.Close() })
271 cl.websocketTrackers = websocketTrackers{
274 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
277 t, ok := cl.torrents[infoHash]
279 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
281 return t.announceRequest(event), nil
283 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
286 t, ok := cl.torrents[dcc.InfoHash]
288 cl.logger.WithDefaultLevel(log.Warning).Printf(
289 "got webrtc conn for unloaded torrent with infohash %x",
295 go t.onWebRtcConn(dc, dcc)
304 func (cl *Client) AddDhtServer(d DhtServer) {
305 cl.dhtServers = append(cl.dhtServers, d)
308 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
309 // given address for any Torrent.
310 func (cl *Client) AddDialer(d Dialer) {
313 cl.dialers = append(cl.dialers, d)
314 for _, t := range cl.torrents {
319 func (cl *Client) Listeners() []Listener {
323 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
325 func (cl *Client) AddListener(l Listener) {
326 cl.listeners = append(cl.listeners, l)
327 if cl.config.AcceptPeerConnections {
328 go cl.acceptConnections(l)
332 func (cl *Client) firewallCallback(net.Addr) bool {
334 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
337 torrent.Add("connections firewalled", 1)
339 torrent.Add("connections not firewalled", 1)
344 func (cl *Client) listenOnNetwork(n network) bool {
345 if n.Ipv4 && cl.config.DisableIPv4 {
348 if n.Ipv6 && cl.config.DisableIPv6 {
351 if n.Tcp && cl.config.DisableTCP {
354 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
360 func (cl *Client) listenNetworks() (ns []network) {
361 for _, n := range allPeerNetworks {
362 if cl.listenOnNetwork(n) {
369 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
370 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
371 cfg := dht.ServerConfig{
372 IPBlocklist: cl.ipBlockList,
374 OnAnnouncePeer: cl.onDHTAnnouncePeer,
375 PublicIP: func() net.IP {
376 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
377 return cl.config.PublicIp6
379 return cl.config.PublicIp4
381 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
382 OnQuery: cl.config.DHTOnQuery,
383 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
385 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
388 s, err = dht.NewServer(&cfg)
391 ts, err := s.Bootstrap()
393 cl.logger.Printf("error bootstrapping dht: %s", err)
395 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
401 func (cl *Client) Closed() chansync.Done {
402 return cl.closed.Done()
405 func (cl *Client) eachDhtServer(f func(DhtServer)) {
406 for _, ds := range cl.dhtServers {
411 // Stops the client. All connections to peers are closed and all activity will
413 func (cl *Client) Close() {
417 for _, t := range cl.torrents {
420 for i := range cl.onClose {
421 cl.onClose[len(cl.onClose)-1-i]()
426 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
427 if cl.ipBlockList == nil {
430 return cl.ipBlockList.Lookup(ip)
433 func (cl *Client) ipIsBlocked(ip net.IP) bool {
434 _, blocked := cl.ipBlockRange(ip)
438 func (cl *Client) wantConns() bool {
439 for _, t := range cl.torrents {
447 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
448 func (cl *Client) rejectAccepted(conn net.Conn) error {
450 return errors.New("don't want conns right now")
452 ra := conn.RemoteAddr()
453 if rip := addrIpOrNil(ra); rip != nil {
454 if cl.config.DisableIPv4Peers && rip.To4() != nil {
455 return errors.New("ipv4 peers disabled")
457 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
458 return errors.New("ipv4 disabled")
461 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
462 return errors.New("ipv6 disabled")
464 if cl.rateLimitAccept(rip) {
465 return errors.New("source IP accepted rate limited")
467 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
468 return errors.New("bad source addr")
474 func (cl *Client) acceptConnections(l Listener) {
476 conn, err := l.Accept()
477 torrent.Add("client listener accepts", 1)
478 conn = pproffd.WrapNetConn(conn)
480 closed := cl.closed.IsSet()
483 reject = cl.rejectAccepted(conn)
493 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
498 torrent.Add("rejected accepted connections", 1)
499 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
502 go cl.incomingConnection(conn)
504 log.Fmsg("accepted %q connection at %q from %q",
508 ).SetLevel(log.Debug).Log(cl.logger)
509 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
510 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
511 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
// regularNetConnPeerConnConnString builds the PeerConn.connString for a
// regular net.Conn PeerConn, formatted as "<local addr>-<remote addr>".
func regularNetConnPeerConnConnString(nc net.Conn) string {
	local, remote := nc.LocalAddr(), nc.RemoteAddr()
	return fmt.Sprintf("%s-%s", local, remote)
}
521 func (cl *Client) incomingConnection(nc net.Conn) {
523 if tc, ok := nc.(*net.TCPConn); ok {
526 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
527 regularNetConnPeerConnConnString(nc))
533 c.Discovery = PeerSourceIncoming
534 cl.runReceivedConn(c)
537 // Returns a handle to the given torrent, if it's present in the client.
538 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
541 t, ok = cl.torrents[ih]
545 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
546 return cl.torrents[ih]
549 type DialResult struct {
554 func countDialResult(err error) {
556 torrent.Add("successful dials", 1)
558 torrent.Add("unsuccessful dials", 1)
562 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
563 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
564 if ret < minDialTimeout {
570 // Returns whether an address is known to connect to a client with our own ID.
571 func (cl *Client) dopplegangerAddr(addr string) bool {
572 _, ok := cl.dopplegangerAddrs[addr]
576 // Returns a connection over UTP or TCP, whichever is first to connect.
577 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
578 return DialFirst(ctx, addr, cl.dialers)
581 // Returns a connection over UTP or TCP, whichever is first to connect.
582 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
584 t := perf.NewTimer(perf.CallerName(0))
587 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
589 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
593 ctx, cancel := context.WithCancel(ctx)
594 // As soon as we return one connection, cancel the others.
597 resCh := make(chan DialResult, left)
598 for _, _s := range dialers {
603 dialFromSocket(ctx, s, addr),
608 // Wait for a successful connection.
610 defer perf.ScopeTimer()()
611 for ; left > 0 && res.Conn == nil; left-- {
615 // There are still incomplete dials.
617 for ; left > 0; left-- {
618 conn := (<-resCh).Conn
625 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
630 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
631 c, err := s.Dial(ctx, addr)
632 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
633 // it now in case we close the connection forthwith.
634 if tc, ok := c.(*net.TCPConn); ok {
// forgettableDialError reports whether err is a dial error that can be
// ignored, identified by its message containing "no suitable address found".
// A nil err is not a dial error at all, so it yields false rather than
// panicking on err.Error().
func forgettableDialError(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "no suitable address found")
}
645 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
646 if _, ok := t.halfOpen[addr]; !ok {
647 panic("invariant broken")
649 delete(t.halfOpen, addr)
651 for _, t := range cl.torrents {
656 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
657 // for valid reasons.
658 func (cl *Client) initiateProtocolHandshakes(
662 outgoing, encryptHeader bool,
663 remoteAddr PeerRemoteAddr,
664 network, connString string,
666 c *PeerConn, err error,
668 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
669 c.headerEncrypted = encryptHeader
670 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
672 dl, ok := ctx.Deadline()
676 err = nc.SetDeadline(dl)
680 err = cl.initiateHandshakes(c, t)
684 // Returns nil connection and nil error if no connection could be established for valid reasons.
685 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
686 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
689 return t.dialTimeout()
692 dr := cl.dialFirst(dialCtx, addr.String())
695 if dialCtx.Err() != nil {
696 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
698 return nil, errors.New("dial failed")
700 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
707 // Returns nil connection and nil error if no connection could be established
708 // for valid reasons.
709 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
710 torrent.Add("establish outgoing connection", 1)
711 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
712 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
714 torrent.Add("initiated conn with preferred header obfuscation", 1)
717 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
718 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
719 // We should have just tried with the preferred header obfuscation. If it was required,
720 // there's nothing else to try.
723 // Try again with encryption if we didn't earlier, or without if we did.
724 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
726 torrent.Add("initiated conn with fallback header obfuscation", 1)
728 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
732 // Called to dial out and run a connection. The addr we're given is already
733 // considered half-open.
734 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
735 cl.dialRateLimiter.Wait(context.Background())
736 c, err := cl.establishOutgoingConn(t, addr)
739 // Don't release lock between here and addPeerConn, unless it's for
741 cl.noLongerHalfOpen(t, addr.String())
744 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
751 t.runHandshookConnLoggingErr(c)
754 // The port number for incoming peer connections. 0 if the client isn't listening.
755 func (cl *Client) incomingPeerPort() int {
756 return cl.LocalPort()
759 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
760 if c.headerEncrypted {
763 rw, c.cryptoMethod, err = mse.InitiateHandshake(
770 cl.config.CryptoProvides,
774 return fmt.Errorf("header obfuscation handshake: %w", err)
777 ih, err := cl.connBtHandshake(c, &t.infoHash)
779 return fmt.Errorf("bittorrent protocol handshake: %w", err)
781 if ih != t.infoHash {
782 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
787 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
788 // that won't also try to take the lock. This saves us copying all the infohashes every time.
789 func (cl *Client) forSkeys(f func([]byte) bool) {
792 if false { // Emulate the bug from #114
794 for ih := range cl.torrents {
798 for range cl.torrents {
805 for ih := range cl.torrents {
812 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
813 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
819 // Do encryption and bittorrent handshakes as receiver.
820 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
821 defer perf.ScopeTimerErr(&err)()
823 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
825 if err == nil || err == mse.ErrNoSecretKeyMatch {
826 if c.headerEncrypted {
827 torrent.Add("handshakes received encrypted", 1)
829 torrent.Add("handshakes received unencrypted", 1)
832 torrent.Add("handshakes received with error while handling encryption", 1)
835 if err == mse.ErrNoSecretKeyMatch {
840 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
841 err = errors.New("connection does not have required header obfuscation")
844 ih, err := cl.connBtHandshake(c, nil)
846 err = xerrors.Errorf("during bt handshake: %w", err)
855 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
856 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
861 c.PeerExtensionBytes = res.PeerExtensionBits
862 c.PeerID = res.PeerID
863 c.completedHandshake = time.Now()
864 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
870 func (cl *Client) runReceivedConn(c *PeerConn) {
871 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
875 t, err := cl.receiveHandshakes(c)
878 "error receiving handshakes on %v: %s", c, err,
879 ).SetLevel(log.Debug).
881 "network", c.Network,
883 torrent.Add("error receiving handshake", 1)
885 cl.onBadAccept(c.RemoteAddr)
890 torrent.Add("received handshake for unloaded torrent", 1)
891 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
893 cl.onBadAccept(c.RemoteAddr)
897 torrent.Add("received handshake for loaded torrent", 1)
900 t.runHandshookConnLoggingErr(c)
903 // Client lock must be held before entering this.
904 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
906 if c.PeerID == cl.peerID {
909 addr := c.conn.RemoteAddr().String()
910 cl.dopplegangerAddrs[addr] = struct{}{}
912 // Because the remote address is not necessarily the same as its client's torrent listen
913 // address, we won't record the remote address as a doppleganger. Instead, the initiator
914 // can record *us* as the doppleganger.
916 return errors.New("local and remote peer ids are the same")
918 c.conn.SetWriteDeadline(time.Time{})
919 c.r = deadlineReader{c.conn, c.r}
920 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
921 if connIsIpv6(c.conn) {
922 torrent.Add("completed handshake over ipv6", 1)
924 if err := t.addPeerConn(c); err != nil {
925 return fmt.Errorf("adding connection: %w", err)
927 defer t.dropConnection(c)
929 cl.sendInitialMessages(c, t)
930 err := c.mainReadLoop()
932 return fmt.Errorf("main read loop: %w", err)
937 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
938 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
939 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
940 const localClientReqq = 1 << 5
942 // See the order given in Transmission's tr_peerMsgsNew.
943 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
944 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
945 conn.write(pp.Message{
947 ExtendedID: pp.HandshakeExtendedID,
948 ExtendedPayload: func() []byte {
949 msg := pp.ExtendedHandshakeMessage{
950 M: map[pp.ExtensionName]pp.ExtensionNumber{
951 pp.ExtensionNameMetadata: metadataExtendedId,
953 V: cl.config.ExtendedHandshakeClientVersion,
954 Reqq: localClientReqq,
955 YourIp: pp.CompactIp(conn.remoteIp()),
956 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
957 Port: cl.incomingPeerPort(),
958 MetadataSize: torrent.metadataSize(),
959 // TODO: We can figure these out specific to the socket
961 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
962 Ipv6: cl.config.PublicIp6.To16(),
964 if !cl.config.DisablePEX {
965 msg.M[pp.ExtensionNamePex] = pexExtendedId
967 return bencode.MustMarshal(msg)
972 if conn.fastEnabled() {
973 if torrent.haveAllPieces() {
974 conn.write(pp.Message{Type: pp.HaveAll})
975 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
977 } else if !torrent.haveAnyPieces() {
978 conn.write(pp.Message{Type: pp.HaveNone})
979 conn.sentHaves.Clear()
985 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
986 conn.write(pp.Message{
993 func (cl *Client) dhtPort() (ret uint16) {
994 if len(cl.dhtServers) == 0 {
997 return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
1000 func (cl *Client) haveDhtServer() bool {
1001 return len(cl.dhtServers) > 0
1004 // Process incoming ut_metadata message.
1005 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1006 var d pp.ExtendedMetadataRequestMsg
1007 err := bencode.Unmarshal(payload, &d)
1008 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1009 } else if err != nil {
1010 return fmt.Errorf("error unmarshalling bencode: %s", err)
1014 case pp.DataMetadataExtensionMsgType:
1015 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1016 if !c.requestedMetadataPiece(piece) {
1017 return fmt.Errorf("got unexpected piece %d", piece)
1019 c.metadataRequests[piece] = false
1020 begin := len(payload) - d.PieceSize()
1021 if begin < 0 || begin >= len(payload) {
1022 return fmt.Errorf("data has bad offset in payload: %d", begin)
1024 t.saveMetadataPiece(piece, payload[begin:])
1025 c.lastUsefulChunkReceived = time.Now()
1026 err = t.maybeCompleteMetadata()
1028 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1029 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1030 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1031 // log consumers can filter for this message.
1032 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1035 case pp.RequestMetadataExtensionMsgType:
1036 if !t.haveMetadataPiece(piece) {
1037 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1040 start := (1 << 14) * piece
1041 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1042 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1044 case pp.RejectMetadataExtensionMsgType:
1047 return errors.New("unknown msg_type value")
1051 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1052 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1053 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1058 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1062 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1065 if _, ok := cl.ipBlockRange(ip); ok {
1068 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1074 // Return a Torrent ready for insertion into a Client.
1075 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1076 // use provided storage, if provided
1077 storageClient := cl.defaultStorage
1078 if specStorage != nil {
1079 storageClient = storage.NewClient(specStorage)
1085 peers: prioritizedPeers{
1087 getPrio: func(p PeerInfo) peerPriority {
1089 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1092 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1094 halfOpen: make(map[string]PeerInfo),
1095 pieceStateChanges: pubsub.NewPubSub(),
1097 storageOpener: storageClient,
1098 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1100 metadataChanged: sync.Cond{
1103 webSeeds: make(map[string]*Peer),
1105 t.networkingEnabled.Set()
1106 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1107 t.logger = cl.logger.WithContextValue(t)
1108 t.setChunkSize(defaultChunkSize)
1112 // A file-like handle to some torrent data resource.
1113 type Handle interface {
1120 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1121 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1124 // Adds a torrent by InfoHash with a custom Storage implementation.
1125 // If the torrent already exists then this Storage is ignored and the
1126 // existing torrent returned with `new` set to `false`
1127 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1130 t, ok := cl.torrents[infoHash]
1136 t = cl.newTorrent(infoHash, specStorage)
1137 cl.eachDhtServer(func(s DhtServer) {
1138 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1139 go t.dhtAnnouncer(s)
1142 cl.torrents[infoHash] = t
1143 cl.clearAcceptLimits()
1144 t.updateWantPeersEvent()
1145 // Tickle Client.waitAccept, new torrent may want conns.
1146 cl.event.Broadcast()
1150 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1151 // Torrent.MergeSpec.
1152 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1153 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1154 err = t.MergeSpec(spec)
1155 if err != nil && new {
// stringAddr is a net.Addr backed by a plain string. Its network name is
// always empty and its String form is the underlying string itself.
type stringAddr string

// Compile-time check that stringAddr implements net.Addr.
var _ net.Addr = stringAddr("")

// Network always returns the empty string.
func (stringAddr) Network() string {
	return ""
}

// String returns the address text unchanged.
func (a stringAddr) String() string {
	return string(a)
}
1168 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1169 // spec.DisallowDataDownload/Upload will be read and applied
1170 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1171 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1172 if spec.DisplayName != "" {
1173 t.SetDisplayName(spec.DisplayName)
1175 if spec.InfoBytes != nil {
1176 err := t.SetInfoBytes(spec.InfoBytes)
1182 cl.AddDhtNodes(spec.DhtNodes)
1185 useTorrentSources(spec.Sources, t)
1186 for _, url := range spec.Webseeds {
1189 for _, peerAddr := range spec.PeerAddrs {
1191 Addr: stringAddr(peerAddr),
1192 Source: PeerSourceDirect,
1196 if spec.ChunkSize != 0 {
1197 t.setChunkSize(pp.Integer(spec.ChunkSize))
1199 t.addTrackers(spec.Trackers)
1201 t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1202 t.dataUploadDisallowed = spec.DisallowDataUpload
1206 func useTorrentSources(sources []string, t *Torrent) {
1207 for _, s := range sources {
1209 err := useTorrentSource(s, t)
1211 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1213 t.logger.Printf("successfully used source %q", s)
1219 func useTorrentSource(source string, t *Torrent) error {
1220 req, err := http.NewRequest(http.MethodGet, source, nil)
1224 ctx, cancel := context.WithCancel(context.Background())
1234 req = req.WithContext(ctx)
1235 resp, err := http.DefaultClient.Do(req)
1239 mi, err := metainfo.Load(resp.Body)
1241 if ctx.Err() != nil {
1246 return t.MergeSpec(TorrentSpecFromMetaInfo(mi))
1249 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1250 t, ok := cl.torrents[infoHash]
1252 err = fmt.Errorf("no such torrent")
1259 delete(cl.torrents, infoHash)
1263 func (cl *Client) allTorrentsCompleted() bool {
1264 for _, t := range cl.torrents {
1268 if !t.haveAllPieces() {
1275 // Returns true when all torrents are completely downloaded and false if the
1276 // client is stopped before that.
1277 func (cl *Client) WaitAll() bool {
1280 for !cl.allTorrentsCompleted() {
1281 if cl.closed.IsSet() {
1289 // Returns handles to all the torrents loaded in the Client.
1290 func (cl *Client) Torrents() []*Torrent {
1293 return cl.torrentsAsSlice()
1296 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1297 for _, t := range cl.torrents {
1298 ret = append(ret, t)
1303 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1304 spec, err := TorrentSpecFromMagnetUri(uri)
1308 T, _, err = cl.AddTorrentSpec(spec)
1312 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1313 ts, err := TorrentSpecFromMetaInfoErr(mi)
1317 T, _, err = cl.AddTorrentSpec(ts)
1321 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1322 mi, err := metainfo.LoadFromFile(filename)
1326 return cl.AddTorrent(mi)
1329 func (cl *Client) DhtServers() []DhtServer {
1330 return cl.dhtServers
1333 func (cl *Client) AddDhtNodes(nodes []string) {
1334 for _, n := range nodes {
1335 hmp := missinggo.SplitHostMaybePort(n)
1336 ip := net.ParseIP(hmp.Host)
1338 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1341 ni := krpc.NodeInfo{
1342 Addr: krpc.NodeAddr{
1347 cl.eachDhtServer(func(s DhtServer) {
1353 func (cl *Client) banPeerIP(ip net.IP) {
1354 cl.logger.Printf("banning ip %v", ip)
1355 if cl.badPeerIPs == nil {
1356 cl.badPeerIPs = make(map[string]struct{})
1358 cl.badPeerIPs[ip.String()] = struct{}{}
1361 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1370 PeerMaxRequests: 250,
1372 RemoteAddr: remoteAddr,
1374 callbacks: &cl.config.Callbacks,
1376 connString: connString,
1380 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1381 c.setRW(connStatsReadWriter{nc, c})
1382 c.r = &rateLimitedReader{
1383 l: cl.config.DownloadRateLimiter,
1386 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1387 for _, f := range cl.config.Callbacks.NewPeer {
1393 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1400 t.addPeers([]PeerInfo{{
1401 Addr: ipPortAddr{ip, port},
1402 Source: PeerSourceDhtAnnouncePeer,
1406 func firstNotNil(ips ...net.IP) net.IP {
1407 for _, ip := range ips {
1415 func (cl *Client) eachListener(f func(Listener) bool) {
1416 for _, s := range cl.listeners {
1423 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1424 cl.eachListener(func(l Listener) bool {
1431 func (cl *Client) publicIp(peer net.IP) net.IP {
1432 // TODO: Use BEP 10 to determine how peers are seeing us.
1433 if peer.To4() != nil {
1435 cl.config.PublicIp4,
1436 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1441 cl.config.PublicIp6,
1442 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1446 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1447 l := cl.findListener(
1448 func(l Listener) bool {
1449 return f(addrIpOrNil(l.Addr()))
1455 return addrIpOrNil(l.Addr())
1458 // Our IP as a peer should see it.
1459 func (cl *Client) publicAddr(peer net.IP) IpPort {
1460 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1463 // ListenAddrs addresses currently being listened to.
1464 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1467 cl.eachListener(func(l Listener) bool {
1468 ret = append(ret, l.Addr())
1474 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1475 ipa, ok := tryIpPortFromNetAddr(addr)
1479 ip := maskIpForAcceptLimiting(ipa.IP)
1480 if cl.acceptLimiter == nil {
1481 cl.acceptLimiter = make(map[ipStr]int)
1483 cl.acceptLimiter[ipStr(ip.String())]++
1486 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1487 if ip4 := ip.To4(); ip4 != nil {
1488 return ip4.Mask(net.CIDRMask(24, 32))
1493 func (cl *Client) clearAcceptLimits() {
1494 cl.acceptLimiter = nil
1497 func (cl *Client) acceptLimitClearer() {
1500 case <-cl.closed.Done():
1502 case <-time.After(15 * time.Minute):
1504 cl.clearAcceptLimits()
1510 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1511 if cl.config.DisableAcceptRateLimiting {
1514 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1517 func (cl *Client) rLock() {
1521 func (cl *Client) rUnlock() {
1525 func (cl *Client) lock() {
1529 func (cl *Client) unlock() {
1533 func (cl *Client) locker() *lockWithDeferreds {
1537 func (cl *Client) String() string {
1538 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1541 // Returns connection-level aggregate stats at the Client level. See the comment on
1542 // TorrentStats.ConnStats.
1543 func (cl *Client) ConnStats() ConnStats {
1544 return cl.stats.Copy()