import (
"net"
+ "sync"
"github.com/anacrolix/dht/v2/krpc"
pp "github.com/anacrolix/torrent/peer_protocol"
// Per-torrent PEX state
type pexState struct {
- ev []pexEvent // event feed, append-only
- hold []pexEvent // delayed drops
- nc int // net number of alive conns
+ ev []pexEvent // event feed, append-only
+ hold []pexEvent // delayed drops
+ nc int // net number of alive conns
+ initCache pexMsgFactory // last generated initial message
+ initSeq int // number of events which went into initCache
+ initLock sync.RWMutex // serialise access to initCache and initSeq
}
+// Reset wipes the state clean, releasing resources. Called from Torrent.Close().
func (s *pexState) Reset() {
s.ev = nil
s.hold = nil
s.nc = 0
+ s.initLock.Lock()
+ s.initCache = pexMsgFactory{}
+ s.initSeq = 0
+ s.initLock.Unlock()
}
func (s *pexState) Add(c *PeerConn) {
// Generate a PEX message based on the event feed. Also returns an index to pass to the subsequent
// calls, producing incremental deltas.
func (s *pexState) Genmsg(start int) (pp.PexMsg, int) {
+ if start == 0 {
+ return s.genmsg0()
+ }
+
var factory pexMsgFactory
n := start
for _, e := range s.ev[start:] {
}
return factory.PexMsg(), n
}
+
+func (s *pexState) genmsg0() (pp.PexMsg, int) {
+ s.initLock.Lock()
+ for _, e := range s.ev[s.initSeq:] {
+ s.initCache.addEvent(e)
+ s.initSeq++
+ }
+ s.initLock.Unlock()
+ s.initLock.RLock()
+ n := s.initSeq
+ msg := s.initCache.PexMsg()
+ s.initLock.RUnlock()
+ return msg, n
+}
func addrgen(n int) chan net.Addr {
c := make(chan net.Addr)
go func() {
+ defer close(c)
for i := 4747; i < 65535 && n > 0; i++ {
c <- &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: i}
n--
}
- close(c)
}()
return c
}
require.EqualValues(t, 0, len(m.Dropped6))
}
+func benchmarkPexInitialN(b *testing.B, npeers int) {
+ for i := 0; i < b.N; i++ {
+ var s pexState
+ c := addrgen(npeers)
+ for addr := range c {
+ s.Add(&PeerConn{peer: peer{RemoteAddr: addr}})
+ s.Genmsg(0)
+ }
+ }
+}
+
+// obtain at least 5 points, e.g. to plot a graph
+func BenchmarkPexInitial4(b *testing.B) { benchmarkPexInitialN(b, 4) }
+func BenchmarkPexInitial50(b *testing.B) { benchmarkPexInitialN(b, 50) }
+func BenchmarkPexInitial100(b *testing.B) { benchmarkPexInitialN(b, 100) }
+func BenchmarkPexInitial200(b *testing.B) { benchmarkPexInitialN(b, 200) }
+func BenchmarkPexInitial400(b *testing.B) { benchmarkPexInitialN(b, 400) }
+
func TestPexAdd(t *testing.T) {
t.Run("ipv4", func(t *testing.T) {
addrs := addrs4