peerconn.go | 6 +++++- pex.go | 3 --- pexconn.go | 23 +++++++++-------------- diff --git a/peerconn.go b/peerconn.go index 1c55db625825f132d9e2314d975de09e53b84f86..1163ad7740d6143278fe8cc02681222e71925a90 100644 --- a/peerconn.go +++ b/peerconn.go @@ -874,7 +874,11 @@ case pexExtendedId: if !c.pex.IsEnabled() { return nil // or hang-up maybe? } - return c.pex.Recv(payload) + err = c.pex.Recv(payload) + if err != nil { + err = fmt.Errorf("receiving pex message: %w", err) + } + return default: return fmt.Errorf("unexpected extended message ID: %v", id) } diff --git a/pex.go b/pex.go index 4561c5e4d281ab8dea3d1269dab4ddba86f8fa0d..c5ed6099d3f046636e0f86f51abc7d3a3efa7929 100644 --- a/pex.go +++ b/pex.go @@ -3,7 +3,6 @@ import ( "net" "sync" - "time" "github.com/anacrolix/dht/v2/krpc" @@ -162,7 +161,6 @@ type pexState struct { sync.RWMutex tail *pexEvent // event feed list hold []pexEvent // delayed drops - rest time.Time // cooldown deadline on inbound nc int // net number of alive conns msg0 pexMsgFactory // initial message } @@ -174,7 +172,6 @@ defer s.Unlock() s.tail = nil s.hold = nil s.nc = 0 - s.rest = time.Time{} s.msg0 = pexMsgFactory{} } diff --git a/pexconn.go b/pexconn.go index 5ccc02200d96a8f4bb8cc01b3e8e0d84a91f1faa..b01edbb487a0207a67088b8f26f81e93638eb485 100644 --- a/pexconn.go +++ b/pexconn.go @@ -31,6 +31,7 @@ info log.Logger dbg log.Logger // Running record of live connections the remote end of the connection purports to have. remoteLiveConns map[netip.AddrPort]g.Option[pp.PexPeerFlags] + lastRecv time.Time } func (s *pexConnState) IsEnabled() bool { @@ -131,26 +132,20 @@ } s.dbg.Printf("received pex message: %v", rx) torrent.Add("pex added peers received", int64(len(rx.Added))) torrent.Add("pex added6 peers received", int64(len(rx.Added6))) - s.updateRemoteLiveConns(rx) - if !s.torrent.wantPeers() { - s.dbg.Printf("peer reserve ok, incoming PEX discarded") - return nil - } - // TODO: This should be per conn, not for the whole Torrent. - if time.Now().Before(s.torrent.pex.rest) { - s.dbg.Printf("in cooldown period, incoming PEX discarded") - return nil + // "Clients must batch updates to send no more than 1 PEX message per minute." + timeSinceLastRecv := time.Since(s.lastRecv) + if timeSinceLastRecv < 45*time.Second { + return fmt.Errorf("last received only %v ago", timeSinceLastRecv) } + s.lastRecv = time.Now() + s.updateRemoteLiveConns(rx) var peers peerInfos peers.AppendFromPex(rx.Added6, rx.Added6Flags) peers.AppendFromPex(rx.Added, rx.AddedFlags) - s.dbg.Printf("adding %d peers from PEX", len(peers)) - if len(peers) > 0 { - s.torrent.pex.rest = time.Now().Add(pexInterval) - s.torrent.addPeers(peers) - } + added := s.torrent.addPeers(peers) + s.dbg.Printf("got %v peers over pex, added %v", len(peers), added) // one day we may also want to: // - check if the peer is not flooding us with PEX updates