From: Matt Joiner Date: Tue, 25 Apr 2023 02:18:49 +0000 (+1000) Subject: Rate limit received PEX messages per connection X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=6818a9f77342c7bbb9edb16f530fc2db9a5c7041;p=btrtrc.git Rate limit received PEX messages per connection --- diff --git a/peerconn.go b/peerconn.go index 1c55db62..1163ad77 100644 --- a/peerconn.go +++ b/peerconn.go @@ -874,7 +874,11 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err if !c.pex.IsEnabled() { return nil // or hang-up maybe? } - return c.pex.Recv(payload) + err = c.pex.Recv(payload) + if err != nil { + err = fmt.Errorf("receiving pex message: %w", err) + } + return default: return fmt.Errorf("unexpected extended message ID: %v", id) } diff --git a/pex.go b/pex.go index 4561c5e4..c5ed6099 100644 --- a/pex.go +++ b/pex.go @@ -3,7 +3,6 @@ package torrent import ( "net" "sync" - "time" "github.com/anacrolix/dht/v2/krpc" @@ -162,7 +161,6 @@ type pexState struct { sync.RWMutex tail *pexEvent // event feed list hold []pexEvent // delayed drops - rest time.Time // cooldown deadline on inbound nc int // net number of alive conns msg0 pexMsgFactory // initial message } @@ -174,7 +172,6 @@ func (s *pexState) Reset() { s.tail = nil s.hold = nil s.nc = 0 - s.rest = time.Time{} s.msg0 = pexMsgFactory{} } diff --git a/pexconn.go b/pexconn.go index 5ccc0220..b01edbb4 100644 --- a/pexconn.go +++ b/pexconn.go @@ -31,6 +31,7 @@ type pexConnState struct { dbg log.Logger // Running record of live connections the remote end of the connection purports to have. remoteLiveConns map[netip.AddrPort]g.Option[pp.PexPeerFlags] + lastRecv time.Time } func (s *pexConnState) IsEnabled() bool { @@ -131,26 +132,20 @@ func (s *pexConnState) Recv(payload []byte) error { s.dbg.Printf("received pex message: %v", rx) torrent.Add("pex added peers received", int64(len(rx.Added))) torrent.Add("pex added6 peers received", int64(len(rx.Added6))) - s.updateRemoteLiveConns(rx) - if !s.torrent.wantPeers() { - s.dbg.Printf("peer reserve ok, incoming PEX discarded") - return nil - } - // TODO: This should be per conn, not for the whole Torrent. - if time.Now().Before(s.torrent.pex.rest) { - s.dbg.Printf("in cooldown period, incoming PEX discarded") - return nil + // "Clients must batch updates to send no more than 1 PEX message per minute." + timeSinceLastRecv := time.Since(s.lastRecv) + if timeSinceLastRecv < 45*time.Second { + return fmt.Errorf("last received only %v ago", timeSinceLastRecv) } + s.lastRecv = time.Now() + s.updateRemoteLiveConns(rx) var peers peerInfos peers.AppendFromPex(rx.Added6, rx.Added6Flags) peers.AppendFromPex(rx.Added, rx.AddedFlags) - s.dbg.Printf("adding %d peers from PEX", len(peers)) - if len(peers) > 0 { - s.torrent.pex.rest = time.Now().Add(pexInterval) - s.torrent.addPeers(peers) - } + added := s.torrent.addPeers(peers) + s.dbg.Printf("got %v peers over pex, added %v", len(peers), added) // one day we may also want to: // - check if the peer is not flooding us with PEX updates