18 "github.com/anacrolix/dht/v2"
19 "github.com/anacrolix/dht/v2/krpc"
20 "github.com/anacrolix/log"
21 "github.com/anacrolix/missinggo/bitmap"
22 "github.com/anacrolix/missinggo/perf"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/slices"
25 "github.com/anacrolix/missinggo/v2/pproffd"
26 "github.com/anacrolix/sync"
27 "github.com/anacrolix/torrent/internal/limiter"
28 request_strategy "github.com/anacrolix/torrent/request-strategy"
29 "github.com/anacrolix/torrent/tracker"
30 "github.com/anacrolix/torrent/webtorrent"
31 "github.com/davecgh/go-spew/spew"
32 "github.com/dustin/go-humanize"
33 "github.com/google/btree"
34 "github.com/pion/datachannel"
35 "golang.org/x/time/rate"
36 "golang.org/x/xerrors"
38 "github.com/anacrolix/missinggo/v2"
39 "github.com/anacrolix/missinggo/v2/conntrack"
41 "github.com/anacrolix/torrent/bencode"
42 "github.com/anacrolix/torrent/iplist"
43 "github.com/anacrolix/torrent/metainfo"
44 "github.com/anacrolix/torrent/mse"
45 pp "github.com/anacrolix/torrent/peer_protocol"
46 "github.com/anacrolix/torrent/storage"
49 // Clients contain zero or more Torrents. A Client manages a blocklist, the
50 // TCP/UDP protocol ports, and DHT as desired.
52 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
58 closed missinggo.Event
64 defaultStorage *storage.Client
68 dhtServers []DhtServer
69 ipBlockList iplist.Ranger
71 // Set of addresses that have our client ID. This intentionally will
72 // include ourselves if we end up trying to connect to our own address
73 // through legitimate channels.
74 dopplegangerAddrs map[string]struct{}
75 badPeerIPs map[string]struct{}
76 torrents map[InfoHash]*Torrent
78 acceptLimiter map[ipStr]int
79 dialRateLimiter *rate.Limiter
82 websocketTrackers websocketTrackers
84 activeAnnounceLimiter limiter.Instance
86 pieceRequestOrder request_strategy.ClientPieceOrder
91 func (cl *Client) BadPeerIPs() []string {
94 return cl.badPeerIPsLocked()
97 func (cl *Client) badPeerIPsLocked() []string {
98 return slices.FromMapKeys(cl.badPeerIPs).([]string)
101 func (cl *Client) PeerID() PeerID {
105 // Returns the port number for the first listener that has one. No longer assumes that all port
106 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
108 func (cl *Client) LocalPort() (port int) {
109 cl.eachListener(func(l Listener) bool {
110 port = addrPortOrZero(l.Addr())
116 func writeDhtServerStatus(w io.Writer, s DhtServer) {
117 dhtStats := s.Stats()
118 fmt.Fprintf(w, " ID: %x\n", s.ID())
119 spew.Fdump(w, dhtStats)
122 // Writes out a human readable status of the client, such as for writing to a
124 func (cl *Client) WriteStatus(_w io.Writer) {
127 w := bufio.NewWriter(_w)
129 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
130 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
131 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
132 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
133 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
134 cl.eachDhtServer(func(s DhtServer) {
135 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
136 writeDhtServerStatus(w, s)
138 spew.Fdump(w, &cl.stats)
139 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
141 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
142 return l.InfoHash().AsString() < r.InfoHash().AsString()
145 fmt.Fprint(w, "<unknown name>")
147 fmt.Fprint(w, t.name())
153 "%f%% of %d bytes (%s)",
154 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
156 humanize.Bytes(uint64(*t.length)))
158 w.WriteString("<missing metainfo>")
166 // Filters things that are less than warning from UPnP discovery.
167 func upnpDiscoverLogFilter(m log.Msg) bool {
168 level, ok := m.GetLevel()
169 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
172 func (cl *Client) initLogger() {
173 logger := cl.config.Logger
176 if !cl.config.Debug {
177 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
180 cl.logger = logger.WithValues(cl)
183 func (cl *Client) announceKey() int32 {
184 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
187 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
189 cfg = NewDefaultClientConfig()
199 dopplegangerAddrs: make(map[string]struct{}),
200 torrents: make(map[metainfo.Hash]*Torrent),
201 dialRateLimiter: rate.NewLimiter(10, 10),
203 cl.activeAnnounceLimiter.SlotsPerKey = 2
204 go cl.acceptLimitClearer()
212 cl.event.L = cl.locker()
213 storageImpl := cfg.DefaultStorage
214 if storageImpl == nil {
215 // We'd use mmap by default but HFS+ doesn't support sparse files.
216 storageImplCloser := storage.NewFile(cfg.DataDir)
217 cl.onClose = append(cl.onClose, func() {
218 if err := storageImplCloser.Close(); err != nil {
219 cl.logger.Printf("error closing default storage: %s", err)
222 storageImpl = storageImplCloser
224 cl.defaultStorage = storage.NewClient(storageImpl)
225 if cfg.IPBlocklist != nil {
226 cl.ipBlockList = cfg.IPBlocklist
229 if cfg.PeerID != "" {
230 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
232 o := copy(cl.peerID[:], cfg.Bep20)
233 _, err = rand.Read(cl.peerID[o:])
235 panic("error generating peer id")
239 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
247 for _, _s := range sockets {
248 s := _s // Go is fucking retarded.
249 cl.onClose = append(cl.onClose, func() { s.Close() })
250 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
251 cl.dialers = append(cl.dialers, s)
252 cl.listeners = append(cl.listeners, s)
253 go cl.acceptConnections(s)
259 for _, s := range sockets {
260 if pc, ok := s.(net.PacketConn); ok {
261 ds, err := cl.newAnacrolixDhtServer(pc)
265 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
266 cl.onClose = append(cl.onClose, func() { ds.Close() })
271 cl.websocketTrackers = websocketTrackers{
274 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
277 t, ok := cl.torrents[infoHash]
279 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
281 return t.announceRequest(event), nil
283 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
286 t, ok := cl.torrents[dcc.InfoHash]
288 cl.logger.WithDefaultLevel(log.Warning).Printf(
289 "got webrtc conn for unloaded torrent with infohash %x",
295 go t.onWebRtcConn(dc, dcc)
304 func (cl *Client) AddDhtServer(d DhtServer) {
305 cl.dhtServers = append(cl.dhtServers, d)
308 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
309 // given address for any Torrent.
310 func (cl *Client) AddDialer(d Dialer) {
313 cl.dialers = append(cl.dialers, d)
314 for _, t := range cl.torrents {
319 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
321 func (cl *Client) AddListener(l Listener) {
322 cl.listeners = append(cl.listeners, l)
323 go cl.acceptConnections(l)
326 func (cl *Client) firewallCallback(net.Addr) bool {
328 block := !cl.wantConns()
331 torrent.Add("connections firewalled", 1)
333 torrent.Add("connections not firewalled", 1)
338 func (cl *Client) listenOnNetwork(n network) bool {
339 if n.Ipv4 && cl.config.DisableIPv4 {
342 if n.Ipv6 && cl.config.DisableIPv6 {
345 if n.Tcp && cl.config.DisableTCP {
348 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
354 func (cl *Client) listenNetworks() (ns []network) {
355 for _, n := range allPeerNetworks {
356 if cl.listenOnNetwork(n) {
363 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
364 cfg := dht.ServerConfig{
365 IPBlocklist: cl.ipBlockList,
367 OnAnnouncePeer: cl.onDHTAnnouncePeer,
368 PublicIP: func() net.IP {
369 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
370 return cl.config.PublicIp6
372 return cl.config.PublicIp4
374 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
375 ConnectionTracking: cl.config.ConnTracker,
376 OnQuery: cl.config.DHTOnQuery,
377 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
379 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
382 s, err = dht.NewServer(&cfg)
385 ts, err := s.Bootstrap()
387 cl.logger.Printf("error bootstrapping dht: %s", err)
389 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
395 func (cl *Client) Closed() <-chan struct{} {
401 func (cl *Client) eachDhtServer(f func(DhtServer)) {
402 for _, ds := range cl.dhtServers {
407 // Stops the client. All connections to peers are closed and all activity will
409 func (cl *Client) Close() {
413 for _, t := range cl.torrents {
416 for i := range cl.onClose {
417 cl.onClose[len(cl.onClose)-1-i]()
422 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
423 if cl.ipBlockList == nil {
426 return cl.ipBlockList.Lookup(ip)
429 func (cl *Client) ipIsBlocked(ip net.IP) bool {
430 _, blocked := cl.ipBlockRange(ip)
434 func (cl *Client) wantConns() bool {
435 for _, t := range cl.torrents {
443 func (cl *Client) waitAccept() {
445 if cl.closed.IsSet() {
455 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
456 func (cl *Client) rejectAccepted(conn net.Conn) error {
457 ra := conn.RemoteAddr()
458 if rip := addrIpOrNil(ra); rip != nil {
459 if cl.config.DisableIPv4Peers && rip.To4() != nil {
460 return errors.New("ipv4 peers disabled")
462 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
463 return errors.New("ipv4 disabled")
466 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
467 return errors.New("ipv6 disabled")
469 if cl.rateLimitAccept(rip) {
470 return errors.New("source IP accepted rate limited")
472 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
473 return errors.New("bad source addr")
479 func (cl *Client) acceptConnections(l Listener) {
481 conn, err := l.Accept()
482 torrent.Add("client listener accepts", 1)
483 conn = pproffd.WrapNetConn(conn)
485 closed := cl.closed.IsSet()
488 reject = cl.rejectAccepted(conn)
498 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
503 torrent.Add("rejected accepted connections", 1)
504 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
507 go cl.incomingConnection(conn)
509 log.Fmsg("accepted %q connection at %q from %q",
513 ).SetLevel(log.Debug).Log(cl.logger)
514 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
515 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
516 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
521 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
522 func regularNetConnPeerConnConnString(nc net.Conn) string {
523 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
526 func (cl *Client) incomingConnection(nc net.Conn) {
528 if tc, ok := nc.(*net.TCPConn); ok {
531 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
532 regularNetConnPeerConnConnString(nc))
534 c.Discovery = PeerSourceIncoming
535 cl.runReceivedConn(c)
538 // Returns a handle to the given torrent, if it's present in the client.
539 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
542 t, ok = cl.torrents[ih]
546 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
547 return cl.torrents[ih]
550 type dialResult struct {
555 func countDialResult(err error) {
557 torrent.Add("successful dials", 1)
559 torrent.Add("unsuccessful dials", 1)
563 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
564 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
565 if ret < minDialTimeout {
571 // Returns whether an address is known to connect to a client with our own ID.
572 func (cl *Client) dopplegangerAddr(addr string) bool {
573 _, ok := cl.dopplegangerAddrs[addr]
577 // Returns a connection over UTP or TCP, whichever is first to connect.
578 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
580 t := perf.NewTimer(perf.CallerName(0))
583 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
585 t.Mark("returned conn over " + res.Network)
589 ctx, cancel := context.WithCancel(ctx)
590 // As soon as we return one connection, cancel the others.
593 resCh := make(chan dialResult, left)
597 cl.eachDialer(func(s Dialer) bool {
600 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
603 cl.dialFromSocket(ctx, s, addr),
604 s.LocalAddr().Network(),
611 // Wait for a successful connection.
613 defer perf.ScopeTimer()()
614 for ; left > 0 && res.Conn == nil; left-- {
618 // There are still incompleted dials.
620 for ; left > 0; left-- {
621 conn := (<-resCh).Conn
628 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
630 //if res.Conn != nil {
631 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
633 // cl.logger.Printf("failed to dial %s", addr)
638 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
639 network := s.LocalAddr().Network()
640 cte := cl.config.ConnTracker.Wait(
642 conntrack.Entry{network, s.LocalAddr().String(), addr},
643 "dial torrent client",
646 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
647 // which dial errors allow us to forget the connection tracking entry handle.
648 if ctx.Err() != nil {
654 c, err := s.Dial(ctx, addr)
655 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
656 // it now in case we close the connection forthwith.
657 if tc, ok := c.(*net.TCPConn); ok {
662 if err != nil && forgettableDialError(err) {
669 return closeWrapper{c, func() error {
676 func forgettableDialError(err error) bool {
677 return strings.Contains(err.Error(), "no suitable address found")
680 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
681 if _, ok := t.halfOpen[addr]; !ok {
682 panic("invariant broken")
684 delete(t.halfOpen, addr)
686 for _, t := range cl.torrents {
691 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
692 // for valid reasons.
693 func (cl *Client) initiateProtocolHandshakes(
697 outgoing, encryptHeader bool,
698 remoteAddr PeerRemoteAddr,
699 network, connString string,
701 c *PeerConn, err error,
703 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
704 c.headerEncrypted = encryptHeader
705 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
707 dl, ok := ctx.Deadline()
711 err = nc.SetDeadline(dl)
715 err = cl.initiateHandshakes(c, t)
719 // Returns nil connection and nil error if no connection could be established for valid reasons.
720 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
721 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
724 return t.dialTimeout()
727 dr := cl.dialFirst(dialCtx, addr.String())
730 if dialCtx.Err() != nil {
731 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
733 return nil, errors.New("dial failed")
735 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Network, regularNetConnPeerConnConnString(nc))
742 // Returns nil connection and nil error if no connection could be established
743 // for valid reasons.
744 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
745 torrent.Add("establish outgoing connection", 1)
746 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
747 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
749 torrent.Add("initiated conn with preferred header obfuscation", 1)
752 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
753 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
754 // We should have just tried with the preferred header obfuscation. If it was required,
755 // there's nothing else to try.
758 // Try again with encryption if we didn't earlier, or without if we did.
759 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
761 torrent.Add("initiated conn with fallback header obfuscation", 1)
763 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
767 // Called to dial out and run a connection. The addr we're given is already
768 // considered half-open.
769 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
770 cl.dialRateLimiter.Wait(context.Background())
771 c, err := cl.establishOutgoingConn(t, addr)
774 // Don't release lock between here and addPeerConn, unless it's for
776 cl.noLongerHalfOpen(t, addr.String())
779 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
786 t.runHandshookConnLoggingErr(c)
789 // The port number for incoming peer connections. 0 if the client isn't listening.
790 func (cl *Client) incomingPeerPort() int {
791 return cl.LocalPort()
794 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
795 if c.headerEncrypted {
798 rw, c.cryptoMethod, err = mse.InitiateHandshake(
805 cl.config.CryptoProvides,
809 return xerrors.Errorf("header obfuscation handshake: %w", err)
812 ih, err := cl.connBtHandshake(c, &t.infoHash)
814 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
816 if ih != t.infoHash {
817 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
822 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
823 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
824 func (cl *Client) forSkeys(f func([]byte) bool) {
827 if false { // Emulate the bug from #114
829 for ih := range cl.torrents {
833 for range cl.torrents {
840 for ih := range cl.torrents {
847 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
848 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
854 // Do encryption and bittorrent handshakes as receiver.
855 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
856 defer perf.ScopeTimerErr(&err)()
858 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
860 if err == nil || err == mse.ErrNoSecretKeyMatch {
861 if c.headerEncrypted {
862 torrent.Add("handshakes received encrypted", 1)
864 torrent.Add("handshakes received unencrypted", 1)
867 torrent.Add("handshakes received with error while handling encryption", 1)
870 if err == mse.ErrNoSecretKeyMatch {
875 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
876 err = errors.New("connection does not have required header obfuscation")
879 ih, err := cl.connBtHandshake(c, nil)
881 err = xerrors.Errorf("during bt handshake: %w", err)
890 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
891 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
896 c.PeerExtensionBytes = res.PeerExtensionBits
897 c.PeerID = res.PeerID
898 c.completedHandshake = time.Now()
899 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
905 func (cl *Client) runReceivedConn(c *PeerConn) {
906 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
910 t, err := cl.receiveHandshakes(c)
913 "error receiving handshakes on %v: %s", c, err,
914 ).SetLevel(log.Debug).
916 "network", c.Network,
918 torrent.Add("error receiving handshake", 1)
920 cl.onBadAccept(c.RemoteAddr)
925 torrent.Add("received handshake for unloaded torrent", 1)
926 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
928 cl.onBadAccept(c.RemoteAddr)
932 torrent.Add("received handshake for loaded torrent", 1)
935 t.runHandshookConnLoggingErr(c)
938 // Client lock must be held before entering this.
939 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
941 if c.PeerID == cl.peerID {
944 addr := c.conn.RemoteAddr().String()
945 cl.dopplegangerAddrs[addr] = struct{}{}
947 // Because the remote address is not necessarily the same as its client's torrent listen
948 // address, we won't record the remote address as a doppleganger. Instead, the initiator
949 // can record *us* as the doppleganger.
951 return errors.New("local and remote peer ids are the same")
953 c.conn.SetWriteDeadline(time.Time{})
954 c.r = deadlineReader{c.conn, c.r}
955 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
956 if connIsIpv6(c.conn) {
957 torrent.Add("completed handshake over ipv6", 1)
959 if err := t.addPeerConn(c); err != nil {
960 return fmt.Errorf("adding connection: %w", err)
962 defer t.dropConnection(c)
963 go c.writer(time.Minute)
964 cl.sendInitialMessages(c, t)
965 err := c.mainReadLoop()
967 return fmt.Errorf("main read loop: %w", err)
972 // If peer requests are buffered on read, this instructs the amount of memory that might be used to
973 // cache pending writes. Assuming 512KiB cached for sending, for 16KiB chunks.
974 const localClientReqq = 1 << 5
976 // See the order given in Transmission's tr_peerMsgsNew.
977 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
978 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
979 conn.write(pp.Message{
981 ExtendedID: pp.HandshakeExtendedID,
982 ExtendedPayload: func() []byte {
983 msg := pp.ExtendedHandshakeMessage{
984 M: map[pp.ExtensionName]pp.ExtensionNumber{
985 pp.ExtensionNameMetadata: metadataExtendedId,
987 V: cl.config.ExtendedHandshakeClientVersion,
988 Reqq: localClientReqq,
989 YourIp: pp.CompactIp(conn.remoteIp()),
990 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
991 Port: cl.incomingPeerPort(),
992 MetadataSize: torrent.metadataSize(),
993 // TODO: We can figured these out specific to the socket
995 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
996 Ipv6: cl.config.PublicIp6.To16(),
998 if !cl.config.DisablePEX {
999 msg.M[pp.ExtensionNamePex] = pexExtendedId
1001 return bencode.MustMarshal(msg)
1006 if conn.fastEnabled() {
1007 if torrent.haveAllPieces() {
1008 conn.write(pp.Message{Type: pp.HaveAll})
1009 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
1011 } else if !torrent.haveAnyPieces() {
1012 conn.write(pp.Message{Type: pp.HaveNone})
1013 conn.sentHaves.Clear()
1019 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1020 conn.write(pp.Message{
1027 func (cl *Client) dhtPort() (ret uint16) {
1028 cl.eachDhtServer(func(s DhtServer) {
1029 ret = uint16(missinggo.AddrPort(s.Addr()))
1034 func (cl *Client) haveDhtServer() (ret bool) {
1035 cl.eachDhtServer(func(_ DhtServer) {
1041 // Process incoming ut_metadata message.
1042 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1043 var d map[string]int
1044 err := bencode.Unmarshal(payload, &d)
1045 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1046 } else if err != nil {
1047 return fmt.Errorf("error unmarshalling bencode: %s", err)
1049 msgType, ok := d["msg_type"]
1051 return errors.New("missing msg_type field")
1055 case pp.DataMetadataExtensionMsgType:
1056 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1057 if !c.requestedMetadataPiece(piece) {
1058 return fmt.Errorf("got unexpected piece %d", piece)
1060 c.metadataRequests[piece] = false
1061 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1062 if begin < 0 || begin >= len(payload) {
1063 return fmt.Errorf("data has bad offset in payload: %d", begin)
1065 t.saveMetadataPiece(piece, payload[begin:])
1066 c.lastUsefulChunkReceived = time.Now()
1067 err = t.maybeCompleteMetadata()
1069 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1070 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1071 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1072 // log consumers can filter for this message.
1073 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1076 case pp.RequestMetadataExtensionMsgType:
1077 if !t.haveMetadataPiece(piece) {
1078 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1081 start := (1 << 14) * piece
1082 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1083 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1085 case pp.RejectMetadataExtensionMsgType:
1088 return errors.New("unknown msg_type value")
1092 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1093 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1094 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1099 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1103 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1106 if _, ok := cl.ipBlockRange(ip); ok {
1109 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1115 // Return a Torrent ready for insertion into a Client.
1116 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1117 // use provided storage, if provided
1118 storageClient := cl.defaultStorage
1119 if specStorage != nil {
1120 storageClient = storage.NewClient(specStorage)
1126 peers: prioritizedPeers{
1128 getPrio: func(p PeerInfo) peerPriority {
1130 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1133 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1135 halfOpen: make(map[string]PeerInfo),
1136 pieceStateChanges: pubsub.NewPubSub(),
1138 storageOpener: storageClient,
1139 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1141 networkingEnabled: true,
1142 metadataChanged: sync.Cond{
1145 webSeeds: make(map[string]*Peer),
1147 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1148 t.logger = cl.logger.WithContextValue(t)
1149 t.setChunkSize(defaultChunkSize)
1153 // A file-like handle to some torrent data resource.
1154 type Handle interface {
1161 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1162 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1165 // Adds a torrent by InfoHash with a custom Storage implementation.
1166 // If the torrent already exists then this Storage is ignored and the
1167 // existing torrent returned with `new` set to `false`
1168 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1171 t, ok := cl.torrents[infoHash]
1177 t = cl.newTorrent(infoHash, specStorage)
1178 cl.eachDhtServer(func(s DhtServer) {
1179 go t.dhtAnnouncer(s)
1181 cl.torrents[infoHash] = t
1182 cl.clearAcceptLimits()
1183 t.updateWantPeersEvent()
1184 // Tickle Client.waitAccept, new torrent may want conns.
1185 cl.event.Broadcast()
1189 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1190 // Torrent.MergeSpec.
1191 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1192 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1193 err = t.MergeSpec(spec)
1194 if err != nil && new {
1200 type stringAddr string
1202 var _ net.Addr = stringAddr("")
1204 func (stringAddr) Network() string { return "" }
1205 func (me stringAddr) String() string { return string(me) }
1207 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1208 // spec.DisallowDataDownload/Upload will be read and applied
1209 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1210 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1211 if spec.DisplayName != "" {
1212 t.SetDisplayName(spec.DisplayName)
1214 if spec.InfoBytes != nil {
1215 err := t.SetInfoBytes(spec.InfoBytes)
1221 cl.AddDhtNodes(spec.DhtNodes)
1224 useTorrentSources(spec.Sources, t)
1225 for _, url := range spec.Webseeds {
1228 for _, peerAddr := range spec.PeerAddrs {
1230 Addr: stringAddr(peerAddr),
1231 Source: PeerSourceDirect,
1235 if spec.ChunkSize != 0 {
1236 t.setChunkSize(pp.Integer(spec.ChunkSize))
1238 t.addTrackers(spec.Trackers)
1240 t.dataDownloadDisallowed = spec.DisallowDataDownload
1241 t.dataUploadDisallowed = spec.DisallowDataUpload
1245 func useTorrentSources(sources []string, t *Torrent) {
1246 for _, s := range sources {
1248 err := useTorrentSource(s, t)
1250 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1252 t.logger.Printf("successfully used source %q", s)
1258 func useTorrentSource(source string, t *Torrent) error {
1259 req, err := http.NewRequest(http.MethodGet, source, nil)
1263 ctx, cancel := context.WithCancel(context.Background())
1273 req = req.WithContext(ctx)
1274 resp, err := http.DefaultClient.Do(req)
1278 mi, err := metainfo.Load(resp.Body)
1280 if ctx.Err() != nil {
1285 return t.MergeSpec(TorrentSpecFromMetaInfo(mi))
1288 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1289 t, ok := cl.torrents[infoHash]
1291 err = fmt.Errorf("no such torrent")
1298 delete(cl.torrents, infoHash)
1302 func (cl *Client) allTorrentsCompleted() bool {
1303 for _, t := range cl.torrents {
1307 if !t.haveAllPieces() {
1314 // Returns true when all torrents are completely downloaded and false if the
1315 // client is stopped before that.
1316 func (cl *Client) WaitAll() bool {
1319 for !cl.allTorrentsCompleted() {
1320 if cl.closed.IsSet() {
1328 // Returns handles to all the torrents loaded in the Client.
1329 func (cl *Client) Torrents() []*Torrent {
1332 return cl.torrentsAsSlice()
1335 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1336 for _, t := range cl.torrents {
1337 ret = append(ret, t)
1342 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1343 spec, err := TorrentSpecFromMagnetUri(uri)
1347 T, _, err = cl.AddTorrentSpec(spec)
1351 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1352 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1356 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1357 mi, err := metainfo.LoadFromFile(filename)
1361 return cl.AddTorrent(mi)
1364 func (cl *Client) DhtServers() []DhtServer {
1365 return cl.dhtServers
1368 func (cl *Client) AddDhtNodes(nodes []string) {
1369 for _, n := range nodes {
1370 hmp := missinggo.SplitHostMaybePort(n)
1371 ip := net.ParseIP(hmp.Host)
1373 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1376 ni := krpc.NodeInfo{
1377 Addr: krpc.NodeAddr{
1382 cl.eachDhtServer(func(s DhtServer) {
1388 func (cl *Client) banPeerIP(ip net.IP) {
1389 cl.logger.Printf("banning ip %v", ip)
1390 if cl.badPeerIPs == nil {
1391 cl.badPeerIPs = make(map[string]struct{})
1393 cl.badPeerIPs[ip.String()] = struct{}{}
1396 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1405 PeerMaxRequests: 250,
1407 RemoteAddr: remoteAddr,
1409 callbacks: &cl.config.Callbacks,
1411 connString: connString,
1413 writeBuffer: new(bytes.Buffer),
1416 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1417 c.writerCond.L = cl.locker()
1418 c.setRW(connStatsReadWriter{nc, c})
1419 c.r = &rateLimitedReader{
1420 l: cl.config.DownloadRateLimiter,
1423 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1424 for _, f := range cl.config.Callbacks.NewPeer {
1430 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1437 t.addPeers([]PeerInfo{{
1438 Addr: ipPortAddr{ip, port},
1439 Source: PeerSourceDhtAnnouncePeer,
1443 func firstNotNil(ips ...net.IP) net.IP {
1444 for _, ip := range ips {
1452 func (cl *Client) eachDialer(f func(Dialer) bool) {
1453 for _, s := range cl.dialers {
1460 func (cl *Client) eachListener(f func(Listener) bool) {
1461 for _, s := range cl.listeners {
1468 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1469 cl.eachListener(func(l Listener) bool {
1476 func (cl *Client) publicIp(peer net.IP) net.IP {
1477 // TODO: Use BEP 10 to determine how peers are seeing us.
1478 if peer.To4() != nil {
1480 cl.config.PublicIp4,
1481 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1486 cl.config.PublicIp6,
1487 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1491 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1492 l := cl.findListener(
1493 func(l Listener) bool {
1494 return f(addrIpOrNil(l.Addr()))
1500 return addrIpOrNil(l.Addr())
1503 // Our IP as a peer should see it.
1504 func (cl *Client) publicAddr(peer net.IP) IpPort {
1505 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1508 // ListenAddrs addresses currently being listened to.
1509 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1512 cl.eachListener(func(l Listener) bool {
1513 ret = append(ret, l.Addr())
1519 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1520 ipa, ok := tryIpPortFromNetAddr(addr)
1524 ip := maskIpForAcceptLimiting(ipa.IP)
1525 if cl.acceptLimiter == nil {
1526 cl.acceptLimiter = make(map[ipStr]int)
1528 cl.acceptLimiter[ipStr(ip.String())]++
1531 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1532 if ip4 := ip.To4(); ip4 != nil {
1533 return ip4.Mask(net.CIDRMask(24, 32))
1538 func (cl *Client) clearAcceptLimits() {
1539 cl.acceptLimiter = nil
1542 func (cl *Client) acceptLimitClearer() {
1545 case <-cl.closed.LockedChan(cl.locker()):
1547 case <-time.After(15 * time.Minute):
1549 cl.clearAcceptLimits()
1555 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1556 if cl.config.DisableAcceptRateLimiting {
1559 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1562 func (cl *Client) rLock() {
1566 func (cl *Client) rUnlock() {
1570 func (cl *Client) lock() {
1574 func (cl *Client) unlock() {
1578 func (cl *Client) locker() *lockWithDeferreds {
1582 func (cl *Client) String() string {
1583 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1586 // Returns connection-level aggregate stats at the Client level. See the comment on
1587 // TorrentStats.ConnStats.
1588 func (cl *Client) ConnStats() ConnStats {
1589 return cl.stats.Copy()