17 "github.com/anacrolix/dht/v2"
18 "github.com/anacrolix/dht/v2/krpc"
19 "github.com/anacrolix/log"
20 "github.com/anacrolix/missinggo/bitmap"
21 "github.com/anacrolix/missinggo/perf"
22 "github.com/anacrolix/missinggo/pubsub"
23 "github.com/anacrolix/missinggo/slices"
24 "github.com/anacrolix/missinggo/v2/pproffd"
25 "github.com/anacrolix/sync"
26 "github.com/anacrolix/torrent/tracker"
27 "github.com/anacrolix/torrent/webtorrent"
28 "github.com/davecgh/go-spew/spew"
29 "github.com/dustin/go-humanize"
30 "github.com/google/btree"
31 "github.com/pion/datachannel"
32 "golang.org/x/time/rate"
33 "golang.org/x/xerrors"
35 "github.com/anacrolix/missinggo/v2"
36 "github.com/anacrolix/missinggo/v2/conntrack"
38 "github.com/anacrolix/torrent/bencode"
39 "github.com/anacrolix/torrent/iplist"
40 "github.com/anacrolix/torrent/metainfo"
41 "github.com/anacrolix/torrent/mse"
42 pp "github.com/anacrolix/torrent/peer_protocol"
43 "github.com/anacrolix/torrent/storage"
46 // Clients contain zero or more Torrents. A Client manages a blocklist, the
47 // TCP/UDP protocol ports, and DHT as desired.
// NOTE(review): the struct header and several fields are absent from this
// extract; only the fields listed below are visible.
49 // An aggregate of stats over all connections. First in struct to ensure
50 // 64-bit alignment of fields. See #262.
// Event queried with IsSet() by the accept and wait loops to detect shutdown.
55 closed missinggo.Event
// Storage used by newTorrent when a torrent does not supply its own.
61 defaultStorage *storage.Client
// DHT servers created in NewClient or registered through AddDhtServer.
65 dhtServers []DhtServer
// Consulted by ipBlockRange; a nil value means no address is blocked.
66 ipBlockList iplist.Ranger
67 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
68 extensionBytes pp.PeerExtensionBits
70 // Set of addresses that have our client ID. This intentionally will
71 // include ourselves if we end up trying to connect to our own address
72 // through legitimate channels.
73 dopplegangerAddrs map[string]struct{}
// Banned peer IPs keyed by ip.String(); allocated lazily by banPeerIP.
74 badPeerIPs map[string]struct{}
// Loaded torrents keyed by infohash.
75 torrents map[InfoHash]*Torrent
// Count of bad accepts per masked source IP; reset to nil by clearAcceptLimits.
77 acceptLimiter map[ipStr]int
// Waited on by outgoingConnection before each outgoing dial.
78 dialRateLimiter *rate.Limiter
// Populated at the end of NewClient.
80 websocketTrackers websocketTrackers
// BadPeerIPs returns the banned peer IPs as reported by badPeerIPsLocked.
// NOTE(review): the embedded numbering jumps from 85 to 88, so two statements
// are missing from this extract; the callee's "Locked" suffix suggests they
// acquire the client lock — confirm against the full file.
85 func (cl *Client) BadPeerIPs() []string {
88 return cl.badPeerIPsLocked()
// badPeerIPsLocked returns the keys of cl.badPeerIPs as a []string.
// The "Locked" suffix presumably means the caller already holds the client
// lock — confirm.
91 func (cl *Client) badPeerIPsLocked() []string {
// FromMapKeys returns an untyped value; the assertion narrows it to []string.
92 return slices.FromMapKeys(cl.badPeerIPs).([]string)
// PeerID returns a PeerID for this client.
// NOTE(review): the body is not present in this extract; presumably it
// returns cl.peerID — confirm.
95 func (cl *Client) PeerID() PeerID {
99 // Returns the port number for the first listener that has one. No longer assumes that all port
100 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
// NOTE(review): the sentence above is cut off in this extract, as is the tail
// of the callback (its return values are not visible).
102 func (cl *Client) LocalPort() (port int) {
103 cl.eachListener(func(l Listener) bool {
// Writes straight into the named result so the value survives the callback.
104 port = addrPortOrZero(l.Addr())
// writeDhtServerStatus writes the server's ID (hex) followed by a spew dump of
// its Stats() to w.
110 func writeDhtServerStatus(w io.Writer, s DhtServer) {
111 dhtStats := s.Stats()
112 fmt.Fprintf(w, " ID: %x\n", s.ID())
113 spew.Fdump(w, dhtStats)
116 // Writes out a human readable status of the client, such as for writing to a
// NOTE(review): the rest of the sentence above, the locking, the writer flush
// and most of the per-torrent loop are missing from this extract.
118 func (cl *Client) WriteStatus(_w io.Writer) {
// All output goes through a buffered writer wrapping the caller's writer.
121 w := bufio.NewWriter(_w)
123 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
124 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
125 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
// NOTE(review): builds a slice of keys only to count it; len(cl.badPeerIPs)
// would give the same number.
126 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
127 cl.eachDhtServer(func(s DhtServer) {
128 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
129 writeDhtServerStatus(w, s)
131 spew.Fdump(w, &cl.stats)
132 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
// Torrents are listed in ascending order of their infohash string.
134 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
135 return l.InfoHash().AsString() < r.InfoHash().AsString()
138 fmt.Fprint(w, "<unknown name>")
140 fmt.Fprint(w, t.name())
// Completion percentage is 100*(1 - missing/total).
144 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
146 w.WriteString("<missing metainfo>")
// initLogger derives cl.logger from the configured logger, tagging it with the
// client as a value.
154 func (cl *Client) initLogger() {
155 cl.logger = cl.config.Logger.WithValues(cl)
// Unless Debug is set, the logger is filtered at the Info level.
156 if !cl.config.Debug {
157 cl.logger = cl.logger.FilterLevel(log.Info)
// announceKey returns bytes 16..19 of the peer ID read as a big-endian uint32
// and converted to int32.
161 func (cl *Client) announceKey() int32 {
162 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
// NewClient builds a Client from cfg: it sets up default storage, the peer ID,
// listening sockets, DHT servers and the websocket tracker callbacks.
// NOTE(review): many lines (nil checks, error handling, struct literal
// framing, returns) are missing from this extract.
165 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
// Presumably guarded by a nil check on cfg that is not visible here.
167 cfg = NewDefaultClientConfig()
177 dopplegangerAddrs: make(map[string]struct{}),
178 torrents: make(map[metainfo.Hash]*Torrent),
// rate.NewLimiter(10, 10): rate 10 events per second, burst size 10.
179 dialRateLimiter: rate.NewLimiter(10, 10),
181 go cl.acceptLimitClearer()
189 cl.extensionBytes = defaultPeerExtensionBytes()
190 cl.event.L = cl.locker()
191 storageImpl := cfg.DefaultStorage
// No storage configured: fall back to file storage rooted at cfg.DataDir and
// register its Close to run when the client closes.
192 if storageImpl == nil {
193 // We'd use mmap by default but HFS+ doesn't support sparse files.
194 storageImplCloser := storage.NewFile(cfg.DataDir)
195 cl.onClose = append(cl.onClose, func() {
196 if err := storageImplCloser.Close(); err != nil {
197 cl.logger.Printf("error closing default storage: %s", err)
200 storageImpl = storageImplCloser
202 cl.defaultStorage = storage.NewClient(storageImpl)
203 if cfg.IPBlocklist != nil {
204 cl.ipBlockList = cfg.IPBlocklist
// A configured peer ID is copied verbatim; otherwise the ID is the Bep20
// prefix followed by random bytes.
207 if cfg.PeerID != "" {
208 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
210 o := copy(cl.peerID[:], cfg.Bep20)
211 _, err = rand.Read(cl.peerID[o:])
213 panic("error generating peer id")
217 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
225 for _, _s := range sockets {
226 s := _s // Copy the loop variable so each closure below captures its own socket.
227 cl.onClose = append(cl.onClose, func() { s.Close() })
// Sockets on an enabled peer network serve as both dialer and listener.
228 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
229 cl.dialers = append(cl.dialers, s)
230 cl.listeners = append(cl.listeners, s)
231 go cl.acceptConnections(s)
// Every socket that is also a net.PacketConn gets a DHT server, which is
// closed together with the client.
237 for _, s := range sockets {
238 if pc, ok := s.(net.PacketConn); ok {
239 ds, err := cl.newAnacrolixDhtServer(pc)
243 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
244 cl.onClose = append(cl.onClose, func() { ds.Close() })
249 cl.websocketTrackers = websocketTrackers{
252 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) tracker.AnnounceRequest {
// NOTE(review): the map lookup is not checked here; an unknown infohash yields
// a nil *Torrent receiver — confirm announceRequest tolerates that.
255 return cl.torrents[infoHash].announceRequest(event)
257 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
260 t, ok := cl.torrents[dcc.InfoHash]
262 cl.logger.WithDefaultLevel(log.Warning).Printf(
263 "got webrtc conn for unloaded torrent with infohash %x",
// The WebRTC connection is handed to the torrent on its own goroutine.
269 go t.onWebRtcConn(dc, dcc)
// AddDhtServer appends d to the client's list of DHT servers.
// NOTE(review): no locking is visible around the append — confirm callers
// do not race with readers of cl.dhtServers.
276 func (cl *Client) AddDhtServer(d DhtServer) {
277 cl.dhtServers = append(cl.dhtServers, d)
280 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
281 // given address for any Torrent.
// NOTE(review): two lines after the signature and the body of the torrent
// loop are missing from this extract.
282 func (cl *Client) AddDialer(d Dialer) {
285 cl.dialers = append(cl.dialers, d)
286 for _, t := range cl.torrents {
291 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
// NOTE(review): the sentence above is cut off in this extract.
293 func (cl *Client) AddListener(l Listener) {
294 cl.listeners = append(cl.listeners, l)
// The accept loop for this listener runs on its own goroutine.
295 go cl.acceptConnections(l)
// firewallCallback is passed to listenAll by NewClient. It computes block as
// "the client does not currently want connections" and bumps one of two
// counters accordingly.
// NOTE(review): the branching and the return statement are not visible here.
298 func (cl *Client) firewallCallback(net.Addr) bool {
300 block := !cl.wantConns()
303 torrent.Add("connections firewalled", 1)
305 torrent.Add("connections not firewalled", 1)
// listenOnNetwork reports whether the client should listen on network n,
// based on the Disable* config flags.
// NOTE(review): the branch bodies and the final return are missing from this
// extract; presumably each matching condition returns false — confirm.
310 func (cl *Client) listenOnNetwork(n network) bool {
311 if n.Ipv4 && cl.config.DisableIPv4 {
314 if n.Ipv6 && cl.config.DisableIPv6 {
317 if n.Tcp && cl.config.DisableTCP {
// The UDP condition needs both uTP and DHT to be disabled.
320 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
// listenNetworks walks allPeerNetworks and tests each with listenOnNetwork.
// NOTE(review): the statement that collects accepted networks into ns is not
// visible in this extract.
326 func (cl *Client) listenNetworks() (ns []network) {
327 for _, n := range allPeerNetworks {
328 if cl.listenOnNetwork(n) {
// newAnacrolixDhtServer creates a dht.Server on conn, configured from the
// client's settings, and bootstraps it.
// NOTE(review): error checks and the goroutine/closure framing around the
// bootstrap are missing from this extract.
335 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
336 cfg := dht.ServerConfig{
337 IPBlocklist: cl.ipBlockList,
339 OnAnnouncePeer: cl.onDHTAnnouncePeer,
// Uses the configured IPv6 address for an IPv6 socket when one is set;
// otherwise falls back to the IPv4 address.
340 PublicIP: func() net.IP {
341 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
342 return cl.config.PublicIp6
344 return cl.config.PublicIp4
346 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
347 ConnectionTracking: cl.config.ConnTracker,
348 OnQuery: cl.config.DHTOnQuery,
// Log messages are prefixed with the socket's local address.
349 Logger: cl.logger.WithText(func(m log.Msg) string {
350 return fmt.Sprintf("dht server on %v: %s", conn.LocalAddr().String(), m.Text())
353 s, err = dht.NewServer(&cfg)
356 ts, err := s.Bootstrap()
358 cl.logger.Printf("error bootstrapping dht: %s", err)
360 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
// Closed returns a receive-only channel.
// NOTE(review): the body is not present in this extract; presumably the
// channel is tied to cl.closed — confirm.
366 func (cl *Client) Closed() <-chan struct{} {
// eachDhtServer iterates over cl.dhtServers.
// NOTE(review): the loop body (presumably f(ds)) is missing from this extract.
372 func (cl *Client) eachDhtServer(f func(DhtServer)) {
373 for _, ds := range cl.dhtServers {
378 // Stops the client. All connections to peers are closed and all activity will
// NOTE(review): the sentence above is cut off, and the locking, the closed
// flag update and the torrent loop body are missing from this extract.
380 func (cl *Client) Close() {
384 for _, t := range cl.torrents {
// onClose callbacks run in reverse registration order (last added, first run).
387 for i := range cl.onClose {
388 cl.onClose[len(cl.onClose)-1-i]()
// ipBlockRange looks ip up in the client's blocklist and returns the matching
// range, if any.
393 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
// With no blocklist configured nothing is looked up. NOTE(review): the early
// return inside this branch is not visible in the extract.
394 if cl.ipBlockList == nil {
397 return cl.ipBlockList.Lookup(ip)
// ipIsBlocked reports only the blocked flag from ipBlockRange, discarding the
// matching range. NOTE(review): the return statement is missing here.
400 func (cl *Client) ipIsBlocked(ip net.IP) bool {
401 _, blocked := cl.ipBlockRange(ip)
// wantConns decides whether the client wants connections by scanning its
// torrents.
// NOTE(review): the per-torrent test and the returns are missing from this
// extract.
405 func (cl *Client) wantConns() bool {
406 for _, t := range cl.torrents {
// waitAccept checks whether the client has been closed.
// NOTE(review): the surrounding loop and wait logic are missing from this
// extract; by name this presumably blocks until accepting is worthwhile.
414 func (cl *Client) waitAccept() {
416 if cl.closed.IsSet() {
426 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
// rejectAccepted returns a non-nil error describing why an accepted
// connection should be dropped. The checks only run when an IP can be
// extracted from the remote address.
// NOTE(review): the final return (presumably nil) is not visible here.
427 func (cl *Client) rejectAccepted(conn net.Conn) error {
428 ra := conn.RemoteAddr()
429 if rip := addrIpOrNil(ra); rip != nil {
// To4() != nil also matches IPv4-mapped IPv6 addresses.
430 if cl.config.DisableIPv4Peers && rip.To4() != nil {
431 return errors.New("ipv4 peers disabled")
// This check is on the raw byte length, so it matches only 4-byte IPs.
433 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
434 return errors.New("ipv4 disabled")
// 16-byte addresses that are not IPv4-mapped.
437 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
438 return errors.New("ipv6 disabled")
440 if cl.rateLimitAccept(rip) {
441 return errors.New("source IP accepted rate limited")
443 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
444 return errors.New("bad source addr")
// acceptConnections accepts connections from l, counts and logs them, and
// hands each one that is not rejected to incomingConnection on a new
// goroutine.
// NOTE(review): the enclosing loop, locking and the branches on err, closed
// and reject are missing from this extract.
450 func (cl *Client) acceptConnections(l net.Listener) {
452 conn, err := l.Accept()
453 torrent.Add("client listener accepts", 1)
// NOTE(review): conn is wrapped before any visible check of err — confirm
// WrapNetConn is safe to call with a nil conn.
454 conn = pproffd.WrapNetConn(conn)
456 closed := cl.closed.IsSet()
459 reject = cl.rejectAccepted(conn)
469 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
474 torrent.Add("rejected accepted connections", 1)
475 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
478 go cl.incomingConnection(conn)
480 log.Fmsg("accepted %q connection at %q from %q",
484 ).SetLevel(log.Debug).Log(cl.logger)
// Counters are keyed by remote IP length, remote network and listener network.
485 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
486 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
487 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
// regularConnString renders a connection as "<local addr>-<remote addr>",
// formatting both addresses with %s. It is used as the human-readable
// connection string handed to newConnection.
func regularConnString(nc net.Conn) string {
	return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
}
// incomingConnection wraps an accepted net.Conn in a PeerConn marked as
// incoming and runs the receive-side handshakes on it.
// NOTE(review): the deferred close and the body of the TCP branch are missing
// from this extract.
496 func (cl *Client) incomingConnection(nc net.Conn) {
498 if tc, ok := nc.(*net.TCPConn); ok {
// outgoing=false; the remote address doubles as the source of the network name.
501 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
502 regularConnString(nc))
503 c.Discovery = PeerSourceIncoming
504 cl.runReceivedConn(c)
507 // Returns a handle to the given torrent, if it's present in the client.
// ok is false when ih is not in cl.torrents.
// NOTE(review): two lines after the signature and the return are missing from
// this extract.
508 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
511 t, ok = cl.torrents[ih]
// torrent returns the torrent for ih, or nil when it is not loaded. No locking
// is done here.
515 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
516 return cl.torrents[ih]
// dialResult is the outcome of one dial attempt. dialFirst reads its Conn and
// Network fields. NOTE(review): the field declarations are missing from this
// extract.
519 type dialResult struct {
// countDialResult bumps either the "successful dials" or the "unsuccessful
// dials" counter.
// NOTE(review): the condition on err selecting between them is missing from
// this extract.
524 func countDialResult(err error) {
526 torrent.Add("successful dials", 1)
528 torrent.Add("unsuccessful dials", 1)
// reducedDialTimeout scales the maximum dial timeout down as the backlog of
// pending peers grows, so a long queue is worked through with shorter dials.
//
// The timeout is max divided by (pendingPeers+halfOpenLimit)/halfOpenLimit
// (integer division), and is never lower than minDialTimeout. With no backlog
// the divisor is 1 and the full max is returned.
//
// A non-positive halfOpenLimit, or inputs that make the divisor non-positive,
// would previously divide by zero or produce a negative duration; in those
// cases no reduction is applied and max (clamped to minDialTimeout) is used.
func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
	ret = max
	if halfOpenLimit > 0 {
		// Only divide when the divisor actually shrinks the timeout; a divisor
		// of 1 leaves max unchanged and anything lower is invalid input.
		if divisor := (pendingPeers + halfOpenLimit) / halfOpenLimit; divisor > 1 {
			ret = max / time.Duration(divisor)
		}
	}
	if ret < minDialTimeout {
		ret = minDialTimeout
	}
	return ret
}
540 // Returns whether an address is known to connect to a client with our own ID.
// The lookup is a plain membership test on cl.dopplegangerAddrs keyed by the
// address string. NOTE(review): the return statement is missing here.
541 func (cl *Client) dopplegangerAddr(addr string) bool {
542 _, ok := cl.dopplegangerAddrs[addr]
546 // Returns a connection over UTP or TCP, whichever is first to connect.
// One dial is started per registered dialer; results arrive on resCh.
// NOTE(review): goroutine launches, the cancel defer, the receive inside the
// wait loop and the close of surplus connections are missing from this
// extract.
547 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
549 t := perf.NewTimer(perf.CallerName(0))
552 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
554 t.Mark("returned conn over " + res.Network)
558 ctx, cancel := context.WithCancel(ctx)
559 // As soon as we return one connection, cancel the others.
// resCh is buffered to hold one result per outstanding dial.
562 resCh := make(chan dialResult, left)
566 cl.eachDialer(func(s Dialer) bool {
569 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
572 cl.dialFromSocket(ctx, s, addr),
573 s.LocalAddr().Network(),
580 // Wait for a successful connection.
582 defer perf.ScopeTimer()()
// Stops at the first result carrying a non-nil Conn, or when none are left.
583 for ; left > 0 && res.Conn == nil; left-- {
587 // There are still incompleted dials.
// Drains the results of the dials that lost the race.
589 for ; left > 0; left-- {
590 conn := (<-resCh).Conn
597 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
599 //if res.Conn != nil {
600 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
602 // cl.logger.Printf("failed to dial %s", addr)
// dialFromSocket dials addr through s under a connection-tracking entry and
// returns the resulting connection wrapped in a closeWrapper.
// NOTE(review): the handling of the tracking handle (cte), the early returns
// and the TCP branch body are missing from this extract.
607 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
608 network := s.LocalAddr().Network()
// The tracking entry is keyed by network, local address and remote address.
609 cte := cl.config.ConnTracker.Wait(
611 conntrack.Entry{network, s.LocalAddr().String(), addr},
612 "dial torrent client",
615 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
616 // which dial errors allow us to forget the connection tracking entry handle.
617 if ctx.Err() != nil {
623 c, err := s.Dial(ctx, addr)
624 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
625 // it now in case we close the connection forthwith.
626 if tc, ok := c.(*net.TCPConn); ok {
631 if err != nil && forgettableDialError(err) {
638 return closeWrapper{c, func() error {
// forgettableDialError reports whether err is a dial failure identified by
// the text "no suitable address found" in its message. dialFromSocket uses
// it to decide how to treat the connection-tracking entry for a failed dial.
//
// A nil err is never forgettable; previously it would panic on err.Error().
func forgettableDialError(err error) bool {
	return err != nil && strings.Contains(err.Error(), "no suitable address found")
}
// noLongerHalfOpen removes addr from t.halfOpen. It panics if addr was not
// recorded as half-open, since that means the bookkeeping is inconsistent.
// NOTE(review): one further statement after the delete is missing here.
649 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
650 if _, ok := t.halfOpen[addr]; !ok {
651 panic("invariant broken")
653 delete(t.halfOpen, addr)
657 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
658 // for valid reasons.
// NOTE(review): the parameter list, the cancel defer and the error branches
// are missing from this extract.
659 func (cl *Client) handshakesConnection(
668 c *PeerConn, err error,
// outgoing=true: this side initiates.
670 c = cl.newConnection(nc, true, remoteAddr, network, connString)
671 c.headerEncrypted = encryptHeader
// The whole handshake is bounded by HandshakesTimeout; the context deadline is
// also applied to the socket.
672 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
674 dl, ok := ctx.Deadline()
678 err = nc.SetDeadline(dl)
682 err = cl.initiateHandshakes(c, t)
686 // Returns nil connection and nil error if no connection could be established for valid reasons.
// Dials addr under a timeout taken from the torrent, then performs the
// initiator handshakes with the requested header obfuscation setting.
// NOTE(review): locking inside the timeout closure, the nil-conn check and
// the post-handshake cleanup are missing from this extract.
687 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr net.Addr, obfuscatedHeader bool) (*PeerConn, error) {
688 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
691 return t.dialTimeout()
694 dr := cl.dialFirst(dialCtx, addr.String())
// A context error is reported with wrapping; any other failure gets a generic
// error.
697 if dialCtx.Err() != nil {
698 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
700 return nil, errors.New("dial failed")
// The handshake runs under a fresh background context, not dialCtx.
702 c, err := cl.handshakesConnection(context.Background(), nc, t, obfuscatedHeader, addr, dr.Network, regularConnString(nc))
709 // Returns nil connection and nil error if no connection could be established
710 // for valid reasons.
// Tries the preferred header-obfuscation setting first, then the opposite
// setting unless the preferred one is required.
// NOTE(review): the success checks and returns between the attempts are
// missing from this extract.
711 func (cl *Client) establishOutgoingConn(t *Torrent, addr net.Addr) (c *PeerConn, err error) {
712 torrent.Add("establish outgoing connection", 1)
713 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
714 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
716 torrent.Add("initiated conn with preferred header obfuscation", 1)
719 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
720 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
721 // We should have just tried with the preferred header obfuscation. If it was required,
722 // there's nothing else to try.
725 // Try again with encryption if we didn't earlier, or without if we did.
726 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
728 torrent.Add("initiated conn with fallback header obfuscation", 1)
730 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
734 // Called to dial out and run a connection. The addr we're given is already
735 // considered half-open.
// NOTE(review): locking, the error/nil-conn branches and the assignments of
// ps and trusted to the connection are missing from this extract.
736 func (cl *Client) outgoingConnection(t *Torrent, addr net.Addr, ps PeerSource, trusted bool) {
// Blocks until the client-wide dial rate limiter allows another dial. The
// returned error is not checked.
737 cl.dialRateLimiter.Wait(context.Background())
738 c, err := cl.establishOutgoingConn(t, addr)
741 // Don't release lock between here and addConnection, unless it's for
743 cl.noLongerHalfOpen(t, addr.String())
746 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
753 t.runHandshookConnLoggingErr(c)
756 // The port number for incoming peer connections. 0 if the client isn't listening.
// Simply forwards to LocalPort.
757 func (cl *Client) incomingPeerPort() int {
758 return cl.LocalPort()
// initiateHandshakes performs the initiator side of the handshakes on c: an
// MSE header-obfuscation handshake when c.headerEncrypted is set, followed by
// the BitTorrent handshake for t's infohash.
// NOTE(review): most arguments to mse.InitiateHandshake and the error checks
// are missing from this extract.
761 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
762 if c.headerEncrypted {
765 rw, c.cryptoMethod, err = mse.InitiateHandshake(
772 cl.config.CryptoProvides,
776 return xerrors.Errorf("header obfuscation handshake: %w", err)
779 ih, err := cl.connBtHandshake(c, &t.infoHash)
781 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
// The peer must echo the infohash we asked for.
783 if ih != t.infoHash {
784 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
789 // Calls f with any secret keys.
// The visible loops all range over cl.torrents, so keys are derived from the
// loaded torrents' infohashes.
// NOTE(review): locking and every loop body are missing from this extract.
790 func (cl *Client) forSkeys(f func([]byte) bool) {
// Dead branch kept only to reproduce an old bug on demand.
793 if false { // Emulate the bug from #114
795 for ih := range cl.torrents {
799 for range cl.torrents {
806 for ih := range cl.torrents {
813 // Do encryption and bittorrent handshakes as receiver.
// NOTE(review): variable declarations, several branches, the torrent lookup
// and the returns are missing from this extract.
814 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
// Times the call and records the outcome via the named err result.
815 defer perf.ScopeTimerErr(&err)()
817 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
// A missing secret-key match is still counted as a received handshake rather
// than as an encryption error.
819 if err == nil || err == mse.ErrNoSecretKeyMatch {
820 if c.headerEncrypted {
821 torrent.Add("handshakes received encrypted", 1)
823 torrent.Add("handshakes received unencrypted", 1)
826 torrent.Add("handshakes received with error while handling encryption", 1)
829 if err == mse.ErrNoSecretKeyMatch {
// When the preferred obfuscation setting is mandatory, a connection using the
// other setting is refused.
834 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
835 err = errors.New("connection not have required header obfuscation")
// nil infohash: as receiver we learn the infohash from the peer.
838 ih, err := cl.connBtHandshake(c, nil)
840 err = xerrors.Errorf("during bt handshake: %w", err)
// connBtHandshake runs the BitTorrent protocol handshake on c using our peer
// ID and extension bits, then records the peer's extension bits, peer ID and
// the completion time on c.
// NOTE(review): the error check and the assignment of ret are missing from
// this extract.
849 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
850 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
855 c.PeerExtensionBytes = res.PeerExtensionBits
856 c.PeerID = res.PeerID
857 c.completedHandshake = time.Now()
// runReceivedConn performs the receiver-side handshakes on an accepted
// connection and, on success, runs it against the matching torrent.
// Handshake failures and unknown torrents are recorded with onBadAccept,
// which feeds the accept rate limiter.
// NOTE(review): error branches, locking and parts of the log call are missing
// from this extract.
861 func (cl *Client) runReceivedConn(c *PeerConn) {
// The handshake phase is bounded by HandshakesTimeout from now.
862 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
866 t, err := cl.receiveHandshakes(c)
869 "error receiving handshakes on %v: %s", c, err,
870 ).SetLevel(log.Debug).
872 "network", c.network,
874 torrent.Add("error receiving handshake", 1)
876 cl.onBadAccept(c.remoteAddr)
881 torrent.Add("received handshake for unloaded torrent", 1)
882 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
884 cl.onBadAccept(c.remoteAddr)
888 torrent.Add("received handshake for loaded torrent", 1)
891 t.runHandshookConnLoggingErr(c)
894 // Client lock must be held before entering this.
// Runs a connection that has completed its handshakes: rejects self
// connections, registers the connection with the torrent, starts the writer
// and blocks in the main read loop until it ends.
// NOTE(review): the outgoing/incoming branch framing and some error checks
// are missing from this extract.
895 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
// The peer presented our own peer ID: we connected to ourselves.
897 if c.PeerID == cl.peerID {
900 addr := c.conn.RemoteAddr().String()
901 cl.dopplegangerAddrs[addr] = struct{}{}
903 // Because the remote address is not necessarily the same as its client's torrent listen
904 // address, we won't record the remote address as a doppleganger. Instead, the initiator
905 // can record *us* as the doppleganger.
907 return errors.New("local and remote peer ids are the same")
// Clears the write deadline that was set for the handshake phase.
909 c.conn.SetWriteDeadline(time.Time{})
910 c.r = deadlineReader{c.conn, c.r}
911 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
912 if connIsIpv6(c.conn) {
913 torrent.Add("completed handshake over ipv6", 1)
915 if err := t.addConnection(c); err != nil {
916 return fmt.Errorf("adding connection: %w", err)
// From here on the connection is dropped from the torrent on any return.
918 defer t.dropConnection(c)
919 go c.writer(time.Minute)
920 cl.sendInitialMessages(c, t)
921 err := c.mainReadLoop()
923 return fmt.Errorf("main read loop: %w", err)
928 // See the order given in Transmission's tr_peerMsgsNew.
// Queues the first messages on a new connection: the extended handshake when
// both sides support it, HaveAll/HaveNone when the fast extension is enabled,
// and a DHT port message when both sides support DHT and we run a server.
// NOTE(review): message type fields, the bitfield fallback and the DHT port
// payload are missing from this extract.
929 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
930 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
931 conn.post(pp.Message{
933 ExtendedID: pp.HandshakeExtendedID,
934 ExtendedPayload: func() []byte {
935 msg := pp.ExtendedHandshakeMessage{
936 M: map[pp.ExtensionName]pp.ExtensionNumber{
937 pp.ExtensionNameMetadata: metadataExtendedId,
939 V: cl.config.ExtendedHandshakeClientVersion,
940 Reqq: 64, // TODO: Really?
941 YourIp: pp.CompactIp(addrIpOrNil(conn.remoteAddr)),
// Advertised as true unless unobfuscated headers are both preferred and
// required.
942 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
943 Port: cl.incomingPeerPort(),
944 MetadataSize: torrent.metadataSize(),
945 // TODO: We can figure these out specific to the socket
947 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
948 Ipv6: cl.config.PublicIp6.To16(),
// PEX is advertised only when it has not been disabled in the config.
950 if !cl.config.DisablePEX {
951 msg.M[pp.ExtensionNamePex] = pexExtendedId
953 return bencode.MustMarshal(msg)
958 if conn.fastEnabled() {
959 if torrent.haveAllPieces() {
960 conn.post(pp.Message{Type: pp.HaveAll})
// Record that every piece has been announced to this peer.
961 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
963 } else if !torrent.haveAnyPieces() {
964 conn.post(pp.Message{Type: pp.HaveNone})
965 conn.sentHaves.Clear()
971 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
972 conn.post(pp.Message{
// dhtPort returns the port of a DHT server's address as a uint16. Because the
// callback overwrites ret for every server, the value from the last server
// visited wins; with no servers the result is 0.
979 func (cl *Client) dhtPort() (ret uint16) {
980 cl.eachDhtServer(func(s DhtServer) {
981 ret = uint16(missinggo.AddrPort(s.Addr()))
// haveDhtServer reports whether the client has at least one DHT server.
// NOTE(review): the callback body (presumably ret = true) is missing from
// this extract.
986 func (cl *Client) haveDhtServer() (ret bool) {
987 cl.eachDhtServer(func(_ DhtServer) {
993 // Process incoming ut_metadata message.
// The payload is a bencoded dict carrying msg_type (and piece); data messages
// append the raw metadata piece after the dict.
// NOTE(review): the declaration of d, the extraction of piece, the switch
// header and several returns are missing from this extract.
994 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
996 err := bencode.Unmarshal(payload, &d)
// Trailing bytes after the dict are expected (they hold the piece data), so
// that particular error is tolerated.
997 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
998 } else if err != nil {
999 return fmt.Errorf("error unmarshalling bencode: %s", err)
1001 msgType, ok := d["msg_type"]
1003 return errors.New("missing msg_type field")
1007 case pp.DataMetadataExtensionMsgType:
1008 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
// Only pieces we actually asked this peer for are accepted.
1009 if !c.requestedMetadataPiece(piece) {
1010 return fmt.Errorf("got unexpected piece %d", piece)
1012 c.metadataRequests[piece] = false
// The piece occupies the tail of the payload; its expected size is derived
// from the advertised total_size.
1013 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1014 if begin < 0 || begin >= len(payload) {
1015 return fmt.Errorf("data has bad offset in payload: %d", begin)
1017 t.saveMetadataPiece(piece, payload[begin:])
1018 c.lastUsefulChunkReceived = time.Now()
1019 return t.maybeCompleteMetadata()
1020 case pp.RequestMetadataExtensionMsgType:
// Requests for pieces we don't hold are answered with a reject message.
1021 if !t.haveMetadataPiece(piece) {
1022 c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
// Pieces are addressed in 16 KiB (1 << 14) steps within the metadata bytes.
1025 start := (1 << 14) * piece
1026 c.logger.Printf("sending metadata piece %d", piece)
1027 c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1029 case pp.RejectMetadataExtensionMsgType:
1032 return errors.New("unknown msg_type value")
// badPeerAddr applies badPeerIPPort to addr when an IP and port can be
// extracted from it.
// NOTE(review): the fallback return for other address types is missing from
// this extract.
1036 func (cl *Client) badPeerAddr(addr net.Addr) bool {
1037 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1038 return cl.badPeerIPPort(ipa.IP, ipa.Port)
// badPeerIPPort checks an IP/port pair against three sources: known
// doppleganger addresses (host:port), the IP blocklist, and the banned-IP set.
// NOTE(review): a check near the top, the branch bodies and the final return
// are missing from this extract.
1043 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1047 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1050 if _, ok := cl.ipBlockRange(ip); ok {
1053 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1059 // Return a Torrent ready for insertion into a Client.
// NOTE(review): the Torrent literal framing and several of its fields are
// missing from this extract.
1060 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1061 // use provided storage, if provided
1062 storageClient := cl.defaultStorage
1063 if specStorage != nil {
1064 storageClient = storage.NewClient(specStorage)
1070 peers: prioritizedPeers{
// Peer priority is computed from our public address as seen by that peer and
// the peer's own address; errors from the computation are ignored.
1072 getPrio: func(p Peer) peerPriority {
1073 return bep40PriorityIgnoreError(cl.publicAddr(addrIpOrNil(p.Addr)), p.addr())
// Pre-sized for twice the configured per-torrent connection limit.
1076 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1078 halfOpen: make(map[string]Peer),
1079 pieceStateChanges: pubsub.NewPubSub(),
1081 storageOpener: storageClient,
1082 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1084 networkingEnabled: true,
1085 metadataChanged: sync.Cond{
1089 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1090 t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
// Log messages are prefixed with the torrent's string form.
1091 t.logger = cl.logger.WithValues(t).WithText(func(m log.Msg) string {
1092 return fmt.Sprintf("%v: %s", t, m.Text())
1094 t.setChunkSize(defaultChunkSize)
1098 // A file-like handle to some torrent data resource.
// NOTE(review): the method set is not visible in this extract.
1099 type Handle interface {
// AddTorrentInfoHash adds a torrent by infohash using the client's default
// storage (nil custom storage). See AddTorrentInfoHashWithStorage for the
// meaning of the returned values.
1106 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1107 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1110 // Adds a torrent by InfoHash with a custom Storage implementation.
1111 // If the torrent already exists then this Storage is ignored and the
1112 // existing torrent returned with `new` set to `false`
// NOTE(review): locking, the early return for an existing torrent and the
// final return are missing from this extract.
1113 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1116 t, ok := cl.torrents[infoHash]
1122 t = cl.newTorrent(infoHash, specStorage)
// One DHT announcer goroutine is started per DHT server.
1123 cl.eachDhtServer(func(s DhtServer) {
1124 go t.dhtAnnouncer(s)
1126 cl.torrents[infoHash] = t
// Past bad accepts are forgotten once a new torrent is loaded.
1127 cl.clearAcceptLimits()
1128 t.updateWantPeersEvent()
1129 // Tickle Client.waitAccept, new torrent may want conns.
1130 cl.event.Broadcast()
1134 // Add or merge a torrent spec. If the torrent is already present, the
1135 // trackers will be merged with the existing ones. If the Info isn't yet
1136 // known, it will be set. The display name is replaced if the new spec
1137 // provides one. Returns new if the torrent wasn't already in the client.
1138 // Note that any `Storage` defined on the spec will be ignored if the
1139 // torrent is already present (i.e. `new` return value is `false`)
// NOTE(review): the error check after SetInfoBytes, locking and the final
// return are missing from this extract.
1140 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1141 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1142 if spec.DisplayName != "" {
1143 t.SetDisplayName(spec.DisplayName)
1145 if spec.InfoBytes != nil {
1146 err = t.SetInfoBytes(spec.InfoBytes)
// A zero ChunkSize leaves the torrent's current chunk size untouched.
1153 if spec.ChunkSize != 0 {
1154 t.setChunkSize(pp.Integer(spec.ChunkSize))
1156 t.addTrackers(spec.Trackers)
// dropTorrent removes the torrent with the given infohash from the client,
// returning an error when no such torrent is loaded.
// NOTE(review): the not-found branch framing and the torrent's own shutdown
// are missing from this extract.
1161 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1162 t, ok := cl.torrents[infoHash]
1164 err = fmt.Errorf("no such torrent")
1171 delete(cl.torrents, infoHash)
// allTorrentsCompleted checks every loaded torrent for having all its pieces.
// NOTE(review): an additional per-torrent check and the returns are missing
// from this extract.
1175 func (cl *Client) allTorrentsCompleted() bool {
1176 for _, t := range cl.torrents {
1180 if !t.haveAllPieces() {
1187 // Returns true when all torrents are completely downloaded and false if the
1188 // client is stopped before that.
// NOTE(review): locking, the wait inside the loop and the returns are missing
// from this extract.
1189 func (cl *Client) WaitAll() bool {
1192 for !cl.allTorrentsCompleted() {
1193 if cl.closed.IsSet() {
1201 // Returns handles to all the torrents loaded in the Client.
// The result is a freshly built slice from torrentsAsSlice.
// NOTE(review): two lines after the signature (presumably locking) are
// missing from this extract.
1202 func (cl *Client) Torrents() []*Torrent {
1205 return cl.torrentsAsSlice()
// torrentsAsSlice collects the values of cl.torrents into a new slice. Order
// follows Go map iteration and is therefore unspecified; the result is nil
// when no torrents are loaded.
1208 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1209 for _, t := range cl.torrents {
1210 ret = append(ret, t)
// AddMagnet parses a magnet URI into a torrent spec and adds it with
// AddTorrentSpec, discarding the "new" flag.
// NOTE(review): the error check after parsing is missing from this extract.
1215 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1216 spec, err := TorrentSpecFromMagnetURI(uri)
1220 T, _, err = cl.AddTorrentSpec(spec)
// AddTorrent adds a torrent from parsed metainfo via AddTorrentSpec, then
// converts mi.Nodes into ss.
// NOTE(review): the declaration of ss and what is done with it afterwards are
// missing from this extract.
1224 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1225 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1227 slices.MakeInto(&ss, mi.Nodes)
// AddTorrentFromFile loads metainfo from the named file and adds it with
// AddTorrent.
// NOTE(review): the error check after loading is missing from this extract.
1232 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1233 mi, err := metainfo.LoadFromFile(filename)
1237 return cl.AddTorrent(mi)
// DhtServers returns the client's DHT server slice. The internal slice itself
// is returned, not a copy, so callers should not modify it.
1240 func (cl *Client) DhtServers() []DhtServer {
1241 return cl.dhtServers
// AddDHTNodes parses each "host[:port]" string and offers the resulting node
// to every DHT server. Hosts must be literal IP addresses; names are not
// resolved (net.ParseIP is used).
// NOTE(review): the nil-IP check framing, the NodeAddr fields and the
// per-server call are missing from this extract.
1244 func (cl *Client) AddDHTNodes(nodes []string) {
1245 for _, n := range nodes {
1246 hmp := missinggo.SplitHostMaybePort(n)
1247 ip := net.ParseIP(hmp.Host)
1249 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1252 ni := krpc.NodeInfo{
1253 Addr: krpc.NodeAddr{
1258 cl.eachDhtServer(func(s DhtServer) {
// banPeerIP logs the ban and records ip, keyed by its string form, in
// cl.badPeerIPs.
1264 func (cl *Client) banPeerIP(ip net.IP) {
1265 cl.logger.Printf("banning ip %v", ip)
// The map is created on first use; writing to a nil map would panic.
1266 if cl.badPeerIPs == nil {
1267 cl.badPeerIPs = make(map[string]struct{})
1269 cl.badPeerIPs[ip.String()] = struct{}{}
// newConnection builds a PeerConn around nc with its logger, write condition,
// stats-counting reader/writer and download rate limiting set up.
// NOTE(review): the PeerConn literal framing and several of its fields are
// missing from this extract.
1272 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, network, connString string) (c *PeerConn) {
1278 PeerMaxRequests: 250,
1279 writeBuffer: new(bytes.Buffer),
1280 remoteAddr: remoteAddr,
1282 connString: connString,
// Connection log messages default to Debug level and are prefixed with the
// connection's string form.
1284 c.logger = cl.logger.WithValues(c).WithDefaultLevel(log.Debug).WithText(func(m log.Msg) string {
1285 return fmt.Sprintf("%v: %s", c, m.Text())
// The writer condition shares the client's lock.
1287 c.writerCond.L = cl.locker()
1288 c.setRW(connStatsReadWriter{nc, c})
1289 c.r = &rateLimitedReader{
1290 l: cl.config.DownloadRateLimiter,
1293 c.logger.Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
// onDHTAnnouncePeer is installed as the DHT server's OnAnnouncePeer callback
// (see newAnacrolixDhtServer). It describes the announcing peer by ip/port
// with source PeerSourceDhtAnnouncePeer.
// NOTE(review): the torrent lookup and the call that consumes the peer are
// missing from this extract.
1297 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1305 Addr: ipPortAddr{ip, port},
1306 Source: PeerSourceDhtAnnouncePeer,
// firstNotNil scans ips in order.
// NOTE(review): the loop body and returns are missing from this extract; by
// name it presumably returns the first non-nil IP, or nil if none — confirm.
1310 func firstNotNil(ips ...net.IP) net.IP {
1311 for _, ip := range ips {
// eachDialer iterates over cl.dialers.
// NOTE(review): the loop body is missing from this extract; the bool returned
// by f presumably controls whether iteration continues — confirm.
1319 func (cl *Client) eachDialer(f func(Dialer) bool) {
1320 for _, s := range cl.dialers {
// eachListener iterates over cl.listeners.
// NOTE(review): the loop body is missing from this extract; the bool returned
// by f presumably controls whether iteration continues — confirm.
1327 func (cl *Client) eachListener(f func(Listener) bool) {
1328 for _, s := range cl.listeners {
// findListener searches the client's listeners using predicate f.
// NOTE(review): the callback body is missing from this extract; presumably
// ret is set to the first listener for which f returns true — confirm.
1335 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1336 cl.eachListener(func(l Listener) bool {
// publicIp picks the address we present to peer, matching the peer's address
// family: candidates are the configured public IP and then a listener IP of
// the same family.
// NOTE(review): the firstNotNil call framing and any further candidates are
// missing from this extract.
1343 func (cl *Client) publicIp(peer net.IP) net.IP {
1344 // TODO: Use BEP 10 to determine how peers are seeing us.
1345 if peer.To4() != nil {
1347 cl.config.PublicIp4,
1348 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1353 cl.config.PublicIp6,
1354 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
// findListenerIp returns the IP of a listener whose address IP satisfies f.
// NOTE(review): the handling of "no listener found" (lines between the search
// and the return) is missing from this extract.
1358 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1359 l := cl.findListener(
1360 func(l net.Listener) bool {
1361 return f(addrIpOrNil(l.Addr()))
1367 return addrIpOrNil(l.Addr())
1370 // Our IP as a peer should see it.
// Combines publicIp(peer) with the incoming peer port. The port is converted
// from int to uint16.
1371 func (cl *Client) publicAddr(peer net.IP) IpPort {
1372 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1375 // ListenAddrs addresses currently being listened to.
// Collects Addr() from every registered listener.
// NOTE(review): two lines after the signature (presumably locking) and the
// callback's return are missing from this extract.
1376 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1379 cl.eachListener(func(l Listener) bool {
1380 ret = append(ret, l.Addr())
// onBadAccept records a bad accepted connection against the masked source IP
// (see maskIpForAcceptLimiting) so rateLimitAccept can refuse later accepts
// from the same range.
// NOTE(review): the early return when no IP/port can be extracted is missing
// from this extract.
1386 func (cl *Client) onBadAccept(addr net.Addr) {
1387 ipa, ok := tryIpPortFromNetAddr(addr)
1391 ip := maskIpForAcceptLimiting(ipa.IP)
// The map is created on first use; clearAcceptLimits resets it to nil.
1392 if cl.acceptLimiter == nil {
1393 cl.acceptLimiter = make(map[ipStr]int)
1395 cl.acceptLimiter[ipStr(ip.String())]++
// maskIpForAcceptLimiting normalises an address for accept rate limiting.
// IPv4 addresses (including IPv4-mapped IPv6) are reduced to their /24
// network in 4-byte form, so all hosts in the same /24 share one limiter
// entry. Any other address, including nil, is returned unchanged.
func maskIpForAcceptLimiting(ip net.IP) net.IP {
	if ip4 := ip.To4(); ip4 != nil {
		return ip4.Mask(net.CIDRMask(24, 32))
	}
	return ip
}
// clearAcceptLimits forgets all recorded bad accepts by dropping the map;
// onBadAccept re-creates it on demand.
1405 func (cl *Client) clearAcceptLimits() {
1406 cl.acceptLimiter = nil
// acceptLimitClearer is started as a goroutine by NewClient. It waits on
// either client closure or a 15 minute timer, and clears the accept limits.
// NOTE(review): the loop/select framing, locking and the return on closure
// are missing from this extract.
1409 func (cl *Client) acceptLimitClearer() {
1412 case <-cl.closed.LockedChan(cl.locker()):
1414 case <-time.After(15 * time.Minute):
1416 cl.clearAcceptLimits()
// rateLimitAccept reports whether accepts from ip should be refused: true when
// at least one bad accept has been recorded for ip's masked form.
// NOTE(review): the early return when rate limiting is disabled is missing
// from this extract.
1422 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1423 if cl.config.DisableAcceptRateLimiting {
// Reading a nil map is safe and yields 0, so a cleared limiter allows all.
1426 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
// NOTE(review): the body is not present in this extract; by name this
// presumably takes the client lock for reading — confirm.
1429 func (cl *Client) rLock() {
// NOTE(review): the body is not present in this extract; by name this
// presumably releases the read lock taken by rLock — confirm.
1433 func (cl *Client) rUnlock() {
// NOTE(review): the body is not present in this extract; by name this
// presumably takes the client lock — confirm.
1437 func (cl *Client) lock() {
// NOTE(review): the body is not present in this extract; by name this
// presumably releases the client lock — confirm.
1441 func (cl *Client) unlock() {
// locker returns the *lockWithDeferreds used as the Locker for cl.event and
// for each connection's writer condition.
// NOTE(review): the body is not present in this extract; presumably it
// returns &cl._mu — confirm.
1445 func (cl *Client) locker() *lockWithDeferreds {
// String formats the client as "<TYPE POINTER>". The explicit argument index
// [1] lets both the %T and %p verbs consume the same argument.
1449 func (cl *Client) String() string {
1450 return fmt.Sprintf("<%[1]T %[1]p>", cl)