pex.go | 3 +++ pexconn.go | 15 +++++++++++++-- diff --git a/pex.go b/pex.go index 83376d481c16b30b4d342c0322d90fc7742f84c5..a808af2bc0f9bc6c3724b5d8c4663e03182d67e6 100644 --- a/pex.go +++ b/pex.go @@ -3,6 +3,7 @@ import ( "net" "sync" + "time" "github.com/anacrolix/dht/v2/krpc" pp "github.com/anacrolix/torrent/peer_protocol" @@ -179,6 +180,7 @@ // Per-torrent PEX state type pexState struct { ev []pexEvent // event feed, append-only hold []pexEvent // delayed drops + rest time.Time // cooldown deadline on inbound nc int // net number of alive conns initCache pexMsgFactory // last generated initial message initSeq int // number of events which went into initCache @@ -190,6 +192,7 @@ func (s *pexState) Reset() { s.ev = nil s.hold = nil s.nc = 0 + s.rest = time.Time{} s.initLock.Lock() s.initCache = pexMsgFactory{} s.initSeq = 0 diff --git a/pexconn.go b/pexconn.go index b3719ec869f2a24618b3ec75934c11b17ca7fd09..2940c4d28ae3c3d8d549dbaa471a50da5849aab5 100644 --- a/pexconn.go +++ b/pexconn.go @@ -88,6 +88,15 @@ } // Recv is called from the reader goroutine func (s *pexConnState) Recv(payload []byte) error { + if !s.torrent.wantPeers() { + s.dbg.Printf("peer reserve ok, incoming PEX discarded") + return nil + } + if time.Now().Before(s.torrent.pex.rest) { + s.dbg.Printf("in cooldown period, incoming PEX discarded") + return nil + } + rx, err := pp.LoadPexMsg(payload) if err != nil { return fmt.Errorf("error unmarshalling PEX message: %s", err) @@ -100,8 +109,10 @@ var peers peerInfos peers.AppendFromPex(rx.Added6, rx.Added6Flags) peers.AppendFromPex(rx.Added, rx.AddedFlags) s.dbg.Printf("adding %d peers from PEX", len(peers)) - s.torrent.addPeers(peers) - // s.dbg.Print("known swarm now:", s.torrent.KnownSwarm()) + if len(peers) > 0 { + s.torrent.pex.rest = time.Now().Add(pexInterval) + s.torrent.addPeers(peers) + } // one day we may also want to: // - check if the peer is not flooding us with PEX updates