import (
"fmt"
+ "net/netip"
"time"
+ g "github.com/anacrolix/generics"
"github.com/anacrolix/log"
pp "github.com/anacrolix/torrent/peer_protocol"
type pexConnState struct {
enabled bool
xid pp.ExtensionNumber
- seq int
+ last *pexEvent
timer *time.Timer
gate chan struct{}
readyfn func()
Listed bool
info log.Logger
dbg log.Logger
+ // Running record of live connections the remote end of the connection purports to have.
+ remoteLiveConns map[netip.AddrPort]g.Option[pp.PexPeerFlags]
+ lastRecv time.Time
}
func (s *pexConnState) IsEnabled() bool {
return
}
s.xid = xid
- s.seq = 0
+ s.last = nil
s.torrent = c.t
s.info = c.t.cl.logger.WithDefaultLevel(log.Info)
s.dbg = c.logger.WithDefaultLevel(log.Debug)
// generate next PEX message for the peer; returns nil if nothing yet to send
func (s *pexConnState) genmsg() *pp.PexMsg {
- tx, seq := s.torrent.pex.Genmsg(s.seq)
+ tx, last := s.torrent.pex.Genmsg(s.last)
if tx.Len() == 0 {
return nil
}
- s.seq = seq
+ s.last = last
return &tx
}
+func (s *pexConnState) numPending() int {
+ if s.torrent == nil {
+ return 0
+ }
+ return s.torrent.pex.numPending(s.last)
+}
+
// Share is called from the writer goroutine if when it is woken up with the write buffers empty
// Returns whether there's more room on the send buffer to write to.
func (s *pexConnState) Share(postfn messageWriter) bool {
return true
}
-// Recv is called from the reader goroutine
-func (s *pexConnState) Recv(payload []byte) error {
- if !s.torrent.wantPeers() {
- s.dbg.Printf("peer reserve ok, incoming PEX discarded")
- return nil
+func (s *pexConnState) updateRemoteLiveConns(rx pp.PexMsg) (errs []error) {
+ for _, dropped := range rx.Dropped {
+ addrPort, _ := ipv4AddrPortFromKrpcNodeAddr(dropped)
+ delete(s.remoteLiveConns, addrPort)
}
- if time.Now().Before(s.torrent.pex.rest) {
- s.dbg.Printf("in cooldown period, incoming PEX discarded")
- return nil
+ for _, dropped := range rx.Dropped6 {
+ addrPort, _ := ipv6AddrPortFromKrpcNodeAddr(dropped)
+ delete(s.remoteLiveConns, addrPort)
+ }
+ for i, added := range rx.Added {
+ addr := netip.AddrFrom4(*(*[4]byte)(added.IP.To4()))
+ addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
+ flags := g.SliceGet(rx.AddedFlags, i)
+ g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
+ }
+ for i, added := range rx.Added6 {
+ addr := netip.AddrFrom16(*(*[16]byte)(added.IP.To16()))
+ addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
+ flags := g.SliceGet(rx.Added6Flags, i)
+ g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
}
+ return
+}
+// Recv is called from the reader goroutine
+func (s *pexConnState) Recv(payload []byte) error {
rx, err := pp.LoadPexMsg(payload)
if err != nil {
- return fmt.Errorf("error unmarshalling PEX message: %s", err)
+ return fmt.Errorf("unmarshalling pex message: %w", err)
}
- s.dbg.Print("incoming PEX message: ", rx)
+ s.dbg.Printf("received pex message: %v", rx)
torrent.Add("pex added peers received", int64(len(rx.Added)))
torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
+ // "Clients must batch updates to send no more than 1 PEX message per minute."
+ timeSinceLastRecv := time.Since(s.lastRecv)
+ if timeSinceLastRecv < 45*time.Second {
+ return fmt.Errorf("last received only %v ago", timeSinceLastRecv)
+ }
+ s.lastRecv = time.Now()
+ s.updateRemoteLiveConns(rx)
+
var peers peerInfos
peers.AppendFromPex(rx.Added6, rx.Added6Flags)
peers.AppendFromPex(rx.Added, rx.AddedFlags)
- s.dbg.Printf("adding %d peers from PEX", len(peers))
+ if time.Now().Before(s.torrent.pex.rest) {
+ s.dbg.Printf("in cooldown period, incoming PEX discarded")
+ return nil
+ }
+ added := s.torrent.addPeers(peers)
+ s.dbg.Printf("got %v peers over pex, added %v", len(peers), added)
+
if len(peers) > 0 {
s.torrent.pex.rest = time.Now().Add(pexInterval)
- s.torrent.addPeers(peers)
}
// one day we may also want to:
- // - check if the peer is not flooding us with PEX updates
// - handle drops somehow
// - detect malicious peers