18 "github.com/anacrolix/dht/v2"
19 "github.com/anacrolix/dht/v2/krpc"
20 "github.com/anacrolix/log"
21 "github.com/anacrolix/missinggo/bitmap"
22 "github.com/anacrolix/missinggo/perf"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/slices"
25 "github.com/anacrolix/missinggo/v2/pproffd"
26 "github.com/anacrolix/sync"
27 "github.com/anacrolix/torrent/tracker"
28 "github.com/anacrolix/torrent/webtorrent"
29 "github.com/davecgh/go-spew/spew"
30 "github.com/dustin/go-humanize"
31 "github.com/google/btree"
32 "github.com/pion/datachannel"
33 "golang.org/x/time/rate"
34 "golang.org/x/xerrors"
36 "github.com/anacrolix/missinggo/v2"
37 "github.com/anacrolix/missinggo/v2/conntrack"
39 "github.com/anacrolix/torrent/bencode"
40 "github.com/anacrolix/torrent/iplist"
41 "github.com/anacrolix/torrent/metainfo"
42 "github.com/anacrolix/torrent/mse"
43 pp "github.com/anacrolix/torrent/peer_protocol"
44 "github.com/anacrolix/torrent/storage"
47 // Clients contain zero or more Torrents. A Client manages a blocklist, the
48 // TCP/UDP protocol ports, and DHT as desired.
50 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
56 closed missinggo.Event
62 defaultStorage *storage.Client
66 dhtServers []DhtServer
67 ipBlockList iplist.Ranger
69 // Set of addresses that have our client ID. This intentionally will
70 // include ourselves if we end up trying to connect to our own address
71 // through legitimate channels.
72 dopplegangerAddrs map[string]struct{}
73 badPeerIPs map[string]struct{}
74 torrents map[InfoHash]*Torrent
76 acceptLimiter map[ipStr]int
77 dialRateLimiter *rate.Limiter
80 websocketTrackers websocketTrackers
82 activeAnnouncesMu sync.Mutex
83 // Limits concurrent use of a trackers by URL. Push into the channel to use a slot, and receive
85 activeAnnounces map[string]*activeAnnouncesValueType
88 type activeAnnouncesValueType struct {
93 type activeAnnouncesValueRef struct {
94 r *activeAnnouncesValueType
99 func (me activeAnnouncesValueRef) C() chan struct{} {
103 func (me activeAnnouncesValueRef) Drop() {
104 me.cl.activeAnnouncesMu.Lock()
105 defer me.cl.activeAnnouncesMu.Unlock()
108 delete(me.cl.activeAnnounces, me.url)
112 func (cl *Client) getAnnounceRef(url string) activeAnnouncesValueRef {
113 cl.activeAnnouncesMu.Lock()
114 defer cl.activeAnnouncesMu.Unlock()
115 if cl.activeAnnounces == nil {
116 cl.activeAnnounces = make(map[string]*activeAnnouncesValueType)
118 v, ok := cl.activeAnnounces[url]
120 v = &activeAnnouncesValueType{
121 ch: make(chan struct{}, 2),
123 cl.activeAnnounces[url] = v
126 return activeAnnouncesValueRef{
135 func (cl *Client) BadPeerIPs() []string {
138 return cl.badPeerIPsLocked()
141 func (cl *Client) badPeerIPsLocked() []string {
142 return slices.FromMapKeys(cl.badPeerIPs).([]string)
145 func (cl *Client) PeerID() PeerID {
149 // Returns the port number for the first listener that has one. No longer assumes that all port
150 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
152 func (cl *Client) LocalPort() (port int) {
153 cl.eachListener(func(l Listener) bool {
154 port = addrPortOrZero(l.Addr())
160 func writeDhtServerStatus(w io.Writer, s DhtServer) {
161 dhtStats := s.Stats()
162 fmt.Fprintf(w, " ID: %x\n", s.ID())
163 spew.Fdump(w, dhtStats)
166 // Writes out a human readable status of the client, such as for writing to a
168 func (cl *Client) WriteStatus(_w io.Writer) {
171 w := bufio.NewWriter(_w)
173 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
174 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
175 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
176 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
177 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
178 cl.eachDhtServer(func(s DhtServer) {
179 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
180 writeDhtServerStatus(w, s)
182 spew.Fdump(w, &cl.stats)
183 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
185 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
186 return l.InfoHash().AsString() < r.InfoHash().AsString()
189 fmt.Fprint(w, "<unknown name>")
191 fmt.Fprint(w, t.name())
197 "%f%% of %d bytes (%s)",
198 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
200 humanize.Bytes(uint64(*t.length)))
202 w.WriteString("<missing metainfo>")
210 func (cl *Client) initLogger() {
211 cl.logger = cl.config.Logger.WithValues(cl)
212 if !cl.config.Debug {
213 cl.logger = cl.logger.FilterLevel(log.Info)
217 func (cl *Client) announceKey() int32 {
218 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
221 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
223 cfg = NewDefaultClientConfig()
233 dopplegangerAddrs: make(map[string]struct{}),
234 torrents: make(map[metainfo.Hash]*Torrent),
235 dialRateLimiter: rate.NewLimiter(10, 10),
237 go cl.acceptLimitClearer()
245 cl.event.L = cl.locker()
246 storageImpl := cfg.DefaultStorage
247 if storageImpl == nil {
248 // We'd use mmap by default but HFS+ doesn't support sparse files.
249 storageImplCloser := storage.NewFile(cfg.DataDir)
250 cl.onClose = append(cl.onClose, func() {
251 if err := storageImplCloser.Close(); err != nil {
252 cl.logger.Printf("error closing default storage: %s", err)
255 storageImpl = storageImplCloser
257 cl.defaultStorage = storage.NewClient(storageImpl)
258 if cfg.IPBlocklist != nil {
259 cl.ipBlockList = cfg.IPBlocklist
262 if cfg.PeerID != "" {
263 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
265 o := copy(cl.peerID[:], cfg.Bep20)
266 _, err = rand.Read(cl.peerID[o:])
268 panic("error generating peer id")
272 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
280 for _, _s := range sockets {
281 s := _s // Go is fucking retarded.
282 cl.onClose = append(cl.onClose, func() { s.Close() })
283 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
284 cl.dialers = append(cl.dialers, s)
285 cl.listeners = append(cl.listeners, s)
286 go cl.acceptConnections(s)
292 for _, s := range sockets {
293 if pc, ok := s.(net.PacketConn); ok {
294 ds, err := cl.newAnacrolixDhtServer(pc)
298 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
299 cl.onClose = append(cl.onClose, func() { ds.Close() })
304 cl.websocketTrackers = websocketTrackers{
307 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
310 t, ok := cl.torrents[infoHash]
312 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
314 return t.announceRequest(event), nil
316 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
319 t, ok := cl.torrents[dcc.InfoHash]
321 cl.logger.WithDefaultLevel(log.Warning).Printf(
322 "got webrtc conn for unloaded torrent with infohash %x",
328 go t.onWebRtcConn(dc, dcc)
335 func (cl *Client) AddDhtServer(d DhtServer) {
336 cl.dhtServers = append(cl.dhtServers, d)
339 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
340 // given address for any Torrent.
341 func (cl *Client) AddDialer(d Dialer) {
344 cl.dialers = append(cl.dialers, d)
345 for _, t := range cl.torrents {
350 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
352 func (cl *Client) AddListener(l Listener) {
353 cl.listeners = append(cl.listeners, l)
354 go cl.acceptConnections(l)
357 func (cl *Client) firewallCallback(net.Addr) bool {
359 block := !cl.wantConns()
362 torrent.Add("connections firewalled", 1)
364 torrent.Add("connections not firewalled", 1)
369 func (cl *Client) listenOnNetwork(n network) bool {
370 if n.Ipv4 && cl.config.DisableIPv4 {
373 if n.Ipv6 && cl.config.DisableIPv6 {
376 if n.Tcp && cl.config.DisableTCP {
379 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
385 func (cl *Client) listenNetworks() (ns []network) {
386 for _, n := range allPeerNetworks {
387 if cl.listenOnNetwork(n) {
394 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
395 cfg := dht.ServerConfig{
396 IPBlocklist: cl.ipBlockList,
398 OnAnnouncePeer: cl.onDHTAnnouncePeer,
399 PublicIP: func() net.IP {
400 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
401 return cl.config.PublicIp6
403 return cl.config.PublicIp4
405 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
406 ConnectionTracking: cl.config.ConnTracker,
407 OnQuery: cl.config.DHTOnQuery,
408 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
410 s, err = dht.NewServer(&cfg)
413 ts, err := s.Bootstrap()
415 cl.logger.Printf("error bootstrapping dht: %s", err)
417 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
423 func (cl *Client) Closed() <-chan struct{} {
429 func (cl *Client) eachDhtServer(f func(DhtServer)) {
430 for _, ds := range cl.dhtServers {
435 // Stops the client. All connections to peers are closed and all activity will
437 func (cl *Client) Close() {
441 for _, t := range cl.torrents {
444 for i := range cl.onClose {
445 cl.onClose[len(cl.onClose)-1-i]()
450 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
451 if cl.ipBlockList == nil {
454 return cl.ipBlockList.Lookup(ip)
457 func (cl *Client) ipIsBlocked(ip net.IP) bool {
458 _, blocked := cl.ipBlockRange(ip)
462 func (cl *Client) wantConns() bool {
463 for _, t := range cl.torrents {
471 func (cl *Client) waitAccept() {
473 if cl.closed.IsSet() {
483 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
484 func (cl *Client) rejectAccepted(conn net.Conn) error {
485 ra := conn.RemoteAddr()
486 if rip := addrIpOrNil(ra); rip != nil {
487 if cl.config.DisableIPv4Peers && rip.To4() != nil {
488 return errors.New("ipv4 peers disabled")
490 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
491 return errors.New("ipv4 disabled")
494 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
495 return errors.New("ipv6 disabled")
497 if cl.rateLimitAccept(rip) {
498 return errors.New("source IP accepted rate limited")
500 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
501 return errors.New("bad source addr")
507 func (cl *Client) acceptConnections(l Listener) {
509 conn, err := l.Accept()
510 torrent.Add("client listener accepts", 1)
511 conn = pproffd.WrapNetConn(conn)
513 closed := cl.closed.IsSet()
516 reject = cl.rejectAccepted(conn)
526 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
531 torrent.Add("rejected accepted connections", 1)
532 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
535 go cl.incomingConnection(conn)
537 log.Fmsg("accepted %q connection at %q from %q",
541 ).SetLevel(log.Debug).Log(cl.logger)
542 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
543 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
544 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
549 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
550 func regularNetConnPeerConnConnString(nc net.Conn) string {
551 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
554 func (cl *Client) incomingConnection(nc net.Conn) {
556 if tc, ok := nc.(*net.TCPConn); ok {
559 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
560 regularNetConnPeerConnConnString(nc))
562 c.Discovery = PeerSourceIncoming
563 cl.runReceivedConn(c)
566 // Returns a handle to the given torrent, if it's present in the client.
567 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
570 t, ok = cl.torrents[ih]
574 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
575 return cl.torrents[ih]
578 type dialResult struct {
583 func countDialResult(err error) {
585 torrent.Add("successful dials", 1)
587 torrent.Add("unsuccessful dials", 1)
591 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
592 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
593 if ret < minDialTimeout {
599 // Returns whether an address is known to connect to a client with our own ID.
600 func (cl *Client) dopplegangerAddr(addr string) bool {
601 _, ok := cl.dopplegangerAddrs[addr]
605 // Returns a connection over UTP or TCP, whichever is first to connect.
606 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
608 t := perf.NewTimer(perf.CallerName(0))
611 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
613 t.Mark("returned conn over " + res.Network)
617 ctx, cancel := context.WithCancel(ctx)
618 // As soon as we return one connection, cancel the others.
621 resCh := make(chan dialResult, left)
625 cl.eachDialer(func(s Dialer) bool {
628 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
631 cl.dialFromSocket(ctx, s, addr),
632 s.LocalAddr().Network(),
639 // Wait for a successful connection.
641 defer perf.ScopeTimer()()
642 for ; left > 0 && res.Conn == nil; left-- {
646 // There are still incompleted dials.
648 for ; left > 0; left-- {
649 conn := (<-resCh).Conn
656 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
658 //if res.Conn != nil {
659 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
661 // cl.logger.Printf("failed to dial %s", addr)
666 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
667 network := s.LocalAddr().Network()
668 cte := cl.config.ConnTracker.Wait(
670 conntrack.Entry{network, s.LocalAddr().String(), addr},
671 "dial torrent client",
674 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
675 // which dial errors allow us to forget the connection tracking entry handle.
676 if ctx.Err() != nil {
682 c, err := s.Dial(ctx, addr)
683 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
684 // it now in case we close the connection forthwith.
685 if tc, ok := c.(*net.TCPConn); ok {
690 if err != nil && forgettableDialError(err) {
697 return closeWrapper{c, func() error {
704 func forgettableDialError(err error) bool {
705 return strings.Contains(err.Error(), "no suitable address found")
708 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
709 if _, ok := t.halfOpen[addr]; !ok {
710 panic("invariant broken")
712 delete(t.halfOpen, addr)
714 for _, t := range cl.torrents {
719 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
720 // for valid reasons.
721 func (cl *Client) initiateProtocolHandshakes(
725 outgoing, encryptHeader bool,
727 network, connString string,
729 c *PeerConn, err error,
731 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
732 c.headerEncrypted = encryptHeader
733 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
735 dl, ok := ctx.Deadline()
739 err = nc.SetDeadline(dl)
743 err = cl.initiateHandshakes(c, t)
747 // Returns nil connection and nil error if no connection could be established for valid reasons.
748 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr net.Addr, obfuscatedHeader bool) (*PeerConn, error) {
749 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
752 return t.dialTimeout()
755 dr := cl.dialFirst(dialCtx, addr.String())
758 if dialCtx.Err() != nil {
759 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
761 return nil, errors.New("dial failed")
763 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Network, regularNetConnPeerConnConnString(nc))
770 // Returns nil connection and nil error if no connection could be established
771 // for valid reasons.
772 func (cl *Client) establishOutgoingConn(t *Torrent, addr net.Addr) (c *PeerConn, err error) {
773 torrent.Add("establish outgoing connection", 1)
774 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
775 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
777 torrent.Add("initiated conn with preferred header obfuscation", 1)
780 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
781 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
782 // We should have just tried with the preferred header obfuscation. If it was required,
783 // there's nothing else to try.
786 // Try again with encryption if we didn't earlier, or without if we did.
787 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
789 torrent.Add("initiated conn with fallback header obfuscation", 1)
791 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
795 // Called to dial out and run a connection. The addr we're given is already
796 // considered half-open.
797 func (cl *Client) outgoingConnection(t *Torrent, addr net.Addr, ps PeerSource, trusted bool) {
798 cl.dialRateLimiter.Wait(context.Background())
799 c, err := cl.establishOutgoingConn(t, addr)
802 // Don't release lock between here and addConnection, unless it's for
804 cl.noLongerHalfOpen(t, addr.String())
807 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
814 t.runHandshookConnLoggingErr(c)
817 // The port number for incoming peer connections. 0 if the client isn't listening.
818 func (cl *Client) incomingPeerPort() int {
819 return cl.LocalPort()
822 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
823 if c.headerEncrypted {
826 rw, c.cryptoMethod, err = mse.InitiateHandshake(
833 cl.config.CryptoProvides,
837 return xerrors.Errorf("header obfuscation handshake: %w", err)
840 ih, err := cl.connBtHandshake(c, &t.infoHash)
842 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
844 if ih != t.infoHash {
845 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
850 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
851 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
852 func (cl *Client) forSkeys(f func([]byte) bool) {
855 if false { // Emulate the bug from #114
857 for ih := range cl.torrents {
861 for range cl.torrents {
868 for ih := range cl.torrents {
875 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
876 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
882 // Do encryption and bittorrent handshakes as receiver.
883 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
884 defer perf.ScopeTimerErr(&err)()
886 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
888 if err == nil || err == mse.ErrNoSecretKeyMatch {
889 if c.headerEncrypted {
890 torrent.Add("handshakes received encrypted", 1)
892 torrent.Add("handshakes received unencrypted", 1)
895 torrent.Add("handshakes received with error while handling encryption", 1)
898 if err == mse.ErrNoSecretKeyMatch {
903 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
904 err = errors.New("connection does not have required header obfuscation")
907 ih, err := cl.connBtHandshake(c, nil)
909 err = xerrors.Errorf("during bt handshake: %w", err)
918 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
919 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
924 c.PeerExtensionBytes = res.PeerExtensionBits
925 c.PeerID = res.PeerID
926 c.completedHandshake = time.Now()
927 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
933 func (cl *Client) runReceivedConn(c *PeerConn) {
934 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
938 t, err := cl.receiveHandshakes(c)
941 "error receiving handshakes on %v: %s", c, err,
942 ).SetLevel(log.Debug).
944 "network", c.network,
946 torrent.Add("error receiving handshake", 1)
948 cl.onBadAccept(c.RemoteAddr)
953 torrent.Add("received handshake for unloaded torrent", 1)
954 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
956 cl.onBadAccept(c.RemoteAddr)
960 torrent.Add("received handshake for loaded torrent", 1)
963 t.runHandshookConnLoggingErr(c)
966 // Client lock must be held before entering this.
967 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
969 if c.PeerID == cl.peerID {
972 addr := c.conn.RemoteAddr().String()
973 cl.dopplegangerAddrs[addr] = struct{}{}
975 // Because the remote address is not necessarily the same as its client's torrent listen
976 // address, we won't record the remote address as a doppleganger. Instead, the initiator
977 // can record *us* as the doppleganger.
979 return errors.New("local and remote peer ids are the same")
981 c.conn.SetWriteDeadline(time.Time{})
982 c.r = deadlineReader{c.conn, c.r}
983 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
984 if connIsIpv6(c.conn) {
985 torrent.Add("completed handshake over ipv6", 1)
987 if err := t.addConnection(c); err != nil {
988 return fmt.Errorf("adding connection: %w", err)
990 defer t.dropConnection(c)
991 go c.writer(time.Minute)
992 cl.sendInitialMessages(c, t)
993 err := c.mainReadLoop()
995 return fmt.Errorf("main read loop: %w", err)
1000 // See the order given in Transmission's tr_peerMsgsNew.
1001 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
1002 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
1003 conn.post(pp.Message{
1005 ExtendedID: pp.HandshakeExtendedID,
1006 ExtendedPayload: func() []byte {
1007 msg := pp.ExtendedHandshakeMessage{
1008 M: map[pp.ExtensionName]pp.ExtensionNumber{
1009 pp.ExtensionNameMetadata: metadataExtendedId,
1011 V: cl.config.ExtendedHandshakeClientVersion,
1012 // If peer requests are buffered on read, this instructs the amount of memory
1013 // that might be used to cache pending writes. Assuming 512KiB cached for
1014 // sending, for 16KiB chunks.
1016 YourIp: pp.CompactIp(addrIpOrNil(conn.RemoteAddr)),
1017 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
1018 Port: cl.incomingPeerPort(),
1019 MetadataSize: torrent.metadataSize(),
1020 // TODO: We can figured these out specific to the socket
1022 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
1023 Ipv6: cl.config.PublicIp6.To16(),
1025 if !cl.config.DisablePEX {
1026 msg.M[pp.ExtensionNamePex] = pexExtendedId
1028 return bencode.MustMarshal(msg)
1033 if conn.fastEnabled() {
1034 if torrent.haveAllPieces() {
1035 conn.post(pp.Message{Type: pp.HaveAll})
1036 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
1038 } else if !torrent.haveAnyPieces() {
1039 conn.post(pp.Message{Type: pp.HaveNone})
1040 conn.sentHaves.Clear()
1046 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1047 conn.post(pp.Message{
1054 func (cl *Client) dhtPort() (ret uint16) {
1055 cl.eachDhtServer(func(s DhtServer) {
1056 ret = uint16(missinggo.AddrPort(s.Addr()))
1061 func (cl *Client) haveDhtServer() (ret bool) {
1062 cl.eachDhtServer(func(_ DhtServer) {
1068 // Process incoming ut_metadata message.
1069 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1070 var d map[string]int
1071 err := bencode.Unmarshal(payload, &d)
1072 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1073 } else if err != nil {
1074 return fmt.Errorf("error unmarshalling bencode: %s", err)
1076 msgType, ok := d["msg_type"]
1078 return errors.New("missing msg_type field")
1082 case pp.DataMetadataExtensionMsgType:
1083 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1084 if !c.requestedMetadataPiece(piece) {
1085 return fmt.Errorf("got unexpected piece %d", piece)
1087 c.metadataRequests[piece] = false
1088 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1089 if begin < 0 || begin >= len(payload) {
1090 return fmt.Errorf("data has bad offset in payload: %d", begin)
1092 t.saveMetadataPiece(piece, payload[begin:])
1093 c.lastUsefulChunkReceived = time.Now()
1094 return t.maybeCompleteMetadata()
1095 case pp.RequestMetadataExtensionMsgType:
1096 if !t.haveMetadataPiece(piece) {
1097 c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1100 start := (1 << 14) * piece
1101 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1102 c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1104 case pp.RejectMetadataExtensionMsgType:
1107 return errors.New("unknown msg_type value")
1111 func (cl *Client) badPeerAddr(addr net.Addr) bool {
1112 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1113 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1118 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1122 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1125 if _, ok := cl.ipBlockRange(ip); ok {
1128 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1134 // Return a Torrent ready for insertion into a Client.
1135 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1136 // use provided storage, if provided
1137 storageClient := cl.defaultStorage
1138 if specStorage != nil {
1139 storageClient = storage.NewClient(specStorage)
1145 peers: prioritizedPeers{
1147 getPrio: func(p PeerInfo) peerPriority {
1148 return bep40PriorityIgnoreError(cl.publicAddr(addrIpOrNil(p.Addr)), p.addr())
1151 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1153 halfOpen: make(map[string]PeerInfo),
1154 pieceStateChanges: pubsub.NewPubSub(),
1156 storageOpener: storageClient,
1157 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1159 networkingEnabled: true,
1160 metadataChanged: sync.Cond{
1163 webSeeds: make(map[string]*peer),
1165 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1166 t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
1167 t.logger = cl.logger.WithContextValue(t)
1168 t.setChunkSize(defaultChunkSize)
1172 // A file-like handle to some torrent data resource.
1173 type Handle interface {
1180 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1181 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1184 // Adds a torrent by InfoHash with a custom Storage implementation.
1185 // If the torrent already exists then this Storage is ignored and the
1186 // existing torrent returned with `new` set to `false`
1187 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1190 t, ok := cl.torrents[infoHash]
1196 t = cl.newTorrent(infoHash, specStorage)
1197 cl.eachDhtServer(func(s DhtServer) {
1198 go t.dhtAnnouncer(s)
1200 cl.torrents[infoHash] = t
1201 cl.clearAcceptLimits()
1202 t.updateWantPeersEvent()
1203 // Tickle Client.waitAccept, new torrent may want conns.
1204 cl.event.Broadcast()
1208 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1209 // Torrent.MergeSpec.
1210 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1211 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1212 err = t.MergeSpec(spec)
1213 if err != nil && new {
1219 type stringAddr string
1221 var _ net.Addr = stringAddr("")
1223 func (stringAddr) Network() string { return "" }
1224 func (me stringAddr) String() string { return string(me) }
1226 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1227 // spec.DisallowDataDownload/Upload will be read and applied
1228 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1229 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1230 if spec.DisplayName != "" {
1231 t.SetDisplayName(spec.DisplayName)
1233 if spec.InfoBytes != nil {
1234 err := t.SetInfoBytes(spec.InfoBytes)
1240 cl.AddDhtNodes(spec.DhtNodes)
1243 useTorrentSources(spec.Sources, t)
1244 for _, url := range spec.Webseeds {
1247 for _, peerAddr := range spec.PeerAddrs {
1249 Addr: stringAddr(peerAddr),
1250 Source: PeerSourceDirect,
1254 if spec.ChunkSize != 0 {
1255 t.setChunkSize(pp.Integer(spec.ChunkSize))
1257 t.addTrackers(spec.Trackers)
1259 t.dataDownloadDisallowed = spec.DisallowDataDownload
1260 t.dataUploadDisallowed = spec.DisallowDataUpload
1264 func useTorrentSources(sources []string, t *Torrent) {
1265 for _, s := range sources {
1267 err := useTorrentSource(s, t)
1269 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1271 t.logger.Printf("successfully used source %q", s)
1277 func useTorrentSource(source string, t *Torrent) error {
1278 req, err := http.NewRequest(http.MethodGet, source, nil)
1282 ctx, cancel := context.WithCancel(context.Background())
1292 req = req.WithContext(ctx)
1293 resp, err := http.DefaultClient.Do(req)
1297 mi, err := metainfo.Load(resp.Body)
1299 if ctx.Err() != nil {
1304 return t.MergeSpec(TorrentSpecFromMetaInfo(mi))
1307 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1308 t, ok := cl.torrents[infoHash]
1310 err = fmt.Errorf("no such torrent")
1317 delete(cl.torrents, infoHash)
1321 func (cl *Client) allTorrentsCompleted() bool {
1322 for _, t := range cl.torrents {
1326 if !t.haveAllPieces() {
1333 // Returns true when all torrents are completely downloaded and false if the
1334 // client is stopped before that.
1335 func (cl *Client) WaitAll() bool {
1338 for !cl.allTorrentsCompleted() {
1339 if cl.closed.IsSet() {
1347 // Returns handles to all the torrents loaded in the Client.
1348 func (cl *Client) Torrents() []*Torrent {
1351 return cl.torrentsAsSlice()
1354 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1355 for _, t := range cl.torrents {
1356 ret = append(ret, t)
1361 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1362 spec, err := TorrentSpecFromMagnetUri(uri)
1366 T, _, err = cl.AddTorrentSpec(spec)
1370 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1371 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1375 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1376 mi, err := metainfo.LoadFromFile(filename)
1380 return cl.AddTorrent(mi)
1383 func (cl *Client) DhtServers() []DhtServer {
1384 return cl.dhtServers
1387 func (cl *Client) AddDhtNodes(nodes []string) {
1388 for _, n := range nodes {
1389 hmp := missinggo.SplitHostMaybePort(n)
1390 ip := net.ParseIP(hmp.Host)
1392 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1395 ni := krpc.NodeInfo{
1396 Addr: krpc.NodeAddr{
1401 cl.eachDhtServer(func(s DhtServer) {
1407 func (cl *Client) banPeerIP(ip net.IP) {
1408 cl.logger.Printf("banning ip %v", ip)
1409 if cl.badPeerIPs == nil {
1410 cl.badPeerIPs = make(map[string]struct{})
1412 cl.badPeerIPs[ip.String()] = struct{}{}
1415 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, network, connString string) (c *PeerConn) {
1421 PeerMaxRequests: 250,
1423 RemoteAddr: remoteAddr,
1426 connString: connString,
1428 writeBuffer: new(bytes.Buffer),
1429 callbacks: &cl.config.Callbacks,
1432 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1433 c.writerCond.L = cl.locker()
1434 c.setRW(connStatsReadWriter{nc, c})
1435 c.r = &rateLimitedReader{
1436 l: cl.config.DownloadRateLimiter,
1439 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1443 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1450 t.addPeers([]PeerInfo{{
1451 Addr: ipPortAddr{ip, port},
1452 Source: PeerSourceDhtAnnouncePeer,
1456 func firstNotNil(ips ...net.IP) net.IP {
1457 for _, ip := range ips {
1465 func (cl *Client) eachDialer(f func(Dialer) bool) {
1466 for _, s := range cl.dialers {
1473 func (cl *Client) eachListener(f func(Listener) bool) {
1474 for _, s := range cl.listeners {
1481 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1482 cl.eachListener(func(l Listener) bool {
1489 func (cl *Client) publicIp(peer net.IP) net.IP {
1490 // TODO: Use BEP 10 to determine how peers are seeing us.
1491 if peer.To4() != nil {
1493 cl.config.PublicIp4,
1494 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1499 cl.config.PublicIp6,
1500 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1504 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1505 l := cl.findListener(
1506 func(l Listener) bool {
1507 return f(addrIpOrNil(l.Addr()))
1513 return addrIpOrNil(l.Addr())
1516 // Our IP as a peer should see it.
1517 func (cl *Client) publicAddr(peer net.IP) IpPort {
1518 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1521 // ListenAddrs addresses currently being listened to.
1522 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1525 cl.eachListener(func(l Listener) bool {
1526 ret = append(ret, l.Addr())
1532 func (cl *Client) onBadAccept(addr net.Addr) {
1533 ipa, ok := tryIpPortFromNetAddr(addr)
1537 ip := maskIpForAcceptLimiting(ipa.IP)
1538 if cl.acceptLimiter == nil {
1539 cl.acceptLimiter = make(map[ipStr]int)
1541 cl.acceptLimiter[ipStr(ip.String())]++
1544 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1545 if ip4 := ip.To4(); ip4 != nil {
1546 return ip4.Mask(net.CIDRMask(24, 32))
1551 func (cl *Client) clearAcceptLimits() {
1552 cl.acceptLimiter = nil
1555 func (cl *Client) acceptLimitClearer() {
1558 case <-cl.closed.LockedChan(cl.locker()):
1560 case <-time.After(15 * time.Minute):
1562 cl.clearAcceptLimits()
1568 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1569 if cl.config.DisableAcceptRateLimiting {
1572 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1575 func (cl *Client) rLock() {
1579 func (cl *Client) rUnlock() {
1583 func (cl *Client) lock() {
1587 func (cl *Client) unlock() {
1591 func (cl *Client) locker() *lockWithDeferreds {
1595 func (cl *Client) String() string {
1596 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1599 // Returns connection-level aggregate stats at the Client level. See the comment on
1600 // TorrentStats.ConnStats.
1601 func (cl *Client) ConnStats() ConnStats {
1602 return cl.stats.Copy()