8 g "github.com/anacrolix/generics"
9 "github.com/anacrolix/log"
11 pp "github.com/anacrolix/torrent/peer_protocol"
15 pexRetryDelay = 10 * time.Second // delay passed to sched when Share finds no PEX message to send
16 pexInterval = 1 * time.Minute // NOTE(review): not referenced in the lines visible here; presumably the regular gap between outgoing PEX messages — confirm
19 // per-connection PEX state
20 type pexConnState struct {
// Extension number handed to tx.Message when a PEX message is posted (see Share).
22 xid pp.ExtensionNumber
31 // Running record of live connections the remote end of the connection purports to have.
// Maintained by updateRemoteLiveConns: entries are deleted for dropped peers and set,
// with an optional flags value, for added peers.
32 remoteLiveConns map[netip.AddrPort]g.Option[pp.PexPeerFlags]
36 func (s *pexConnState) IsEnabled() bool {
40 // Init is called from the reader goroutine upon the extended handshake completion
// It looks up the PEX entry in the connection's extension ID map and, on the path visible
// here, wires up two loggers, the writer wake-up callback, a one-slot gate channel and a timer.
41 func (s *pexConnState) Init(c *PeerConn) {
42 xid, ok := c.PeerExtensionIDs[pp.ExtensionNamePex]
// Guard: no PEX entry in c.PeerExtensionIDs, an entry of 0, or config.DisablePEX set.
// NOTE(review): the body of this branch is not visible here; presumably it returns early — confirm.
43 if !ok || xid == 0 || c.t.cl.config.DisablePEX {
// info is derived from the client-level logger, dbg from the connection's own logger.
49 s.info = c.t.cl.logger.WithDefaultLevel(log.Info)
50 s.dbg = c.logger.WithDefaultLevel(log.Debug)
51 s.readyfn = c.tickleWriter
// Buffered with capacity 1, so a single send does not block.
52 s.gate = make(chan struct{}, 1)
// A zero delay makes the callback run as soon as the timer fires, in its own goroutine.
// NOTE(review): part of the callback body is not visible here.
53 s.timer = time.AfterFunc(0, func() {
55 s.readyfn() // wake up the writer
60 // schedule next PEX message
61 func (s *pexConnState) sched(delay time.Duration) {
65 // generate next PEX message for the peer; returns nil if nothing yet to send
// Calls s.torrent.pex.Genmsg(s.last), which yields the message (tx) and a value named last.
// NOTE(review): what is done with tx and last afterwards is outside the visible lines.
66 func (s *pexConnState) genmsg() *pp.PexMsg {
67 tx, last := s.torrent.pex.Genmsg(s.last)
// numPending returns s.torrent.pex.numPending(s.last) on the path visible here.
// NOTE(review): the lines between the signature and this return are not shown; there may
// be an early-return guard — confirm against the full file.
75 func (s *pexConnState) numPending() int {
79 return s.torrent.pex.numPending(s.last)
82 // Share is called from the writer goroutine when it is woken up with the write buffers empty.
83 // Returns whether there's more room on the send buffer to write to.
// When genmsg yields a message, postfn is invoked with tx.Message(s.xid); otherwise a retry
// is scheduled after pexRetryDelay.
84 func (s *pexConnState) Share(postfn messageWriter) bool {
87 if tx := s.genmsg(); tx != nil {
88 s.dbg.Print("sending PEX message: ", tx)
// postfn's result is kept as flow. NOTE(review): the lines that consume flow are not visible here.
89 flow := postfn(tx.Message(s.xid))
93 // no PEX to send this time - try again shortly
94 s.sched(pexRetryDelay)
// updateRemoteLiveConns applies a received PEX message to s.remoteLiveConns: addresses in
// rx.Dropped and rx.Dropped6 are deleted, and addresses in rx.Added and rx.Added6 are stored
// with the value g.SliceGet returns for the same index of the matching flags slice.
// NOTE(review): the named result errs is never assigned in the lines visible here.
101 func (s *pexConnState) updateRemoteLiveConns(rx pp.PexMsg) (errs []error) {
102 for _, dropped := range rx.Dropped {
// The second result of the conversion is discarded.
// NOTE(review): on failure addrPort is presumably the zero value, making the delete a no-op — confirm.
103 addrPort, _ := ipv4AddrPortFromKrpcNodeAddr(dropped)
104 delete(s.remoteLiveConns, addrPort)
106 for _, dropped := range rx.Dropped6 {
// Same as above for IPv6: the conversion's second result is ignored.
107 addrPort, _ := ipv6AddrPortFromKrpcNodeAddr(dropped)
108 delete(s.remoteLiveConns, addrPort)
110 for i, added := range rx.Added {
// NOTE(review): assuming added.IP is a net.IP, To4 returns nil for a non-IPv4 address, and
// converting a slice shorter than 4 bytes to *[4]byte panics — this relies on the decoder
// only producing 4-byte addresses in rx.Added.
111 addr := netip.AddrFrom4(*(*[4]byte)(added.IP.To4()))
112 addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
// Presumably an empty Option when rx.AddedFlags has no element at index i — confirm against g.SliceGet.
113 flags := g.SliceGet(rx.AddedFlags, i)
// The map is passed by pointer, so the helper can allocate it if it is still nil.
114 g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
116 for i, added := range rx.Added6 {
// NOTE(review): same conversion hazard as the IPv4 loop if To16 yields fewer than 16 bytes.
117 addr := netip.AddrFrom16(*(*[16]byte)(added.IP.To16()))
118 addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
119 flags := g.SliceGet(rx.Added6Flags, i)
120 g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
125 // Recv is called from the reader goroutine
// It decodes payload as a PEX message, rejects messages that arrive less than 45 seconds
// after the previously accepted one, updates the remote-live-connection record and hands
// the advertised peers to the torrent. Returns a wrapped decode error or a rate-limit error.
126 func (s *pexConnState) Recv(payload []byte) error {
127 rx, err := pp.LoadPexMsg(payload)
129 return fmt.Errorf("unmarshalling pex message: %w", err)
131 s.dbg.Printf("received pex message: %v", rx)
// These counters are bumped before the rate-limit check below, so messages that end up
// rejected are still counted.
132 torrent.Add("pex added peers received", int64(len(rx.Added)))
133 torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
135 // "Clients must batch updates to send no more than 1 PEX message per minute."
// The enforced threshold (45s) is lower than the quoted one-minute rule.
// NOTE(review): presumably deliberate slack for timing jitter — confirm.
136 timeSinceLastRecv := time.Since(s.lastRecv)
137 if timeSinceLastRecv < 45*time.Second {
138 return fmt.Errorf("last received only %v ago", timeSinceLastRecv)
// lastRecv only advances for accepted messages; a rejected message returns above.
140 s.lastRecv = time.Now()
// The []error result of updateRemoteLiveConns is discarded here.
141 s.updateRemoteLiveConns(rx)
// IPv6 entries are appended before IPv4 entries.
144 peers.AppendFromPex(rx.Added6, rx.Added6Flags)
145 peers.AppendFromPex(rx.Added, rx.AddedFlags)
146 added := s.torrent.addPeers(peers)
147 s.dbg.Printf("got %v peers over pex, added %v", len(peers), added)
149 // one day we may also want to:
150 // - handle drops somehow
151 // - detect malicious peers
156 func (s *pexConnState) Close() {