X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=pexconn.go;h=9254f5e1876c15ceff293d9f37c687d004d73b5c;hb=HEAD;hp=dcb3136c3ecfb6190cdb4d377fd6a2b5298f54b0;hpb=86063859850cf1288a2773fc7c7a0c646513d597;p=btrtrc.git diff --git a/pexconn.go b/pexconn.go index dcb3136c..9254f5e1 100644 --- a/pexconn.go +++ b/pexconn.go @@ -2,8 +2,10 @@ package torrent import ( "fmt" + "net/netip" "time" + g "github.com/anacrolix/generics" "github.com/anacrolix/log" pp "github.com/anacrolix/torrent/peer_protocol" @@ -18,7 +20,7 @@ const ( type pexConnState struct { enabled bool xid pp.ExtensionNumber - seq int + last *pexEvent timer *time.Timer gate chan struct{} readyfn func() @@ -26,6 +28,9 @@ type pexConnState struct { Listed bool info log.Logger dbg log.Logger + // Running record of live connections the remote end of the connection purports to have. + remoteLiveConns map[netip.AddrPort]g.Option[pp.PexPeerFlags] + lastRecv time.Time } func (s *pexConnState) IsEnabled() bool { @@ -39,10 +44,10 @@ func (s *pexConnState) Init(c *PeerConn) { return } s.xid = xid - s.seq = 0 + s.last = nil s.torrent = c.t - s.info = c.t.cl.logger - s.dbg = c.logger + s.info = c.t.cl.logger.WithDefaultLevel(log.Info) + s.dbg = c.logger.WithDefaultLevel(log.Debug) s.readyfn = c.tickleWriter s.gate = make(chan struct{}, 1) s.timer = time.AfterFunc(0, func() { @@ -59,12 +64,19 @@ func (s *pexConnState) sched(delay time.Duration) { // generate next PEX message for the peer; returns nil if nothing yet to send func (s *pexConnState) genmsg() *pp.PexMsg { - tx, seq := s.torrent.pex.Genmsg(s.seq) + tx, last := s.torrent.pex.Genmsg(s.last) if tx.Len() == 0 { return nil } - s.seq = seq - return tx + s.last = last + return &tx +} + +func (s *pexConnState) numPending() int { + if s.torrent == nil { + return 0 + } + return s.torrent.pex.numPending(s.last) } // Share is called from the writer goroutine if when it is woken up with the write buffers empty @@ -86,25 +98,63 @@ func (s *pexConnState) Share(postfn messageWriter) bool { return true } +func (s *pexConnState) updateRemoteLiveConns(rx pp.PexMsg) (errs []error) { + for _, dropped := range rx.Dropped { + addrPort, _ := ipv4AddrPortFromKrpcNodeAddr(dropped) + delete(s.remoteLiveConns, addrPort) + } + for _, dropped := range rx.Dropped6 { + addrPort, _ := ipv6AddrPortFromKrpcNodeAddr(dropped) + delete(s.remoteLiveConns, addrPort) + } + for i, added := range rx.Added { + addr := netip.AddrFrom4(*(*[4]byte)(added.IP.To4())) + addrPort := netip.AddrPortFrom(addr, uint16(added.Port)) + flags := g.SliceGet(rx.AddedFlags, i) + g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags) + } + for i, added := range rx.Added6 { + addr := netip.AddrFrom16(*(*[16]byte)(added.IP.To16())) + addrPort := netip.AddrPortFrom(addr, uint16(added.Port)) + flags := g.SliceGet(rx.Added6Flags, i) + g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags) + } + return +} + // Recv is called from the reader goroutine func (s *pexConnState) Recv(payload []byte) error { rx, err := pp.LoadPexMsg(payload) if err != nil { - return fmt.Errorf("error unmarshalling PEX message: %s", err) + return fmt.Errorf("unmarshalling pex message: %w", err) } - s.dbg.Print("incoming PEX message: ", rx) + s.dbg.Printf("received pex message: %v", rx) torrent.Add("pex added peers received", int64(len(rx.Added))) torrent.Add("pex added6 peers received", int64(len(rx.Added6))) + // "Clients must batch updates to send no more than 1 PEX message per minute." + timeSinceLastRecv := time.Since(s.lastRecv) + if timeSinceLastRecv < 45*time.Second { + return fmt.Errorf("last received only %v ago", timeSinceLastRecv) + } + s.lastRecv = time.Now() + s.updateRemoteLiveConns(rx) + var peers peerInfos peers.AppendFromPex(rx.Added6, rx.Added6Flags) peers.AppendFromPex(rx.Added, rx.AddedFlags) - s.dbg.Printf("adding %d peers from PEX", len(peers)) - s.torrent.addPeers(peers) - // s.dbg.Print("known swarm now:", s.torrent.KnownSwarm()) + if time.Now().Before(s.torrent.pex.rest) { + s.dbg.Printf("in cooldown period, incoming PEX discarded") + return nil + } + added := s.torrent.addPeers(peers) + s.dbg.Printf("got %v peers over pex, added %v", len(peers), added) + + if len(peers) > 0 { + s.torrent.pex.rest = time.Now().Add(pexInterval) + } // one day we may also want to: - // - check if the peer is not flooding us with PEX updates // - handle drops somehow // - detect malicious peers