X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=pexconn.go;h=9254f5e1876c15ceff293d9f37c687d004d73b5c;hb=HEAD;hp=d0308f756db44ddc9d134842f7f85ed5521fa529;hpb=4a06517856eff49efa03e9f6c550aa1baeb35500;p=btrtrc.git diff --git a/pexconn.go b/pexconn.go index d0308f75..9254f5e1 100644 --- a/pexconn.go +++ b/pexconn.go @@ -2,8 +2,10 @@ package torrent import ( "fmt" + "net/netip" "time" + g "github.com/anacrolix/generics" "github.com/anacrolix/log" pp "github.com/anacrolix/torrent/peer_protocol" @@ -26,6 +28,9 @@ type pexConnState struct { Listed bool info log.Logger dbg log.Logger + // Running record of live connections the remote end of the connection purports to have. + remoteLiveConns map[netip.AddrPort]g.Option[pp.PexPeerFlags] + lastRecv time.Time } func (s *pexConnState) IsEnabled() bool { @@ -67,6 +72,13 @@ func (s *pexConnState) genmsg() *pp.PexMsg { return &tx } +func (s *pexConnState) numPending() int { + if s.torrent == nil { + return 0 + } + return s.torrent.pex.numPending(s.last) +} + // Share is called from the writer goroutine if when it is woken up with the write buffers empty // Returns whether there's more room on the send buffer to write to. func (s *pexConnState) Share(postfn messageWriter) bool { @@ -86,36 +98,63 @@ func (s *pexConnState) Share(postfn messageWriter) bool { return true } -// Recv is called from the reader goroutine -func (s *pexConnState) Recv(payload []byte) error { - if !s.torrent.wantPeers() { - s.dbg.Printf("peer reserve ok, incoming PEX discarded") - return nil +func (s *pexConnState) updateRemoteLiveConns(rx pp.PexMsg) (errs []error) { + for _, dropped := range rx.Dropped { + addrPort, _ := ipv4AddrPortFromKrpcNodeAddr(dropped) + delete(s.remoteLiveConns, addrPort) } - if time.Now().Before(s.torrent.pex.rest) { - s.dbg.Printf("in cooldown period, incoming PEX discarded") - return nil + for _, dropped := range rx.Dropped6 { + addrPort, _ := ipv6AddrPortFromKrpcNodeAddr(dropped) + delete(s.remoteLiveConns, addrPort) + } + for i, added := range rx.Added { + addr := netip.AddrFrom4(*(*[4]byte)(added.IP.To4())) + addrPort := netip.AddrPortFrom(addr, uint16(added.Port)) + flags := g.SliceGet(rx.AddedFlags, i) + g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags) + } + for i, added := range rx.Added6 { + addr := netip.AddrFrom16(*(*[16]byte)(added.IP.To16())) + addrPort := netip.AddrPortFrom(addr, uint16(added.Port)) + flags := g.SliceGet(rx.Added6Flags, i) + g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags) } + return +} +// Recv is called from the reader goroutine +func (s *pexConnState) Recv(payload []byte) error { rx, err := pp.LoadPexMsg(payload) if err != nil { - return fmt.Errorf("error unmarshalling PEX message: %s", err) + return fmt.Errorf("unmarshalling pex message: %w", err) } - s.dbg.Print("incoming PEX message: ", rx) + s.dbg.Printf("received pex message: %v", rx) torrent.Add("pex added peers received", int64(len(rx.Added))) torrent.Add("pex added6 peers received", int64(len(rx.Added6))) + // "Clients must batch updates to send no more than 1 PEX message per minute." + timeSinceLastRecv := time.Since(s.lastRecv) + if timeSinceLastRecv < 45*time.Second { + return fmt.Errorf("last received only %v ago", timeSinceLastRecv) + } + s.lastRecv = time.Now() + s.updateRemoteLiveConns(rx) + var peers peerInfos peers.AppendFromPex(rx.Added6, rx.Added6Flags) peers.AppendFromPex(rx.Added, rx.AddedFlags) - s.dbg.Printf("adding %d peers from PEX", len(peers)) + if time.Now().Before(s.torrent.pex.rest) { + s.dbg.Printf("in cooldown period, incoming PEX discarded") + return nil + } + added := s.torrent.addPeers(peers) + s.dbg.Printf("got %v peers over pex, added %v", len(peers), added) + if len(peers) > 0 { s.torrent.pex.rest = time.Now().Add(pexInterval) - s.torrent.addPeers(peers) } // one day we may also want to: - // - check if the peer is not flooding us with PEX updates // - handle drops somehow // - detect malicious peers