8 g "github.com/anacrolix/generics"
9 "github.com/anacrolix/log"
11 pp "github.com/anacrolix/torrent/peer_protocol"
// Delay passed to sched by Share when there was no PEX message to send.
15 pexRetryDelay = 10 * time.Second
// Length of the torrent-wide cooldown that Recv writes into s.torrent.pex.rest.
16 pexInterval = 1 * time.Minute
19 // pexConnState holds the per-connection PEX state.
20 type pexConnState struct {
// Extension number handed to PexMsg.Message when Share posts an outgoing PEX message.
22 xid pp.ExtensionNumber
30 // Running record of live connections the remote end of the connection purports to have.
// Maintained by updateRemoteLiveConns: Added/Added6 entries are inserted, Dropped/Dropped6
// entries are deleted. The optional value carries the flags taken from the message's
// AddedFlags/Added6Flags slice at the same index as the address.
31 remoteLiveConns map[netip.AddrPort]g.Option[pp.PexPeerFlags]
// IsEnabled presumably reports whether PEX was set up for this connection by Init — TODO confirm.
35 func (s *pexConnState) IsEnabled() bool {
39 // Init is called from the reader goroutine upon the extended handshake completion.
// It looks up the extension number the peer registered for PEX, then sets up the logger,
// the writer wake-up callback, the gate channel and the timer used by this state.
40 func (s *pexConnState) Init(c *PeerConn) {
41 xid, ok := c.PeerExtensionIDs[pp.ExtensionNamePex]
// True when the peer did not list the PEX extension, listed it with number 0, or the client
// config disables PEX.
// NOTE(review): presumably Init bails out in this case, leaving PEX off — confirm.
42 if !ok || xid == 0 || c.t.cl.config.DisablePEX {
// Log under the "pex" name, defaulting to debug level.
48 s.logger = c.logger.WithDefaultLevel(log.Debug).WithNames("pex")
// readyfn is what the timer callback below invokes to wake the connection's writer.
49 s.readyfn = c.tickleWriter
// Buffered with capacity 1, so a single send does not block.
50 s.gate = make(chan struct{}, 1)
// A zero duration makes the callback run (in its own goroutine) as soon as possible.
51 s.timer = time.AfterFunc(0, func() {
53 s.readyfn() // wake up the writer
58 // sched schedules the next PEX message.
// NOTE(review): presumably delay is how long to wait before the timer set up in Init fires
// again — confirm.
59 func (s *pexConnState) sched(delay time.Duration) {
63 // genmsg generates the next PEX message for the peer; returns nil if nothing yet to send.
64 func (s *pexConnState) genmsg() *pp.PexMsg {
// The torrent-level PEX state is given s.last and returns a message together with a new
// "last" value.
65 tx, last := s.torrent.pex.Genmsg(s.last)
// numPending returns what the torrent-level PEX state reports as pending relative to s.last.
73 func (s *pexConnState) numPending() int {
77 return s.torrent.pex.numPending(s.last)
80 // Share is called from the writer goroutine when it is woken up with the write buffers empty.
81 // Returns whether there's more room on the send buffer to write to.
//
// postfn is the writer callback used to post the outgoing message.
82 func (s *pexConnState) Share(postfn messageWriter) bool {
// genmsg returns nil when there is nothing to send yet.
85 if tx := s.genmsg(); tx != nil {
86 s.logger.Print("sending PEX message: ", tx)
// Build the wire message for the peer's PEX extension number and post it; flow is whatever
// postfn reports back.
87 flow := postfn(tx.Message(s.xid))
91 // no PEX to send this time - try again shortly
92 s.sched(pexRetryDelay)
// updateRemoteLiveConns applies a received PEX message to s.remoteLiveConns: addresses listed
// in Dropped/Dropped6 are deleted, and addresses listed in Added/Added6 are stored together
// with the flags found at the same index of AddedFlags/Added6Flags.
//
// NOTE(review): the function declares an errs result, but the visible statements never append
// to it, and the address conversion errors below are discarded.
99 func (s *pexConnState) updateRemoteLiveConns(rx pp.PexMsg) (errs []error) {
100 for _, dropped := range rx.Dropped {
// NOTE(review): the second return value (presumably an error) is ignored, so a failed
// conversion still reaches the delete with whatever addrPort the helper returned.
101 addrPort, _ := ipv4AddrPortFromKrpcNodeAddr(dropped)
// delete is a no-op on a nil map, so drops arriving before any add are harmless.
102 delete(s.remoteLiveConns, addrPort)
104 for _, dropped := range rx.Dropped6 {
// NOTE(review): conversion failure is ignored here as well.
105 addrPort, _ := ipv6AddrPortFromKrpcNodeAddr(dropped)
106 delete(s.remoteLiveConns, addrPort)
108 for i, added := range rx.Added {
// NOTE(review): converting a slice to *[4]byte panics when the slice is shorter than 4 bytes.
// If added.IP is a net.IP, To4 returns nil for anything that is not an IPv4 address —
// confirm the decoder guarantees well-formed addresses for peer-supplied data.
109 addr := netip.AddrFrom4(*(*[4]byte)(added.IP.To4()))
110 addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
// Flags are looked up by the address's index; presumably the option is empty when
// AddedFlags is shorter than Added — TODO confirm g.SliceGet semantics.
111 flags := g.SliceGet(rx.AddedFlags, i)
// The map is passed by pointer; going by the helper's name it is allocated on first use.
112 g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
114 for i, added := range rx.Added6 {
// NOTE(review): same panic hazard as above if To16 yields fewer than 16 bytes (e.g. nil).
115 addr := netip.AddrFrom16(*(*[16]byte)(added.IP.To16()))
116 addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
117 flags := g.SliceGet(rx.Added6Flags, i)
118 g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
123 // Recv is called from the reader goroutine with the payload of an incoming PEX message.
// It decodes the message, rejects it with an error if it arrives too soon after the previous
// one, records the remote's claimed live connections, and offers the advertised peers to the
// torrent unless the torrent-wide cooldown is still running.
124 func (s *pexConnState) Recv(payload []byte) error {
125 rx, err := pp.LoadPexMsg(payload)
127 return fmt.Errorf("unmarshalling pex message: %w", err)
129 s.logger.Printf("received pex message: %v", rx)
// These counters are bumped ahead of the rate-limit check below.
130 torrent.Add("pex added peers received", int64(len(rx.Added)))
131 torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
133 // "Clients must batch updates to send no more than 1 PEX message per minute."
// The threshold actually enforced is 45 seconds, i.e. looser than the quoted one minute.
134 timeSinceLastRecv := time.Since(s.lastRecv)
135 if timeSinceLastRecv < 45*time.Second {
136 return fmt.Errorf("last received only %v ago", timeSinceLastRecv)
138 s.lastRecv = time.Now()
// NOTE(review): the []error result of updateRemoteLiveConns is discarded here.
139 s.updateRemoteLiveConns(rx)
// IPv6 candidates are appended ahead of the IPv4 ones.
142 peers.AppendFromPex(rx.Added6, rx.Added6Flags)
143 peers.AppendFromPex(rx.Added, rx.AddedFlags)
// s.torrent.pex.rest is the end of the torrent-wide cooldown; it is set further down.
144 if time.Now().Before(s.torrent.pex.rest) {
145 s.logger.Printf("in cooldown period, incoming PEX discarded")
148 added := s.torrent.addPeers(peers)
149 s.logger.Printf("got %v peers over pex, added %v", len(peers), added)
// Move the cooldown deadline to pexInterval from now.
152 s.torrent.pex.rest = time.Now().Add(pexInterval)
155 // one day we may also want to:
156 // - handle drops somehow
157 // - detect malicious peers
162 func (s *pexConnState) Close() {