17 "github.com/anacrolix/dht/v2"
18 "github.com/anacrolix/dht/v2/krpc"
19 "github.com/anacrolix/log"
20 "github.com/anacrolix/missinggo/bitmap"
21 "github.com/anacrolix/missinggo/perf"
22 "github.com/anacrolix/missinggo/pubsub"
23 "github.com/anacrolix/missinggo/slices"
24 "github.com/anacrolix/missinggo/v2/pproffd"
25 "github.com/anacrolix/sync"
26 "github.com/anacrolix/torrent/tracker"
27 "github.com/anacrolix/torrent/webtorrent"
28 "github.com/davecgh/go-spew/spew"
29 "github.com/dustin/go-humanize"
30 "github.com/google/btree"
31 "github.com/pion/datachannel"
32 "golang.org/x/time/rate"
33 "golang.org/x/xerrors"
35 "github.com/anacrolix/missinggo/v2"
36 "github.com/anacrolix/missinggo/v2/conntrack"
38 "github.com/anacrolix/torrent/bencode"
39 "github.com/anacrolix/torrent/iplist"
40 "github.com/anacrolix/torrent/metainfo"
41 "github.com/anacrolix/torrent/mse"
42 pp "github.com/anacrolix/torrent/peer_protocol"
43 "github.com/anacrolix/torrent/storage"
46 // Clients contain zero or more Torrents. A Client manages a blocklist, the
47 // TCP/UDP protocol ports, and DHT as desired.
49 // An aggregate of stats over all connections. First in struct to ensure
50 // 64-bit alignment of fields. See #262.
55 closed missinggo.Event
61 defaultStorage *storage.Client
65 dhtServers []DhtServer
66 ipBlockList iplist.Ranger
68 // Set of addresses that have our client ID. This intentionally will
69 // include ourselves if we end up trying to connect to our own address
70 // through legitimate channels.
71 dopplegangerAddrs map[string]struct{}
72 badPeerIPs map[string]struct{}
73 torrents map[InfoHash]*Torrent
75 acceptLimiter map[ipStr]int
76 dialRateLimiter *rate.Limiter
78 websocketTrackers websocketTrackers
83 func (cl *Client) BadPeerIPs() []string {
86 return cl.badPeerIPsLocked()
89 func (cl *Client) badPeerIPsLocked() []string {
90 return slices.FromMapKeys(cl.badPeerIPs).([]string)
93 func (cl *Client) PeerID() PeerID {
97 // Returns the port number for the first listener that has one. No longer assumes that all port
98 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
100 func (cl *Client) LocalPort() (port int) {
101 cl.eachListener(func(l Listener) bool {
102 port = addrPortOrZero(l.Addr())
108 func writeDhtServerStatus(w io.Writer, s DhtServer) {
109 dhtStats := s.Stats()
110 fmt.Fprintf(w, " ID: %x\n", s.ID())
111 spew.Fdump(w, dhtStats)
114 // Writes out a human readable status of the client, such as for writing to a
116 func (cl *Client) WriteStatus(_w io.Writer) {
119 w := bufio.NewWriter(_w)
121 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
122 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
123 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
124 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
125 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
126 cl.eachDhtServer(func(s DhtServer) {
127 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
128 writeDhtServerStatus(w, s)
130 spew.Fdump(w, &cl.stats)
131 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
133 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
134 return l.InfoHash().AsString() < r.InfoHash().AsString()
137 fmt.Fprint(w, "<unknown name>")
139 fmt.Fprint(w, t.name())
143 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
145 w.WriteString("<missing metainfo>")
153 func (cl *Client) initLogger() {
154 cl.logger = cl.config.Logger.WithValues(cl)
155 if !cl.config.Debug {
156 cl.logger = cl.logger.FilterLevel(log.Info)
160 func (cl *Client) announceKey() int32 {
161 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
164 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
166 cfg = NewDefaultClientConfig()
176 dopplegangerAddrs: make(map[string]struct{}),
177 torrents: make(map[metainfo.Hash]*Torrent),
178 dialRateLimiter: rate.NewLimiter(10, 10),
180 go cl.acceptLimitClearer()
188 cl.event.L = cl.locker()
189 storageImpl := cfg.DefaultStorage
190 if storageImpl == nil {
191 // We'd use mmap by default but HFS+ doesn't support sparse files.
192 storageImplCloser := storage.NewFile(cfg.DataDir)
193 cl.onClose = append(cl.onClose, func() {
194 if err := storageImplCloser.Close(); err != nil {
195 cl.logger.Printf("error closing default storage: %s", err)
198 storageImpl = storageImplCloser
200 cl.defaultStorage = storage.NewClient(storageImpl)
201 if cfg.IPBlocklist != nil {
202 cl.ipBlockList = cfg.IPBlocklist
205 if cfg.PeerID != "" {
206 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
208 o := copy(cl.peerID[:], cfg.Bep20)
209 _, err = rand.Read(cl.peerID[o:])
211 panic("error generating peer id")
215 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
223 for _, _s := range sockets {
224 s := _s // Go is fucking retarded.
225 cl.onClose = append(cl.onClose, func() { s.Close() })
226 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
227 cl.dialers = append(cl.dialers, s)
228 cl.listeners = append(cl.listeners, s)
229 go cl.acceptConnections(s)
235 for _, s := range sockets {
236 if pc, ok := s.(net.PacketConn); ok {
237 ds, err := cl.newAnacrolixDhtServer(pc)
241 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
242 cl.onClose = append(cl.onClose, func() { ds.Close() })
247 cl.websocketTrackers = websocketTrackers{
250 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) tracker.AnnounceRequest {
253 return cl.torrents[infoHash].announceRequest(event)
255 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
258 t, ok := cl.torrents[dcc.InfoHash]
260 cl.logger.WithDefaultLevel(log.Warning).Printf(
261 "got webrtc conn for unloaded torrent with infohash %x",
267 go t.onWebRtcConn(dc, dcc)
274 func (cl *Client) AddDhtServer(d DhtServer) {
275 cl.dhtServers = append(cl.dhtServers, d)
278 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
279 // given address for any Torrent.
280 func (cl *Client) AddDialer(d Dialer) {
283 cl.dialers = append(cl.dialers, d)
284 for _, t := range cl.torrents {
289 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
291 func (cl *Client) AddListener(l Listener) {
292 cl.listeners = append(cl.listeners, l)
293 go cl.acceptConnections(l)
296 func (cl *Client) firewallCallback(net.Addr) bool {
298 block := !cl.wantConns()
301 torrent.Add("connections firewalled", 1)
303 torrent.Add("connections not firewalled", 1)
308 func (cl *Client) listenOnNetwork(n network) bool {
309 if n.Ipv4 && cl.config.DisableIPv4 {
312 if n.Ipv6 && cl.config.DisableIPv6 {
315 if n.Tcp && cl.config.DisableTCP {
318 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
324 func (cl *Client) listenNetworks() (ns []network) {
325 for _, n := range allPeerNetworks {
326 if cl.listenOnNetwork(n) {
333 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
334 cfg := dht.ServerConfig{
335 IPBlocklist: cl.ipBlockList,
337 OnAnnouncePeer: cl.onDHTAnnouncePeer,
338 PublicIP: func() net.IP {
339 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
340 return cl.config.PublicIp6
342 return cl.config.PublicIp4
344 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
345 ConnectionTracking: cl.config.ConnTracker,
346 OnQuery: cl.config.DHTOnQuery,
347 Logger: cl.logger.WithText(func(m log.Msg) string {
348 return fmt.Sprintf("dht server on %v: %s", conn.LocalAddr().String(), m.Text())
351 s, err = dht.NewServer(&cfg)
354 ts, err := s.Bootstrap()
356 cl.logger.Printf("error bootstrapping dht: %s", err)
358 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
364 func (cl *Client) Closed() <-chan struct{} {
370 func (cl *Client) eachDhtServer(f func(DhtServer)) {
371 for _, ds := range cl.dhtServers {
376 // Stops the client. All connections to peers are closed and all activity will
378 func (cl *Client) Close() {
382 for _, t := range cl.torrents {
385 for i := range cl.onClose {
386 cl.onClose[len(cl.onClose)-1-i]()
391 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
392 if cl.ipBlockList == nil {
395 return cl.ipBlockList.Lookup(ip)
398 func (cl *Client) ipIsBlocked(ip net.IP) bool {
399 _, blocked := cl.ipBlockRange(ip)
403 func (cl *Client) wantConns() bool {
404 for _, t := range cl.torrents {
412 func (cl *Client) waitAccept() {
414 if cl.closed.IsSet() {
424 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
425 func (cl *Client) rejectAccepted(conn net.Conn) error {
426 ra := conn.RemoteAddr()
427 if rip := addrIpOrNil(ra); rip != nil {
428 if cl.config.DisableIPv4Peers && rip.To4() != nil {
429 return errors.New("ipv4 peers disabled")
431 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
432 return errors.New("ipv4 disabled")
435 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
436 return errors.New("ipv6 disabled")
438 if cl.rateLimitAccept(rip) {
439 return errors.New("source IP accepted rate limited")
441 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
442 return errors.New("bad source addr")
448 func (cl *Client) acceptConnections(l net.Listener) {
450 conn, err := l.Accept()
451 torrent.Add("client listener accepts", 1)
452 conn = pproffd.WrapNetConn(conn)
454 closed := cl.closed.IsSet()
457 reject = cl.rejectAccepted(conn)
467 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
472 torrent.Add("rejected accepted connections", 1)
473 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
476 go cl.incomingConnection(conn)
478 log.Fmsg("accepted %q connection at %q from %q",
482 ).SetLevel(log.Debug).Log(cl.logger)
483 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
484 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
485 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
490 func regularConnString(nc net.Conn) string {
491 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
494 func (cl *Client) incomingConnection(nc net.Conn) {
496 if tc, ok := nc.(*net.TCPConn); ok {
499 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
500 regularConnString(nc))
501 c.Discovery = PeerSourceIncoming
502 cl.runReceivedConn(c)
505 // Returns a handle to the given torrent, if it's present in the client.
506 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
509 t, ok = cl.torrents[ih]
513 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
514 return cl.torrents[ih]
517 type dialResult struct {
522 func countDialResult(err error) {
524 torrent.Add("successful dials", 1)
526 torrent.Add("unsuccessful dials", 1)
530 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
531 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
532 if ret < minDialTimeout {
538 // Returns whether an address is known to connect to a client with our own ID.
539 func (cl *Client) dopplegangerAddr(addr string) bool {
540 _, ok := cl.dopplegangerAddrs[addr]
544 // Returns a connection over UTP or TCP, whichever is first to connect.
545 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
547 t := perf.NewTimer(perf.CallerName(0))
550 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
552 t.Mark("returned conn over " + res.Network)
556 ctx, cancel := context.WithCancel(ctx)
557 // As soon as we return one connection, cancel the others.
560 resCh := make(chan dialResult, left)
564 cl.eachDialer(func(s Dialer) bool {
567 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
570 cl.dialFromSocket(ctx, s, addr),
571 s.LocalAddr().Network(),
578 // Wait for a successful connection.
580 defer perf.ScopeTimer()()
581 for ; left > 0 && res.Conn == nil; left-- {
585 // There are still incompleted dials.
587 for ; left > 0; left-- {
588 conn := (<-resCh).Conn
595 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
597 //if res.Conn != nil {
598 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
600 // cl.logger.Printf("failed to dial %s", addr)
605 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
606 network := s.LocalAddr().Network()
607 cte := cl.config.ConnTracker.Wait(
609 conntrack.Entry{network, s.LocalAddr().String(), addr},
610 "dial torrent client",
613 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
614 // which dial errors allow us to forget the connection tracking entry handle.
615 if ctx.Err() != nil {
621 c, err := s.Dial(ctx, addr)
622 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
623 // it now in case we close the connection forthwith.
624 if tc, ok := c.(*net.TCPConn); ok {
629 if err != nil && forgettableDialError(err) {
636 return closeWrapper{c, func() error {
643 func forgettableDialError(err error) bool {
644 return strings.Contains(err.Error(), "no suitable address found")
647 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
648 if _, ok := t.halfOpen[addr]; !ok {
649 panic("invariant broken")
651 delete(t.halfOpen, addr)
655 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
656 // for valid reasons.
657 func (cl *Client) handshakesConnection(
666 c *PeerConn, err error,
668 c = cl.newConnection(nc, true, remoteAddr, network, connString)
669 c.headerEncrypted = encryptHeader
670 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
672 dl, ok := ctx.Deadline()
676 err = nc.SetDeadline(dl)
680 err = cl.initiateHandshakes(c, t)
684 // Returns nil connection and nil error if no connection could be established for valid reasons.
685 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr net.Addr, obfuscatedHeader bool) (*PeerConn, error) {
686 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
689 return t.dialTimeout()
692 dr := cl.dialFirst(dialCtx, addr.String())
695 if dialCtx.Err() != nil {
696 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
698 return nil, errors.New("dial failed")
700 c, err := cl.handshakesConnection(context.Background(), nc, t, obfuscatedHeader, addr, dr.Network, regularConnString(nc))
707 // Returns nil connection and nil error if no connection could be established
708 // for valid reasons.
709 func (cl *Client) establishOutgoingConn(t *Torrent, addr net.Addr) (c *PeerConn, err error) {
710 torrent.Add("establish outgoing connection", 1)
711 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
712 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
714 torrent.Add("initiated conn with preferred header obfuscation", 1)
717 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
718 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
719 // We should have just tried with the preferred header obfuscation. If it was required,
720 // there's nothing else to try.
723 // Try again with encryption if we didn't earlier, or without if we did.
724 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
726 torrent.Add("initiated conn with fallback header obfuscation", 1)
728 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
732 // Called to dial out and run a connection. The addr we're given is already
733 // considered half-open.
734 func (cl *Client) outgoingConnection(t *Torrent, addr net.Addr, ps PeerSource, trusted bool) {
735 cl.dialRateLimiter.Wait(context.Background())
736 c, err := cl.establishOutgoingConn(t, addr)
739 // Don't release lock between here and addConnection, unless it's for
741 cl.noLongerHalfOpen(t, addr.String())
744 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
751 t.runHandshookConnLoggingErr(c)
754 // The port number for incoming peer connections. 0 if the client isn't listening.
755 func (cl *Client) incomingPeerPort() int {
756 return cl.LocalPort()
759 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
760 if c.headerEncrypted {
763 rw, c.cryptoMethod, err = mse.InitiateHandshake(
770 cl.config.CryptoProvides,
774 return xerrors.Errorf("header obfuscation handshake: %w", err)
777 ih, err := cl.connBtHandshake(c, &t.infoHash)
779 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
781 if ih != t.infoHash {
782 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
787 // Calls f with any secret keys.
788 func (cl *Client) forSkeys(f func([]byte) bool) {
791 if false { // Emulate the bug from #114
793 for ih := range cl.torrents {
797 for range cl.torrents {
804 for ih := range cl.torrents {
811 // Do encryption and bittorrent handshakes as receiver.
812 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
813 defer perf.ScopeTimerErr(&err)()
815 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
817 if err == nil || err == mse.ErrNoSecretKeyMatch {
818 if c.headerEncrypted {
819 torrent.Add("handshakes received encrypted", 1)
821 torrent.Add("handshakes received unencrypted", 1)
824 torrent.Add("handshakes received with error while handling encryption", 1)
827 if err == mse.ErrNoSecretKeyMatch {
832 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
833 err = errors.New("connection not have required header obfuscation")
836 ih, err := cl.connBtHandshake(c, nil)
838 err = xerrors.Errorf("during bt handshake: %w", err)
847 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
848 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
853 c.PeerExtensionBytes = res.PeerExtensionBits
854 c.PeerID = res.PeerID
855 c.completedHandshake = time.Now()
859 func (cl *Client) runReceivedConn(c *PeerConn) {
860 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
864 t, err := cl.receiveHandshakes(c)
867 "error receiving handshakes on %v: %s", c, err,
868 ).SetLevel(log.Debug).
870 "network", c.network,
872 torrent.Add("error receiving handshake", 1)
874 cl.onBadAccept(c.remoteAddr)
879 torrent.Add("received handshake for unloaded torrent", 1)
880 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
882 cl.onBadAccept(c.remoteAddr)
886 torrent.Add("received handshake for loaded torrent", 1)
889 t.runHandshookConnLoggingErr(c)
892 // Client lock must be held before entering this.
893 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
895 if c.PeerID == cl.peerID {
898 addr := c.conn.RemoteAddr().String()
899 cl.dopplegangerAddrs[addr] = struct{}{}
901 // Because the remote address is not necessarily the same as its client's torrent listen
902 // address, we won't record the remote address as a doppleganger. Instead, the initiator
903 // can record *us* as the doppleganger.
905 return errors.New("local and remote peer ids are the same")
907 c.conn.SetWriteDeadline(time.Time{})
908 c.r = deadlineReader{c.conn, c.r}
909 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
910 if connIsIpv6(c.conn) {
911 torrent.Add("completed handshake over ipv6", 1)
913 if err := t.addConnection(c); err != nil {
914 return fmt.Errorf("adding connection: %w", err)
916 defer t.dropConnection(c)
917 go c.writer(time.Minute)
918 cl.sendInitialMessages(c, t)
919 err := c.mainReadLoop()
921 return fmt.Errorf("main read loop: %w", err)
926 // See the order given in Transmission's tr_peerMsgsNew.
927 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
928 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
929 conn.post(pp.Message{
931 ExtendedID: pp.HandshakeExtendedID,
932 ExtendedPayload: func() []byte {
933 msg := pp.ExtendedHandshakeMessage{
934 M: map[pp.ExtensionName]pp.ExtensionNumber{
935 pp.ExtensionNameMetadata: metadataExtendedId,
937 V: cl.config.ExtendedHandshakeClientVersion,
938 Reqq: 64, // TODO: Really?
939 YourIp: pp.CompactIp(addrIpOrNil(conn.remoteAddr)),
940 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
941 Port: cl.incomingPeerPort(),
942 MetadataSize: torrent.metadataSize(),
943 // TODO: We can figured these out specific to the socket
945 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
946 Ipv6: cl.config.PublicIp6.To16(),
948 if !cl.config.DisablePEX {
949 msg.M[pp.ExtensionNamePex] = pexExtendedId
951 return bencode.MustMarshal(msg)
956 if conn.fastEnabled() {
957 if torrent.haveAllPieces() {
958 conn.post(pp.Message{Type: pp.HaveAll})
959 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
961 } else if !torrent.haveAnyPieces() {
962 conn.post(pp.Message{Type: pp.HaveNone})
963 conn.sentHaves.Clear()
969 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
970 conn.post(pp.Message{
977 func (cl *Client) dhtPort() (ret uint16) {
978 cl.eachDhtServer(func(s DhtServer) {
979 ret = uint16(missinggo.AddrPort(s.Addr()))
984 func (cl *Client) haveDhtServer() (ret bool) {
985 cl.eachDhtServer(func(_ DhtServer) {
991 // Process incoming ut_metadata message.
992 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
994 err := bencode.Unmarshal(payload, &d)
995 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
996 } else if err != nil {
997 return fmt.Errorf("error unmarshalling bencode: %s", err)
999 msgType, ok := d["msg_type"]
1001 return errors.New("missing msg_type field")
1005 case pp.DataMetadataExtensionMsgType:
1006 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1007 if !c.requestedMetadataPiece(piece) {
1008 return fmt.Errorf("got unexpected piece %d", piece)
1010 c.metadataRequests[piece] = false
1011 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1012 if begin < 0 || begin >= len(payload) {
1013 return fmt.Errorf("data has bad offset in payload: %d", begin)
1015 t.saveMetadataPiece(piece, payload[begin:])
1016 c.lastUsefulChunkReceived = time.Now()
1017 return t.maybeCompleteMetadata()
1018 case pp.RequestMetadataExtensionMsgType:
1019 if !t.haveMetadataPiece(piece) {
1020 c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1023 start := (1 << 14) * piece
1024 c.logger.Printf("sending metadata piece %d", piece)
1025 c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1027 case pp.RejectMetadataExtensionMsgType:
1030 return errors.New("unknown msg_type value")
1034 func (cl *Client) badPeerAddr(addr net.Addr) bool {
1035 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1036 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1041 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1045 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1048 if _, ok := cl.ipBlockRange(ip); ok {
1051 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1057 // Return a Torrent ready for insertion into a Client.
1058 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1059 // use provided storage, if provided
1060 storageClient := cl.defaultStorage
1061 if specStorage != nil {
1062 storageClient = storage.NewClient(specStorage)
1068 peers: prioritizedPeers{
1070 getPrio: func(p Peer) peerPriority {
1071 return bep40PriorityIgnoreError(cl.publicAddr(addrIpOrNil(p.Addr)), p.addr())
1074 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1076 halfOpen: make(map[string]Peer),
1077 pieceStateChanges: pubsub.NewPubSub(),
1079 storageOpener: storageClient,
1080 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1082 networkingEnabled: true,
1083 metadataChanged: sync.Cond{
1087 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1088 t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
1089 t.logger = cl.logger.WithValues(t).WithText(func(m log.Msg) string {
1090 return fmt.Sprintf("%v: %s", t, m.Text())
1092 t.setChunkSize(defaultChunkSize)
1096 // A file-like handle to some torrent data resource.
1097 type Handle interface {
1104 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1105 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1108 // Adds a torrent by InfoHash with a custom Storage implementation.
1109 // If the torrent already exists then this Storage is ignored and the
1110 // existing torrent returned with `new` set to `false`
1111 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1114 t, ok := cl.torrents[infoHash]
1120 t = cl.newTorrent(infoHash, specStorage)
1121 cl.eachDhtServer(func(s DhtServer) {
1122 go t.dhtAnnouncer(s)
1124 cl.torrents[infoHash] = t
1125 cl.clearAcceptLimits()
1126 t.updateWantPeersEvent()
1127 // Tickle Client.waitAccept, new torrent may want conns.
1128 cl.event.Broadcast()
1132 // Add or merge a torrent spec. If the torrent is already present, the
1133 // trackers will be merged with the existing ones. If the Info isn't yet
1134 // known, it will be set. The display name is replaced if the new spec
1135 // provides one. Returns new if the torrent wasn't already in the client.
1136 // Note that any `Storage` defined on the spec will be ignored if the
1137 // torrent is already present (i.e. `new` return value is `true`)
1138 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1139 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1140 if spec.DisplayName != "" {
1141 t.SetDisplayName(spec.DisplayName)
1143 if spec.InfoBytes != nil {
1144 err = t.SetInfoBytes(spec.InfoBytes)
1151 if spec.ChunkSize != 0 {
1152 t.setChunkSize(pp.Integer(spec.ChunkSize))
1154 t.addTrackers(spec.Trackers)
1159 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1160 t, ok := cl.torrents[infoHash]
1162 err = fmt.Errorf("no such torrent")
1169 delete(cl.torrents, infoHash)
1173 func (cl *Client) allTorrentsCompleted() bool {
1174 for _, t := range cl.torrents {
1178 if !t.haveAllPieces() {
1185 // Returns true when all torrents are completely downloaded and false if the
1186 // client is stopped before that.
1187 func (cl *Client) WaitAll() bool {
1190 for !cl.allTorrentsCompleted() {
1191 if cl.closed.IsSet() {
1199 // Returns handles to all the torrents loaded in the Client.
1200 func (cl *Client) Torrents() []*Torrent {
1203 return cl.torrentsAsSlice()
1206 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1207 for _, t := range cl.torrents {
1208 ret = append(ret, t)
1213 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1214 spec, err := TorrentSpecFromMagnetURI(uri)
1218 T, _, err = cl.AddTorrentSpec(spec)
1222 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1223 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1225 slices.MakeInto(&ss, mi.Nodes)
1230 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1231 mi, err := metainfo.LoadFromFile(filename)
1235 return cl.AddTorrent(mi)
1238 func (cl *Client) DhtServers() []DhtServer {
1239 return cl.dhtServers
1242 func (cl *Client) AddDHTNodes(nodes []string) {
1243 for _, n := range nodes {
1244 hmp := missinggo.SplitHostMaybePort(n)
1245 ip := net.ParseIP(hmp.Host)
1247 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1250 ni := krpc.NodeInfo{
1251 Addr: krpc.NodeAddr{
1256 cl.eachDhtServer(func(s DhtServer) {
1262 func (cl *Client) banPeerIP(ip net.IP) {
1263 cl.logger.Printf("banning ip %v", ip)
1264 if cl.badPeerIPs == nil {
1265 cl.badPeerIPs = make(map[string]struct{})
1267 cl.badPeerIPs[ip.String()] = struct{}{}
1270 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, network, connString string) (c *PeerConn) {
1276 PeerMaxRequests: 250,
1277 writeBuffer: new(bytes.Buffer),
1278 remoteAddr: remoteAddr,
1280 connString: connString,
1282 c.logger = cl.logger.WithValues(c).WithDefaultLevel(log.Debug).WithText(func(m log.Msg) string {
1283 return fmt.Sprintf("%v: %s", c, m.Text())
1285 c.writerCond.L = cl.locker()
1286 c.setRW(connStatsReadWriter{nc, c})
1287 c.r = &rateLimitedReader{
1288 l: cl.config.DownloadRateLimiter,
1291 c.logger.Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1295 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1303 Addr: ipPortAddr{ip, port},
1304 Source: PeerSourceDhtAnnouncePeer,
1308 func firstNotNil(ips ...net.IP) net.IP {
1309 for _, ip := range ips {
1317 func (cl *Client) eachDialer(f func(Dialer) bool) {
1318 for _, s := range cl.dialers {
1325 func (cl *Client) eachListener(f func(Listener) bool) {
1326 for _, s := range cl.listeners {
1333 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1334 cl.eachListener(func(l Listener) bool {
1341 func (cl *Client) publicIp(peer net.IP) net.IP {
1342 // TODO: Use BEP 10 to determine how peers are seeing us.
1343 if peer.To4() != nil {
1345 cl.config.PublicIp4,
1346 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1351 cl.config.PublicIp6,
1352 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1356 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1357 l := cl.findListener(
1358 func(l net.Listener) bool {
1359 return f(addrIpOrNil(l.Addr()))
1365 return addrIpOrNil(l.Addr())
1368 // Our IP as a peer should see it.
1369 func (cl *Client) publicAddr(peer net.IP) IpPort {
1370 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1373 // ListenAddrs addresses currently being listened to.
1374 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1377 cl.eachListener(func(l Listener) bool {
1378 ret = append(ret, l.Addr())
1384 func (cl *Client) onBadAccept(addr net.Addr) {
1385 ipa, ok := tryIpPortFromNetAddr(addr)
1389 ip := maskIpForAcceptLimiting(ipa.IP)
1390 if cl.acceptLimiter == nil {
1391 cl.acceptLimiter = make(map[ipStr]int)
1393 cl.acceptLimiter[ipStr(ip.String())]++
1396 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1397 if ip4 := ip.To4(); ip4 != nil {
1398 return ip4.Mask(net.CIDRMask(24, 32))
1403 func (cl *Client) clearAcceptLimits() {
1404 cl.acceptLimiter = nil
1407 func (cl *Client) acceptLimitClearer() {
1410 case <-cl.closed.LockedChan(cl.locker()):
1412 case <-time.After(15 * time.Minute):
1414 cl.clearAcceptLimits()
1420 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1421 if cl.config.DisableAcceptRateLimiting {
1424 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1427 func (cl *Client) rLock() {
1431 func (cl *Client) rUnlock() {
1435 func (cl *Client) lock() {
1439 func (cl *Client) unlock() {
1443 func (cl *Client) locker() *lockWithDeferreds {
1447 func (cl *Client) String() string {
1448 return fmt.Sprintf("<%[1]T %[1]p>", cl)