]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Rate limit received PEX messages per connection
authorMatt Joiner <anacrolix@gmail.com>
Tue, 25 Apr 2023 02:18:49 +0000 (12:18 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 1 May 2023 00:19:26 +0000 (10:19 +1000)
peerconn.go
pex.go
pexconn.go

index 1c55db625825f132d9e2314d975de09e53b84f86..1163ad7740d6143278fe8cc02681222e71925a90 100644 (file)
@@ -874,7 +874,11 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err
                if !c.pex.IsEnabled() {
                        return nil // or hang-up maybe?
                }
-               return c.pex.Recv(payload)
+               err = c.pex.Recv(payload)
+               if err != nil {
+                       err = fmt.Errorf("receiving pex message: %w", err)
+               }
+               return
        default:
                return fmt.Errorf("unexpected extended message ID: %v", id)
        }
diff --git a/pex.go b/pex.go
index 4561c5e4d281ab8dea3d1269dab4ddba86f8fa0d..c5ed6099d3f046636e0f86f51abc7d3a3efa7929 100644 (file)
--- a/pex.go
+++ b/pex.go
@@ -3,7 +3,6 @@ package torrent
 import (
        "net"
        "sync"
-       "time"
 
        "github.com/anacrolix/dht/v2/krpc"
 
@@ -162,7 +161,6 @@ type pexState struct {
        sync.RWMutex
        tail *pexEvent     // event feed list
        hold []pexEvent    // delayed drops
-       rest time.Time     // cooldown deadline on inbound
        nc   int           // net number of alive conns
        msg0 pexMsgFactory // initial message
 }
@@ -174,7 +172,6 @@ func (s *pexState) Reset() {
        s.tail = nil
        s.hold = nil
        s.nc = 0
-       s.rest = time.Time{}
        s.msg0 = pexMsgFactory{}
 }
 
index 5ccc02200d96a8f4bb8cc01b3e8e0d84a91f1faa..b01edbb487a0207a67088b8f26f81e93638eb485 100644 (file)
@@ -31,6 +31,7 @@ type pexConnState struct {
        dbg     log.Logger
        // Running record of live connections the remote end of the connection purports to have.
        remoteLiveConns map[netip.AddrPort]g.Option[pp.PexPeerFlags]
+       lastRecv        time.Time
 }
 
 func (s *pexConnState) IsEnabled() bool {
@@ -131,26 +132,20 @@ func (s *pexConnState) Recv(payload []byte) error {
        s.dbg.Printf("received pex message: %v", rx)
        torrent.Add("pex added peers received", int64(len(rx.Added)))
        torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
-       s.updateRemoteLiveConns(rx)
 
-       if !s.torrent.wantPeers() {
-               s.dbg.Printf("peer reserve ok, incoming PEX discarded")
-               return nil
-       }
-       // TODO: This should be per conn, not for the whole Torrent.
-       if time.Now().Before(s.torrent.pex.rest) {
-               s.dbg.Printf("in cooldown period, incoming PEX discarded")
-               return nil
+       // "Clients must batch updates to send no more than 1 PEX message per minute."
+       timeSinceLastRecv := time.Since(s.lastRecv)
+       if timeSinceLastRecv < 45*time.Second {
+               return fmt.Errorf("last received only %v ago", timeSinceLastRecv)
        }
+       s.lastRecv = time.Now()
+       s.updateRemoteLiveConns(rx)
 
        var peers peerInfos
        peers.AppendFromPex(rx.Added6, rx.Added6Flags)
        peers.AppendFromPex(rx.Added, rx.AddedFlags)
-       s.dbg.Printf("adding %d peers from PEX", len(peers))
-       if len(peers) > 0 {
-               s.torrent.pex.rest = time.Now().Add(pexInterval)
-               s.torrent.addPeers(peers)
-       }
+       added := s.torrent.addPeers(peers)
+       s.dbg.Printf("got %v peers over pex, added %v", len(peers), added)
 
        // one day we may also want to:
        // - check if the peer is not flooding us with PEX updates