]> Sergey Matveev's repositories - btrtrc.git/blobdiff - pexconn.go
Reintroduce torrent-wide PEX throttling
[btrtrc.git] / pexconn.go
index 5ccc02200d96a8f4bb8cc01b3e8e0d84a91f1faa..9254f5e1876c15ceff293d9f37c687d004d73b5c 100644 (file)
@@ -6,7 +6,6 @@ import (
        "time"
 
        g "github.com/anacrolix/generics"
-
        "github.com/anacrolix/log"
 
        pp "github.com/anacrolix/torrent/peer_protocol"
@@ -31,6 +30,7 @@ type pexConnState struct {
        dbg     log.Logger
        // Running record of live connections the remote end of the connection purports to have.
        remoteLiveConns map[netip.AddrPort]g.Option[pp.PexPeerFlags]
+       lastRecv        time.Time
 }
 
 func (s *pexConnState) IsEnabled() bool {
@@ -108,13 +108,13 @@ func (s *pexConnState) updateRemoteLiveConns(rx pp.PexMsg) (errs []error) {
                delete(s.remoteLiveConns, addrPort)
        }
        for i, added := range rx.Added {
-               addr := netip.AddrFrom4([4]byte(added.IP.To4()))
+               addr := netip.AddrFrom4(*(*[4]byte)(added.IP.To4()))
                addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
                flags := g.SliceGet(rx.AddedFlags, i)
                g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
        }
        for i, added := range rx.Added6 {
-               addr := netip.AddrFrom16([16]byte(added.IP.To16()))
+               addr := netip.AddrFrom16(*(*[16]byte)(added.IP.To16()))
                addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
                flags := g.SliceGet(rx.Added6Flags, i)
                g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
@@ -131,29 +131,30 @@ func (s *pexConnState) Recv(payload []byte) error {
        s.dbg.Printf("received pex message: %v", rx)
        torrent.Add("pex added peers received", int64(len(rx.Added)))
        torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
-       s.updateRemoteLiveConns(rx)
 
-       if !s.torrent.wantPeers() {
-               s.dbg.Printf("peer reserve ok, incoming PEX discarded")
-               return nil
+       // "Clients must batch updates to send no more than 1 PEX message per minute."
+       timeSinceLastRecv := time.Since(s.lastRecv)
+       if timeSinceLastRecv < 45*time.Second {
+               return fmt.Errorf("last received only %v ago", timeSinceLastRecv)
        }
-       // TODO: This should be per conn, not for the whole Torrent.
+       s.lastRecv = time.Now()
+       s.updateRemoteLiveConns(rx)
+
+       var peers peerInfos
+       peers.AppendFromPex(rx.Added6, rx.Added6Flags)
+       peers.AppendFromPex(rx.Added, rx.AddedFlags)
        if time.Now().Before(s.torrent.pex.rest) {
                s.dbg.Printf("in cooldown period, incoming PEX discarded")
                return nil
        }
+       added := s.torrent.addPeers(peers)
+       s.dbg.Printf("got %v peers over pex, added %v", len(peers), added)
 
-       var peers peerInfos
-       peers.AppendFromPex(rx.Added6, rx.Added6Flags)
-       peers.AppendFromPex(rx.Added, rx.AddedFlags)
-       s.dbg.Printf("adding %d peers from PEX", len(peers))
        if len(peers) > 0 {
                s.torrent.pex.rest = time.Now().Add(pexInterval)
-               s.torrent.addPeers(peers)
        }
 
        // one day we may also want to:
-       // - check if the peer is not flooding us with PEX updates
        // - handle drops somehow
        // - detect malicious peers