]> Sergey Matveev's repositories - btrtrc.git/blobdiff - pexconn.go
Drop support for go 1.20
[btrtrc.git] / pexconn.go
index d0308f756db44ddc9d134842f7f85ed5521fa529..9254f5e1876c15ceff293d9f37c687d004d73b5c 100644 (file)
@@ -2,8 +2,10 @@ package torrent
 
 import (
        "fmt"
+       "net/netip"
        "time"
 
+       g "github.com/anacrolix/generics"
        "github.com/anacrolix/log"
 
        pp "github.com/anacrolix/torrent/peer_protocol"
@@ -26,6 +28,9 @@ type pexConnState struct {
        Listed  bool
        info    log.Logger
        dbg     log.Logger
+       // Running record of live connections the remote end of the connection purports to have.
+       remoteLiveConns map[netip.AddrPort]g.Option[pp.PexPeerFlags]
+       lastRecv        time.Time
 }
 
 func (s *pexConnState) IsEnabled() bool {
@@ -67,6 +72,13 @@ func (s *pexConnState) genmsg() *pp.PexMsg {
        return &tx
 }
 
+func (s *pexConnState) numPending() int {
+       if s.torrent == nil {
+               return 0
+       }
+       return s.torrent.pex.numPending(s.last)
+}
+
 // Share is called from the writer goroutine if when it is woken up with the write buffers empty
 // Returns whether there's more room on the send buffer to write to.
 func (s *pexConnState) Share(postfn messageWriter) bool {
@@ -86,36 +98,63 @@ func (s *pexConnState) Share(postfn messageWriter) bool {
        return true
 }
 
-// Recv is called from the reader goroutine
-func (s *pexConnState) Recv(payload []byte) error {
-       if !s.torrent.wantPeers() {
-               s.dbg.Printf("peer reserve ok, incoming PEX discarded")
-               return nil
+func (s *pexConnState) updateRemoteLiveConns(rx pp.PexMsg) (errs []error) {
+       for _, dropped := range rx.Dropped {
+               addrPort, _ := ipv4AddrPortFromKrpcNodeAddr(dropped)
+               delete(s.remoteLiveConns, addrPort)
        }
-       if time.Now().Before(s.torrent.pex.rest) {
-               s.dbg.Printf("in cooldown period, incoming PEX discarded")
-               return nil
+       for _, dropped := range rx.Dropped6 {
+               addrPort, _ := ipv6AddrPortFromKrpcNodeAddr(dropped)
+               delete(s.remoteLiveConns, addrPort)
+       }
+       for i, added := range rx.Added {
+               addr := netip.AddrFrom4(*(*[4]byte)(added.IP.To4()))
+               addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
+               flags := g.SliceGet(rx.AddedFlags, i)
+               g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
+       }
+       for i, added := range rx.Added6 {
+               addr := netip.AddrFrom16(*(*[16]byte)(added.IP.To16()))
+               addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
+               flags := g.SliceGet(rx.Added6Flags, i)
+               g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
        }
+       return
+}
 
+// Recv is called from the reader goroutine
+func (s *pexConnState) Recv(payload []byte) error {
        rx, err := pp.LoadPexMsg(payload)
        if err != nil {
-               return fmt.Errorf("error unmarshalling PEX message: %s", err)
+               return fmt.Errorf("unmarshalling pex message: %w", err)
        }
-       s.dbg.Print("incoming PEX message: ", rx)
+       s.dbg.Printf("received pex message: %v", rx)
        torrent.Add("pex added peers received", int64(len(rx.Added)))
        torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
 
+       // "Clients must batch updates to send no more than 1 PEX message per minute."
+       timeSinceLastRecv := time.Since(s.lastRecv)
+       if timeSinceLastRecv < 45*time.Second {
+               return fmt.Errorf("last received only %v ago", timeSinceLastRecv)
+       }
+       s.lastRecv = time.Now()
+       s.updateRemoteLiveConns(rx)
+
        var peers peerInfos
        peers.AppendFromPex(rx.Added6, rx.Added6Flags)
        peers.AppendFromPex(rx.Added, rx.AddedFlags)
-       s.dbg.Printf("adding %d peers from PEX", len(peers))
+       if time.Now().Before(s.torrent.pex.rest) {
+               s.dbg.Printf("in cooldown period, incoming PEX discarded")
+               return nil
+       }
+       added := s.torrent.addPeers(peers)
+       s.dbg.Printf("got %v peers over pex, added %v", len(peers), added)
+
        if len(peers) > 0 {
                s.torrent.pex.rest = time.Now().Add(pexInterval)
-               s.torrent.addPeers(peers)
        }
 
        // one day we may also want to:
-       // - check if the peer is not flooding us with PEX updates
        // - handle drops somehow
        // - detect malicious peers