17 "github.com/anacrolix/dht/v2"
18 "github.com/anacrolix/dht/v2/krpc"
19 "github.com/anacrolix/log"
20 "github.com/anacrolix/missinggo/bitmap"
21 "github.com/anacrolix/missinggo/perf"
22 "github.com/anacrolix/missinggo/pubsub"
23 "github.com/anacrolix/missinggo/slices"
24 "github.com/anacrolix/missinggo/v2/pproffd"
25 "github.com/anacrolix/sync"
26 "github.com/anacrolix/torrent/tracker"
27 "github.com/anacrolix/torrent/webtorrent"
28 "github.com/davecgh/go-spew/spew"
29 "github.com/dustin/go-humanize"
30 "github.com/google/btree"
31 "github.com/pion/datachannel"
32 "golang.org/x/time/rate"
33 "golang.org/x/xerrors"
35 "github.com/anacrolix/missinggo/v2"
36 "github.com/anacrolix/missinggo/v2/conntrack"
38 "github.com/anacrolix/torrent/bencode"
39 "github.com/anacrolix/torrent/iplist"
40 "github.com/anacrolix/torrent/metainfo"
41 "github.com/anacrolix/torrent/mse"
42 pp "github.com/anacrolix/torrent/peer_protocol"
43 "github.com/anacrolix/torrent/storage"
46 // Clients contain zero or more Torrents. A Client manages a blocklist, the
47 // TCP/UDP protocol ports, and DHT as desired.
49 // An aggregate of stats over all connections. First in struct to ensure
50 // 64-bit alignment of fields. See #262.
55 closed missinggo.Event
61 defaultStorage *storage.Client
65 dhtServers []DhtServer
66 ipBlockList iplist.Ranger
68 // Set of addresses that have our client ID. This intentionally will
69 // include ourselves if we end up trying to connect to our own address
70 // through legitimate channels.
71 dopplegangerAddrs map[string]struct{}
72 badPeerIPs map[string]struct{}
73 torrents map[InfoHash]*Torrent
75 acceptLimiter map[ipStr]int
76 dialRateLimiter *rate.Limiter
78 websocketTrackers websocketTrackers
// BadPeerIPs returns the banned peer IPs in string form, as produced by
// badPeerIPsLocked.
// NOTE(review): the lines between the signature and the return are not visible
// in this excerpt — presumably they acquire the client lock; confirm.
83 func (cl *Client) BadPeerIPs() []string {
86 return cl.badPeerIPsLocked()
89 func (cl *Client) badPeerIPsLocked() []string {
90 return slices.FromMapKeys(cl.badPeerIPs).([]string)
93 func (cl *Client) PeerID() PeerID {
97 // LocalPort returns the port number for the first listener that has one. It no longer assumes
98 // that all port numbers are the same, due to support for custom listeners. Returns zero if no
// port number is found (port is a named result and starts at zero).
100 func (cl *Client) LocalPort() (port int) {
// Visit the listeners, recording the port of each one's address (zero when it has none).
// NOTE(review): the callback's return value, which decides when iteration stops, is not
// visible in this excerpt.
101 cl.eachListener(func(l Listener) bool {
102 port = addrPortOrZero(l.Addr())
108 func writeDhtServerStatus(w io.Writer, s DhtServer) {
109 dhtStats := s.Stats()
110 fmt.Fprintf(w, " ID: %x\n", s.ID())
111 spew.Fdump(w, dhtStats)
114 // WriteStatus writes out a human readable status of the client to _w.
// NOTE(review): a number of original lines (including the rest of this doc comment and
// the branch/loop structure) are not visible in this excerpt; the comments below describe
// only the statements that are.
116 func (cl *Client) WriteStatus(_w io.Writer) {
// All output goes through a buffered writer wrapping _w.
119 w := bufio.NewWriter(_w)
// Client-wide summary lines.
121 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
122 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
123 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
124 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
125 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
// One section per DHT server: its network and address, then its ID and stats.
126 cl.eachDhtServer(func(s DhtServer) {
127 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
128 writeDhtServerStatus(w, s)
130 spew.Fdump(w, &cl.stats)
131 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
// Torrents are visited in ascending order of their info hash string.
133 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
134 return l.InfoHash().AsString() < r.InfoHash().AsString()
// Either a placeholder or the torrent's name is printed; the selecting condition is not
// visible here.
137 fmt.Fprint(w, "<unknown name>")
139 fmt.Fprint(w, t.name())
// Completion is the share of bytes that are not missing, relative to the info's total
// length, followed by the length in human-readable form.
145 "%f%% of %d bytes (%s)",
146 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
148 humanize.Bytes(uint64(*t.length)))
// Placeholder written instead of the completion figures (condition not visible here).
150 w.WriteString("<missing metainfo>")
158 func (cl *Client) initLogger() {
159 cl.logger = cl.config.Logger.WithValues(cl)
160 if !cl.config.Debug {
161 cl.logger = cl.logger.FilterLevel(log.Info)
165 func (cl *Client) announceKey() int32 {
166 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
// NewClient builds a Client from cfg. The visible steps set up default storage, the
// peer ID, listening sockets (with their dialers, listeners and accept loops), DHT
// servers on packet sockets, and the websocket tracker callbacks.
// NOTE(review): many original lines (error checks, guards, closing braces, the final
// return) are not visible in this excerpt; comments describe only what is shown.
169 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
// Fall back to the default configuration (presumably when cfg is nil — the guard is not
// visible here).
171 cfg = NewDefaultClientConfig()
181 dopplegangerAddrs: make(map[string]struct{}),
182 torrents: make(map[metainfo.Hash]*Torrent),
183 dialRateLimiter: rate.NewLimiter(10, 10),
// Background goroutine that periodically resets the accept limiter.
185 go cl.acceptLimitClearer()
193 cl.event.L = cl.locker()
194 storageImpl := cfg.DefaultStorage
195 if storageImpl == nil {
196 // We'd use mmap by default but HFS+ doesn't support sparse files.
197 storageImplCloser := storage.NewFile(cfg.DataDir)
// The file storage created here is owned by the client, so register a hook that closes
// it and logs any error.
198 cl.onClose = append(cl.onClose, func() {
199 if err := storageImplCloser.Close(); err != nil {
200 cl.logger.Printf("error closing default storage: %s", err)
203 storageImpl = storageImplCloser
205 cl.defaultStorage = storage.NewClient(storageImpl)
206 if cfg.IPBlocklist != nil {
207 cl.ipBlockList = cfg.IPBlocklist
// A configured peer ID is copied exactly; otherwise the ID is the Bep20 prefix followed
// by random bytes.
210 if cfg.PeerID != "" {
211 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
213 o := copy(cl.peerID[:], cfg.Bep20)
214 _, err = rand.Read(cl.peerID[o:])
216 panic("error generating peer id")
220 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
// Every socket gets a close hook. Sockets on peer networks enabled by the config are
// also registered as dialer and listener, with an accept loop started for each.
228 for _, _s := range sockets {
229 s := _s // Copy the loop variable so each closure below captures its own socket.
230 cl.onClose = append(cl.onClose, func() { s.Close() })
231 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
232 cl.dialers = append(cl.dialers, s)
233 cl.listeners = append(cl.listeners, s)
234 go cl.acceptConnections(s)
// Sockets that are also packet connections each get a DHT server, closed with the client.
240 for _, s := range sockets {
241 if pc, ok := s.(net.PacketConn); ok {
242 ds, err := cl.newAnacrolixDhtServer(pc)
246 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
247 cl.onClose = append(cl.onClose, func() { ds.Close() })
// Callbacks used by websocket trackers: build announce requests for torrents the client
// tracks, and hand incoming data channels to the torrent they belong to.
252 cl.websocketTrackers = websocketTrackers{
255 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
258 t, ok := cl.torrents[infoHash]
260 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
262 return t.announceRequest(event), nil
264 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
267 t, ok := cl.torrents[dcc.InfoHash]
269 cl.logger.WithDefaultLevel(log.Warning).Printf(
270 "got webrtc conn for unloaded torrent with infohash %x",
276 go t.onWebRtcConn(dc, dcc)
283 func (cl *Client) AddDhtServer(d DhtServer) {
284 cl.dhtServers = append(cl.dhtServers, d)
287 // AddDialer adds a Dialer for outgoing connections. All Dialers are used when attempting to
288 // connect to a given address for any Torrent.
289 func (cl *Client) AddDialer(d Dialer) {
292 cl.dialers = append(cl.dialers, d)
// Every loaded torrent is visited once the dialer has been added.
// NOTE(review): the loop body is not visible in this excerpt.
293 for _, t := range cl.torrents {
298 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
300 func (cl *Client) AddListener(l Listener) {
301 cl.listeners = append(cl.listeners, l)
302 go cl.acceptConnections(l)
// firewallCallback decides whether an incoming connection should be blocked: block is
// true when the client does not want connections. The decision is counted under
// "connections firewalled" or "connections not firewalled".
// NOTE(review): the branch structure and the return statement are not visible in this
// excerpt — presumably block is returned; confirm.
305 func (cl *Client) firewallCallback(net.Addr) bool {
307 block := !cl.wantConns()
310 torrent.Add("connections firewalled", 1)
312 torrent.Add("connections not firewalled", 1)
317 func (cl *Client) listenOnNetwork(n network) bool {
318 if n.Ipv4 && cl.config.DisableIPv4 {
321 if n.Ipv6 && cl.config.DisableIPv6 {
324 if n.Tcp && cl.config.DisableTCP {
327 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
333 func (cl *Client) listenNetworks() (ns []network) {
334 for _, n := range allPeerNetworks {
335 if cl.listenOnNetwork(n) {
// newAnacrolixDhtServer creates a DHT server on conn, configured from the client's
// settings, and bootstraps it, logging the outcome.
// NOTE(review): error checks, the remaining config fields and whether bootstrap runs in
// its own goroutine are not visible in this excerpt.
342 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
343 cfg := dht.ServerConfig{
344 IPBlocklist: cl.ipBlockList,
346 OnAnnouncePeer: cl.onDHTAnnouncePeer,
// Advertise the configured IPv6 address on IPv6 sockets when one is set, otherwise the
// configured IPv4 address.
347 PublicIP: func() net.IP {
348 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
349 return cl.config.PublicIp6
351 return cl.config.PublicIp4
353 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
354 ConnectionTracking: cl.config.ConnTracker,
355 OnQuery: cl.config.DHTOnQuery,
// Prefix every message from this server with the local address it is bound to.
356 Logger: cl.logger.WithText(func(m log.Msg) string {
357 return fmt.Sprintf("dht server on %v: %s", conn.LocalAddr().String(), m.Text())
360 s, err = dht.NewServer(&cfg)
363 ts, err := s.Bootstrap()
365 cl.logger.Printf("error bootstrapping dht: %s", err)
367 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
373 func (cl *Client) Closed() <-chan struct{} {
379 func (cl *Client) eachDhtServer(f func(DhtServer)) {
380 for _, ds := range cl.dhtServers {
385 // Stops the client. All connections to peers are closed and all activity will
// (the remainder of the sentence above is not visible in this excerpt).
// NOTE(review): locking, setting of the closed event and the torrent loop body are not
// visible here either.
387 func (cl *Client) Close() {
391 for _, t := range cl.torrents {
// Run the registered close hooks in reverse order of registration, so that resources
// are released last-registered-first.
394 for i := range cl.onClose {
395 cl.onClose[len(cl.onClose)-1-i]()
400 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
401 if cl.ipBlockList == nil {
404 return cl.ipBlockList.Lookup(ip)
407 func (cl *Client) ipIsBlocked(ip net.IP) bool {
408 _, blocked := cl.ipBlockRange(ip)
// wantConns reports whether the client wants connections, determined by iterating over
// its torrents.
// NOTE(review): the loop body and the return statements are not visible in this excerpt.
412 func (cl *Client) wantConns() bool {
413 for _, t := range cl.torrents {
// waitAccept checks, among other things, whether the client has been closed.
// NOTE(review): the surrounding loop and wait logic are not visible in this excerpt.
421 func (cl *Client) waitAccept() {
423 if cl.closed.IsSet() {
// rejectAccepted returns an error describing why an accepted connection should be
// rejected, judged by its remote address. The checks below only run when an IP can be
// extracted from that address.
// NOTE(review): the final return is not visible in this excerpt — presumably nil.
433 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
434 func (cl *Client) rejectAccepted(conn net.Conn) error {
435 ra := conn.RemoteAddr()
436 if rip := addrIpOrNil(ra); rip != nil {
// IPv4 peers can be disabled separately; this matches any IP convertible to IPv4.
437 if cl.config.DisableIPv4Peers && rip.To4() != nil {
438 return errors.New("ipv4 peers disabled")
// Here only an IP stored in 4-byte form counts as IPv4.
440 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
441 return errors.New("ipv4 disabled")
// A 16-byte IP that does not convert to IPv4 counts as IPv6.
444 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
445 return errors.New("ipv6 disabled")
// Sources currently subject to accept rate limiting are turned away.
447 if cl.rateLimitAccept(rip) {
448 return errors.New("source IP accepted rate limited")
// So are addresses the client already considers bad.
450 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
451 return errors.New("bad source addr")
// acceptConnections accepts connections from l. Each accepted connection is either
// rejected (see rejectAccepted) or handed to incomingConnection in a new goroutine;
// counters are updated for each outcome.
// NOTE(review): the loop, locking and branch structure are not visible in this excerpt.
457 func (cl *Client) acceptConnections(l net.Listener) {
459 conn, err := l.Accept()
460 torrent.Add("client listener accepts", 1)
461 conn = pproffd.WrapNetConn(conn)
463 closed := cl.closed.IsSet()
466 reject = cl.rejectAccepted(conn)
476 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
481 torrent.Add("rejected accepted connections", 1)
482 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
485 go cl.incomingConnection(conn)
487 log.Fmsg("accepted %q connection at %q from %q",
491 ).SetLevel(log.Debug).Log(cl.logger)
// Per-connection counters keyed by remote IP length, remote network and listener network.
492 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
493 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
494 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
499 func regularConnString(nc net.Conn) string {
500 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
// incomingConnection wraps an accepted connection in a PeerConn created as non-outgoing,
// marks it as discovered via PeerSourceIncoming, and runs the receiving side of the
// handshakes on it.
// NOTE(review): some lines (e.g. closing of nc) are not visible in this excerpt.
503 func (cl *Client) incomingConnection(nc net.Conn) {
// TCP connections get extra handling; the body of this branch is not visible here.
505 if tc, ok := nc.(*net.TCPConn); ok {
508 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
509 regularConnString(nc))
510 c.Discovery = PeerSourceIncoming
511 cl.runReceivedConn(c)
514 // Torrent returns a handle to the torrent with info hash ih, if it's present in the
// client; ok reports whether it was found.
// NOTE(review): the lines before the map lookup are not visible in this excerpt —
// presumably they acquire the client lock; confirm.
515 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
518 t, ok = cl.torrents[ih]
522 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
523 return cl.torrents[ih]
526 type dialResult struct {
531 func countDialResult(err error) {
533 torrent.Add("successful dials", 1)
535 torrent.Add("unsuccessful dials", 1)
539 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
540 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
541 if ret < minDialTimeout {
547 // Returns whether an address is known to connect to a client with our own ID.
548 func (cl *Client) dopplegangerAddr(addr string) bool {
549 _, ok := cl.dopplegangerAddrs[addr]
553 // dialFirst returns a connection over UTP or TCP, whichever is first to connect. Dials are
// started through the client's dialers and their results collected on a channel.
// NOTE(review): the goroutine launches, the initialisation of left and the clean-up of
// losing connections are not visible in this excerpt.
554 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
// Time the call and mark which outcome it ended with.
556 t := perf.NewTimer(perf.CallerName(0))
559 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
561 t.Mark("returned conn over " + res.Network)
565 ctx, cancel := context.WithCancel(ctx)
566 // As soon as we return one connection, cancel the others.
// The result channel is buffered with capacity left.
569 resCh := make(chan dialResult, left)
573 cl.eachDialer(func(s Dialer) bool {
576 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
// Each result pairs the dialed connection with the network of the dialer's local address.
579 cl.dialFromSocket(ctx, s, addr),
580 s.LocalAddr().Network(),
587 // Wait for a successful connection.
589 defer perf.ScopeTimer()()
// Stop as soon as a result carries a connection, or when no dials are left.
590 for ; left > 0 && res.Conn == nil; left-- {
594 // There are still incompleted dials.
596 for ; left > 0; left-- {
597 conn := (<-resCh).Conn
604 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
606 //if res.Conn != nil {
607 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
609 // cl.logger.Printf("failed to dial %s", addr)
// dialFromSocket dials addr using dialer s, first waiting on the connection tracker for
// an entry describing the dial. On success the connection is returned wrapped in a
// closeWrapper with a custom close function.
// NOTE(review): the handling of the tracker entry (cte) on each path is not visible in
// this excerpt.
614 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
615 network := s.LocalAddr().Network()
616 cte := cl.config.ConnTracker.Wait(
618 conntrack.Entry{network, s.LocalAddr().String(), addr},
619 "dial torrent client",
622 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
623 // which dial errors allow us to forget the connection tracking entry handle.
624 if ctx.Err() != nil {
630 c, err := s.Dial(ctx, addr)
631 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
632 // it now in case we close the connection forthwith.
633 if tc, ok := c.(*net.TCPConn); ok {
// Only errors recognised by forgettableDialError take this branch.
638 if err != nil && forgettableDialError(err) {
645 return closeWrapper{c, func() error {
652 func forgettableDialError(err error) bool {
653 return strings.Contains(err.Error(), "no suitable address found")
// noLongerHalfOpen removes addr from t's set of half-open connections. It panics if addr
// is not currently recorded as half-open, since that breaks an invariant.
// NOTE(review): an original line after the delete is not visible in this excerpt.
656 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
657 if _, ok := t.halfOpen[addr]; !ok {
658 panic("invariant broken")
660 delete(t.halfOpen, addr)
664 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
665 // for valid reasons.
// NOTE(review): several parameters and the error handling are not visible in this excerpt.
666 func (cl *Client) initiateProtocolHandshakes(
670 outgoing, encryptHeader bool,
672 network, connString string,
674 c *PeerConn, err error,
676 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
677 c.headerEncrypted = encryptHeader
// Bound the handshakes by the configured timeout and apply the resulting deadline to
// the underlying connection.
678 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
680 dl, ok := ctx.Deadline()
684 err = nc.SetDeadline(dl)
688 err = cl.initiateHandshakes(c, t)
692 // Returns nil connection and nil error if no connection could be established for valid reasons.
// The address is dialed under a timeout obtained from the torrent, then the initiator
// handshakes are run on the resulting connection with the requested header obfuscation.
// NOTE(review): locking, cancel handling and the clean-up after a failed handshake are
// not visible in this excerpt.
693 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr net.Addr, obfuscatedHeader bool) (*PeerConn, error) {
694 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
697 return t.dialTimeout()
700 dr := cl.dialFirst(dialCtx, addr.String())
// When the dial context has ended, its error is wrapped into the returned error;
// otherwise a generic dial failure is returned.
703 if dialCtx.Err() != nil {
704 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
706 return nil, errors.New("dial failed")
708 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Network, regularConnString(nc))
715 // Returns nil connection and nil error if no connection could be established
716 // for valid reasons.
// The preferred header obfuscation setting is tried first; a second attempt with the
// opposite setting follows unless the policy requires the preferred one.
// NOTE(review): the conditions guarding the returns are not visible in this excerpt.
717 func (cl *Client) establishOutgoingConn(t *Torrent, addr net.Addr) (c *PeerConn, err error) {
718 torrent.Add("establish outgoing connection", 1)
719 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
720 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
722 torrent.Add("initiated conn with preferred header obfuscation", 1)
725 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
726 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
727 // We should have just tried with the preferred header obfuscation. If it was required,
728 // there's nothing else to try.
731 // Try again with encryption if we didn't earlier, or without if we did.
732 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
734 torrent.Add("initiated conn with fallback header obfuscation", 1)
736 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
740 // Called to dial out and run a connection. The addr we're given is already
741 // considered half-open.
// NOTE(review): locking, error branches and the use of ps/trusted are not visible in this
// excerpt.
742 func (cl *Client) outgoingConnection(t *Torrent, addr net.Addr, ps PeerSource, trusted bool) {
// Outgoing dials are paced by the client-wide dial rate limiter.
743 cl.dialRateLimiter.Wait(context.Background())
744 c, err := cl.establishOutgoingConn(t, addr)
747 // Don't release lock between here and addConnection, unless it's for
// (the remainder of the sentence above is not visible in this excerpt).
749 cl.noLongerHalfOpen(t, addr.String())
752 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
759 t.runHandshookConnLoggingErr(c)
762 // The port number for incoming peer connections. 0 if the client isn't listening.
763 func (cl *Client) incomingPeerPort() int {
764 return cl.LocalPort()
// initiateHandshakes runs the initiator side of the handshakes on c for torrent t: an
// optional header obfuscation (MSE) handshake when c.headerEncrypted is set, followed by
// the BitTorrent protocol handshake, whose returned info hash must equal t's.
// NOTE(review): the MSE arguments and the final return are not fully visible in this
// excerpt.
767 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
768 if c.headerEncrypted {
771 rw, c.cryptoMethod, err = mse.InitiateHandshake(
778 cl.config.CryptoProvides,
782 return xerrors.Errorf("header obfuscation handshake: %w", err)
785 ih, err := cl.connBtHandshake(c, &t.infoHash)
787 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
// A peer answering with a different info hash is treated as a handshake failure.
789 if ih != t.infoHash {
790 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
795 // Calls f with any secret keys.
// The keys are produced while iterating over cl.torrents.
// NOTE(review): locking and the loop bodies are not visible in this excerpt, so how each
// key is derived cannot be confirmed from here.
796 func (cl *Client) forSkeys(f func([]byte) bool) {
// The first branch is disabled (if false); it is kept to emulate a past bug.
799 if false { // Emulate the bug from #114
801 for ih := range cl.torrents {
805 for range cl.torrents {
812 for ih := range cl.torrents {
819 // Do encryption and bittorrent handshakes as receiver.
// NOTE(review): the lookup of the torrent from the received info hash and several return
// paths are not visible in this excerpt.
820 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
821 defer perf.ScopeTimerErr(&err)()
823 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
// Count the handshake as encrypted or unencrypted; a missing secret key match is still
// counted here rather than as an encryption handling error.
825 if err == nil || err == mse.ErrNoSecretKeyMatch {
826 if c.headerEncrypted {
827 torrent.Add("handshakes received encrypted", 1)
829 torrent.Add("handshakes received unencrypted", 1)
832 torrent.Add("handshakes received with error while handling encryption", 1)
835 if err == mse.ErrNoSecretKeyMatch {
// When the policy requires the preferred obfuscation setting, a connection using the
// other setting is an error.
840 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
841 err = errors.New("connection not have required header obfuscation")
// As receiver no info hash is supplied to the handshake (nil); it is taken from the peer.
844 ih, err := cl.connBtHandshake(c, nil)
846 err = xerrors.Errorf("during bt handshake: %w", err)
// connBtHandshake performs the BitTorrent protocol handshake on c using the client's
// peer ID and extension bits, and records the peer's extension bits, peer ID and the
// completion time on c.
// NOTE(review): error handling and the assignment of ret are not visible in this excerpt.
855 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
856 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
861 c.PeerExtensionBytes = res.PeerExtensionBits
862 c.PeerID = res.PeerID
863 c.completedHandshake = time.Now()
// runReceivedConn handles a connection received from a peer: it sets a handshake
// deadline, performs the receiver handshakes and, for a loaded torrent, runs the
// connection. Failed handshakes and handshakes for unloaded torrents are counted, logged
// at debug level and reported through onBadAccept.
// NOTE(review): the branch conditions and locking are not visible in this excerpt.
867 func (cl *Client) runReceivedConn(c *PeerConn) {
868 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
872 t, err := cl.receiveHandshakes(c)
875 "error receiving handshakes on %v: %s", c, err,
876 ).SetLevel(log.Debug).
878 "network", c.network,
880 torrent.Add("error receiving handshake", 1)
882 cl.onBadAccept(c.remoteAddr)
887 torrent.Add("received handshake for unloaded torrent", 1)
888 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
890 cl.onBadAccept(c.remoteAddr)
894 torrent.Add("received handshake for loaded torrent", 1)
897 t.runHandshookConnLoggingErr(c)
900 // Client lock must be held before entering this.
// runHandshookConn runs a connection whose handshakes have completed: it rejects
// connections to ourselves, adds the connection to t, starts its writer, sends the
// initial messages and then runs the main read loop until it ends.
// NOTE(review): some branch conditions are not visible in this excerpt.
901 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
// A peer presenting our own peer ID is a connection to ourselves.
903 if c.PeerID == cl.peerID {
906 addr := c.conn.RemoteAddr().String()
907 cl.dopplegangerAddrs[addr] = struct{}{}
909 // Because the remote address is not necessarily the same as its client's torrent listen
910 // address, we won't record the remote address as a doppleganger. Instead, the initiator
911 // can record *us* as the doppleganger.
913 return errors.New("local and remote peer ids are the same")
// The zero time removes the write deadline.
915 c.conn.SetWriteDeadline(time.Time{})
916 c.r = deadlineReader{c.conn, c.r}
917 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
918 if connIsIpv6(c.conn) {
919 torrent.Add("completed handshake over ipv6", 1)
921 if err := t.addConnection(c); err != nil {
922 return fmt.Errorf("adding connection: %w", err)
// From here on the connection belongs to t and is dropped from it on return.
924 defer t.dropConnection(c)
925 go c.writer(time.Minute)
926 cl.sendInitialMessages(c, t)
927 err := c.mainReadLoop()
929 return fmt.Errorf("main read loop: %w", err)
934 // See the order given in Transmission's tr_peerMsgsNew.
// sendInitialMessages posts the first messages on a newly established connection: the
// extended handshake (when both sides support the extension protocol), a HaveAll or
// HaveNone message where applicable on fast-enabled connections, and a further message
// when both sides support DHT and the client has a DHT server.
// NOTE(review): some message fields and branches are not visible in this excerpt.
935 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
936 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
937 conn.post(pp.Message{
939 ExtendedID: pp.HandshakeExtendedID,
940 ExtendedPayload: func() []byte {
941 msg := pp.ExtendedHandshakeMessage{
942 M: map[pp.ExtensionName]pp.ExtensionNumber{
943 pp.ExtensionNameMetadata: metadataExtendedId,
945 V: cl.config.ExtendedHandshakeClientVersion,
946 Reqq: 64, // TODO: Really?
947 YourIp: pp.CompactIp(addrIpOrNil(conn.remoteAddr)),
// Encryption is advertised unless the policy both prefers and requires plain headers.
948 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
949 Port: cl.incomingPeerPort(),
950 MetadataSize: torrent.metadataSize(),
951 // TODO: We can figure these out specific to the socket
953 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
954 Ipv6: cl.config.PublicIp6.To16(),
// Peer exchange is only advertised when it has not been disabled.
956 if !cl.config.DisablePEX {
957 msg.M[pp.ExtensionNamePex] = pexExtendedId
959 return bencode.MustMarshal(msg)
// On fast-enabled connections, HaveAll/HaveNone is posted and sentHaves is updated to
// match.
964 if conn.fastEnabled() {
965 if torrent.haveAllPieces() {
966 conn.post(pp.Message{Type: pp.HaveAll})
967 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
969 } else if !torrent.haveAnyPieces() {
970 conn.post(pp.Message{Type: pp.HaveNone})
971 conn.sentHaves.Clear()
977 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
978 conn.post(pp.Message{
985 func (cl *Client) dhtPort() (ret uint16) {
986 cl.eachDhtServer(func(s DhtServer) {
987 ret = uint16(missinggo.AddrPort(s.Addr()))
992 func (cl *Client) haveDhtServer() (ret bool) {
993 cl.eachDhtServer(func(_ DhtServer) {
999 // Process incoming ut_metadata message.
// The payload starts with a bencoded dictionary of integers; its "msg_type" selects one
// of three handlers (data, request, reject).
// NOTE(review): the definition of piece and several returns are not visible in this
// excerpt.
1000 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1001 var d map[string]int
1002 err := bencode.Unmarshal(payload, &d)
// Trailing bytes after the dictionary are not treated as an error here; any other
// unmarshalling error is.
1003 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1004 } else if err != nil {
1005 return fmt.Errorf("error unmarshalling bencode: %s", err)
1007 msgType, ok := d["msg_type"]
1009 return errors.New("missing msg_type field")
// Data: a metadata piece sent by the peer. Only pieces we requested are accepted.
1013 case pp.DataMetadataExtensionMsgType:
1014 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1015 if !c.requestedMetadataPiece(piece) {
1016 return fmt.Errorf("got unexpected piece %d", piece)
1018 c.metadataRequests[piece] = false
// The piece data is the tail of the payload; its length is derived from the peer-supplied
// total_size, so the computed start offset is validated before slicing.
1019 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1020 if begin < 0 || begin >= len(payload) {
1021 return fmt.Errorf("data has bad offset in payload: %d", begin)
1023 t.saveMetadataPiece(piece, payload[begin:])
1024 c.lastUsefulChunkReceived = time.Now()
1025 return t.maybeCompleteMetadata()
// Request: the peer asks for a metadata piece. Pieces we don't have are rejected.
1026 case pp.RequestMetadataExtensionMsgType:
1027 if !t.haveMetadataPiece(piece) {
1028 c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
// Pieces are addressed in units of 1<<14 bytes within the metadata.
1031 start := (1 << 14) * piece
1032 c.logger.Printf("sending metadata piece %d", piece)
1033 c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1035 case pp.RejectMetadataExtensionMsgType:
1038 return errors.New("unknown msg_type value")
1042 func (cl *Client) badPeerAddr(addr net.Addr) bool {
1043 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1044 return cl.badPeerIPPort(ipa.IP, ipa.Port)
// badPeerIPPort reports whether the peer at ip:port should be avoided. The visible
// checks are: the address is a known doppleganger (a client with our own ID), the IP is
// in the blocklist, or the IP has been banned.
// NOTE(review): an initial check and the return statements are not visible in this
// excerpt.
1049 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1053 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1056 if _, ok := cl.ipBlockRange(ip); ok {
1059 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1065 // Return a Torrent ready for insertion into a Client.
// The torrent uses specStorage when one is given and the client's default storage
// otherwise.
// NOTE(review): several struct fields and the return are not visible in this excerpt.
1066 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1067 // use provided storage, if provided
1068 storageClient := cl.defaultStorage
1069 if specStorage != nil {
1070 storageClient = storage.NewClient(specStorage)
1076 peers: prioritizedPeers{
// Peer priority is computed from our public address as seen by the peer and the peer's
// own address.
1078 getPrio: func(p Peer) peerPriority {
1079 return bep40PriorityIgnoreError(cl.publicAddr(addrIpOrNil(p.Addr)), p.addr())
// Sized for twice the configured number of established connections per torrent.
1082 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1084 halfOpen: make(map[string]Peer),
1085 pieceStateChanges: pubsub.NewPubSub(),
1087 storageOpener: storageClient,
1088 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1090 networkingEnabled: true,
1091 metadataChanged: sync.Cond{
1095 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1096 t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
// Prefix every log message from this torrent with the torrent itself.
1097 t.logger = cl.logger.WithValues(t).WithText(func(m log.Msg) string {
1098 return fmt.Sprintf("%v: %s", t, m.Text())
1100 t.setChunkSize(defaultChunkSize)
1104 // A file-like handle to some torrent data resource.
1105 type Handle interface {
1112 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1113 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1116 // Adds a torrent by InfoHash with a custom Storage implementation.
1117 // If the torrent already exists then this Storage is ignored and the
1118 // existing torrent returned with `new` set to `false`
// NOTE(review): locking and the early return for an existing torrent are not visible in
// this excerpt.
1119 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1122 t, ok := cl.torrents[infoHash]
1128 t = cl.newTorrent(infoHash, specStorage)
// The new torrent is announced on every DHT server, each in its own goroutine.
1129 cl.eachDhtServer(func(s DhtServer) {
1130 go t.dhtAnnouncer(s)
1132 cl.torrents[infoHash] = t
// Adding a torrent resets the accept limiter.
1133 cl.clearAcceptLimits()
1134 t.updateWantPeersEvent()
1135 // Tickle Client.waitAccept, new torrent may want conns.
1136 cl.event.Broadcast()
1140 // Add or merge a torrent spec. If the torrent is already present, the
1141 // trackers will be merged with the existing ones. If the Info isn't yet
1142 // known, it will be set. The display name is replaced if the new spec
1143 // provides one. Returns new if the torrent wasn't already in the client.
1144 // Note that any `Storage` defined on the spec will be ignored if the
1145 // torrent is already present (i.e. `new` return value is `false`)
// NOTE(review): error handling after SetInfoBytes, locking and the final return are not
// visible in this excerpt.
1146 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1147 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1148 if spec.DisplayName != "" {
1149 t.SetDisplayName(spec.DisplayName)
1151 if spec.InfoBytes != nil {
1152 err = t.SetInfoBytes(spec.InfoBytes)
// A zero chunk size in the spec leaves the torrent's chunk size unchanged.
1159 if spec.ChunkSize != 0 {
1160 t.setChunkSize(pp.Integer(spec.ChunkSize))
1162 t.addTrackers(spec.Trackers)
// dropTorrent removes the torrent with the given info hash from the client. It returns
// an error when the client has no such torrent.
// NOTE(review): the lines between the lookup failure and the delete (presumably closing
// the torrent) are not visible in this excerpt.
1167 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1168 t, ok := cl.torrents[infoHash]
1170 err = fmt.Errorf("no such torrent")
1177 delete(cl.torrents, infoHash)
// allTorrentsCompleted reports whether every torrent in the client is complete; a
// torrent that does not have all its pieces makes the answer negative.
// NOTE(review): an additional per-torrent check and the return statements are not visible
// in this excerpt.
1181 func (cl *Client) allTorrentsCompleted() bool {
1182 for _, t := range cl.torrents {
1186 if !t.haveAllPieces() {
1193 // WaitAll returns true when all torrents are completely downloaded and false if the
1194 // client is stopped before that.
// NOTE(review): locking and the wait inside the loop are not visible in this excerpt.
1195 func (cl *Client) WaitAll() bool {
1198 for !cl.allTorrentsCompleted() {
1199 if cl.closed.IsSet() {
1207 // Torrents returns handles to all the torrents loaded in the Client, in no particular
// order (the slice is built by iterating a map in torrentsAsSlice).
// NOTE(review): the lines before the return are not visible in this excerpt — presumably
// they acquire the client lock; confirm.
1208 func (cl *Client) Torrents() []*Torrent {
1211 return cl.torrentsAsSlice()
1214 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1215 for _, t := range cl.torrents {
1216 ret = append(ret, t)
1221 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1222 spec, err := TorrentSpecFromMagnetURI(uri)
1226 T, _, err = cl.AddTorrentSpec(spec)
// AddTorrent adds or merges the torrent described by mi, via a spec built from the
// metainfo. The metainfo's Nodes are converted into another slice (ss).
// NOTE(review): the declaration and use of ss and the return are not visible in this
// excerpt — presumably the nodes are added as DHT nodes; confirm.
1230 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1231 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1233 slices.MakeInto(&ss, mi.Nodes)
1238 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1239 mi, err := metainfo.LoadFromFile(filename)
1243 return cl.AddTorrent(mi)
1246 func (cl *Client) DhtServers() []DhtServer {
1247 return cl.dhtServers
// AddDHTNodes offers each "host[:port]" string in nodes to every DHT server. The host
// must be an IP literal; entries whose host does not parse as an IP are logged and not
// added.
// NOTE(review): the nil check on ip, the NodeAddr fields and the call made on each DHT
// server are not visible in this excerpt.
1250 func (cl *Client) AddDHTNodes(nodes []string) {
1251 for _, n := range nodes {
1252 hmp := missinggo.SplitHostMaybePort(n)
1253 ip := net.ParseIP(hmp.Host)
1255 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1258 ni := krpc.NodeInfo{
1259 Addr: krpc.NodeAddr{
1264 cl.eachDhtServer(func(s DhtServer) {
1270 func (cl *Client) banPeerIP(ip net.IP) {
1271 cl.logger.Printf("banning ip %v", ip)
1272 if cl.badPeerIPs == nil {
1273 cl.badPeerIPs = make(map[string]struct{})
1275 cl.badPeerIPs[ip.String()] = struct{}{}
// newConnection creates the PeerConn wrapping nc, wiring up its logger, writer condition,
// stats-counting read/writer and rate-limited reader, and logs its creation.
// NOTE(review): several struct fields and the return are not visible in this excerpt.
1278 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, network, connString string) (c *PeerConn) {
1284 PeerMaxRequests: 250,
1285 writeBuffer: new(bytes.Buffer),
1286 remoteAddr: remoteAddr,
1288 connString: connString,
// Messages default to debug level and are prefixed with the connection itself.
1290 c.logger = cl.logger.WithValues(c).WithDefaultLevel(log.Debug).WithText(func(m log.Msg) string {
1291 return fmt.Sprintf("%v: %s", c, m.Text())
// The writer condition shares the client's lock.
1293 c.writerCond.L = cl.locker()
1294 c.setRW(connStatsReadWriter{nc, c})
// Reads are limited by the configured download rate limiter.
1295 c.r = &rateLimitedReader{
1296 l: cl.config.DownloadRateLimiter,
1299 c.logger.Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
// onDHTAnnouncePeer handles a peer announced over the DHT for info hash ih. The peer is
// described by its ip/port address with source PeerSourceDhtAnnouncePeer.
// NOTE(review): the torrent lookup, the use of portOk and the call receiving the peer are
// not visible in this excerpt.
1303 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1311 Addr: ipPortAddr{ip, port},
1312 Source: PeerSourceDhtAnnouncePeer,
1316 func firstNotNil(ips ...net.IP) net.IP {
1317 for _, ip := range ips {
1325 func (cl *Client) eachDialer(f func(Dialer) bool) {
1326 for _, s := range cl.dialers {
1333 func (cl *Client) eachListener(f func(Listener) bool) {
1334 for _, s := range cl.listeners {
// findListener searches the client's listeners using predicate f and returns the result
// in ret (nil when nothing is assigned).
// NOTE(review): the callback body is not visible in this excerpt, so exactly which
// listener is returned when none satisfies f cannot be confirmed from here.
1341 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1342 cl.eachListener(func(l Listener) bool {
// publicIp returns the IP we believe the given peer sees us at. For each address family
// the candidates are the configured public IP and the IP of a listener of that family;
// which family is used depends on whether peer is an IPv4 address.
// NOTE(review): the calls combining the candidates are not visible in this excerpt —
// presumably firstNotNil; confirm.
1349 func (cl *Client) publicIp(peer net.IP) net.IP {
1350 // TODO: Use BEP 10 to determine how peers are seeing us.
1351 if peer.To4() != nil {
1353 cl.config.PublicIp4,
1354 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1359 cl.config.PublicIp6,
1360 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1364 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1365 l := cl.findListener(
1366 func(l net.Listener) bool {
1367 return f(addrIpOrNil(l.Addr()))
1373 return addrIpOrNil(l.Addr())
1376 // Our IP as a peer should see it.
1377 func (cl *Client) publicAddr(peer net.IP) IpPort {
1378 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1381 // ListenAddrs returns the addresses currently being listened to, one per listener.
// NOTE(review): the lines before the iteration (presumably locking) and the callback's
// return value are not visible in this excerpt.
1382 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1385 cl.eachListener(func(l Listener) bool {
1386 ret = append(ret, l.Addr())
1392 func (cl *Client) onBadAccept(addr net.Addr) {
1393 ipa, ok := tryIpPortFromNetAddr(addr)
1397 ip := maskIpForAcceptLimiting(ipa.IP)
1398 if cl.acceptLimiter == nil {
1399 cl.acceptLimiter = make(map[ipStr]int)
1401 cl.acceptLimiter[ipStr(ip.String())]++
1404 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1405 if ip4 := ip.To4(); ip4 != nil {
1406 return ip4.Mask(net.CIDRMask(24, 32))
1411 func (cl *Client) clearAcceptLimits() {
1412 cl.acceptLimiter = nil
// acceptLimitClearer is run in its own goroutine by NewClient. It waits for either the
// client to close or 15 minutes to elapse, and clears the accept limits in the latter
// case.
// NOTE(review): the enclosing loop, the return on close and the locking around
// clearAcceptLimits are not visible in this excerpt.
1415 func (cl *Client) acceptLimitClearer() {
1418 case <-cl.closed.LockedChan(cl.locker()):
1420 case <-time.After(15 * time.Minute):
1422 cl.clearAcceptLimits()
1428 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1429 if cl.config.DisableAcceptRateLimiting {
1432 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1435 func (cl *Client) rLock() {
1439 func (cl *Client) rUnlock() {
1443 func (cl *Client) lock() {
1447 func (cl *Client) unlock() {
1451 func (cl *Client) locker() *lockWithDeferreds {
1455 func (cl *Client) String() string {
1456 return fmt.Sprintf("<%[1]T %[1]p>", cl)