From 0c1a506e04a0760f567a010abf5778a22bb62989 Mon Sep 17 00:00:00 2001 From: Yaroslav Kolomiiets Date: Tue, 10 Nov 2020 12:32:40 +0000 Subject: [PATCH] optimise generation of the initial PEX --- pex.go | 33 ++++++++++++++++++++++++++++++--- pex_test.go | 20 +++++++++++++++++++- 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/pex.go b/pex.go index a6295f36..7d2ba208 100644 --- a/pex.go +++ b/pex.go @@ -2,6 +2,7 @@ package torrent import ( "net" + "sync" "github.com/anacrolix/dht/v2/krpc" pp "github.com/anacrolix/torrent/peer_protocol" @@ -140,15 +141,23 @@ func shortestIP(ip net.IP) net.IP { // Per-torrent PEX state type pexState struct { - ev []pexEvent // event feed, append-only - hold []pexEvent // delayed drops - nc int // net number of alive conns + ev []pexEvent // event feed, append-only + hold []pexEvent // delayed drops + nc int // net number of alive conns + initCache pexMsgFactory // last generated initial message + initSeq int // number of events which went into initCache + initLock sync.RWMutex // serialise access to initCache and initSeq } +// Reset wipes the state clean, releasing resources. Called from Torrent.Close(). func (s *pexState) Reset() { s.ev = nil s.hold = nil s.nc = 0 + s.initLock.Lock() + s.initCache = pexMsgFactory{} + s.initSeq = 0 + s.initLock.Unlock() } func (s *pexState) Add(c *PeerConn) { @@ -179,6 +188,10 @@ func (s *pexState) Drop(c *PeerConn) { // Generate a PEX message based on the event feed. Also returns an index to pass to the subsequent // calls, producing incremental deltas. func (s *pexState) Genmsg(start int) (pp.PexMsg, int) { + if start == 0 { + return s.genmsg0() + } + var factory pexMsgFactory n := start for _, e := range s.ev[start:] { @@ -190,3 +203,17 @@ func (s *pexState) Genmsg(start int) (pp.PexMsg, int) { } return factory.PexMsg(), n } + +func (s *pexState) genmsg0() (pp.PexMsg, int) { + s.initLock.Lock() + for _, e := range s.ev[s.initSeq:] { + s.initCache.addEvent(e) + s.initSeq++ + } + s.initLock.Unlock() + s.initLock.RLock() + n := s.initSeq + msg := s.initCache.PexMsg() + s.initLock.RUnlock() + return msg, n +} diff --git a/pex_test.go b/pex_test.go index 006644c2..96b1ed71 100644 --- a/pex_test.go +++ b/pex_test.go @@ -307,11 +307,11 @@ func TestPexGenmsg(t *testing.T) { func addrgen(n int) chan net.Addr { c := make(chan net.Addr) go func() { + defer close(c) for i := 4747; i < 65535 && n > 0; i++ { c <- &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: i} n-- } - close(c) }() return c } @@ -335,6 +335,24 @@ func TestPexInitialNoCutoff(t *testing.T) { require.EqualValues(t, 0, len(m.Dropped6)) } +func benchmarkPexInitialN(b *testing.B, npeers int) { + for i := 0; i < b.N; i++ { + var s pexState + c := addrgen(npeers) + for addr := range c { + s.Add(&PeerConn{peer: peer{RemoteAddr: addr}}) + s.Genmsg(0) + } + } +} + +// obtain at least 5 points, e.g. to plot a graph +func BenchmarkPexInitial4(b *testing.B) { benchmarkPexInitialN(b, 4) } +func BenchmarkPexInitial50(b *testing.B) { benchmarkPexInitialN(b, 50) } +func BenchmarkPexInitial100(b *testing.B) { benchmarkPexInitialN(b, 100) } +func BenchmarkPexInitial200(b *testing.B) { benchmarkPexInitialN(b, 200) } +func BenchmarkPexInitial400(b *testing.B) { benchmarkPexInitialN(b, 400) } + func TestPexAdd(t *testing.T) { t.Run("ipv4", func(t *testing.T) { addrs := addrs4 -- 2.44.0