t pexEventType
addr PeerRemoteAddr
f pp.PexPeerFlags
+ next *pexEvent // event feed list
}
// facilitates efficient de-duplication while generating PEX messages
me.dropped[key] = struct{}{}
}
-func (me *pexMsgFactory) addEvent(event pexEvent) {
+func (me *pexMsgFactory) append(event pexEvent) {
switch event.t {
case pexAdd:
me.add(event)
// Per-torrent PEX state
type pexState struct {
- ev []pexEvent // event feed, append-only
- hold []pexEvent // delayed drops
- rest time.Time // cooldown deadline on inbound
- nc int // net number of alive conns
- initCache pexMsgFactory // last generated initial message
- initSeq int // number of events which went into initCache
- initLock sync.RWMutex // serialise access to initCache and initSeq
+ sync.RWMutex
+ tail *pexEvent // event feed list
+ hold []pexEvent // delayed drops
+ rest time.Time // cooldown deadline on inbound
+ nc int // net number of alive conns
+ msg0 pexMsgFactory // initial message
}
// Reset wipes the state clean, releasing resources. Called from Torrent.Close().
func (s *pexState) Reset() {
- s.ev = nil
+ s.Lock()
+ defer s.Unlock()
+ s.tail = nil
s.hold = nil
s.nc = 0
s.rest = time.Time{}
- s.initLock.Lock()
- s.initCache = pexMsgFactory{}
- s.initSeq = 0
- s.initLock.Unlock()
+ s.msg0 = pexMsgFactory{}
+}
+
+func (s *pexState) append(e *pexEvent) {
+ if s.tail != nil {
+ s.tail.next = e
+ }
+ s.tail = e
+ s.msg0.append(*e)
}
func (s *pexState) Add(c *PeerConn) {
+ s.Lock()
+ defer s.Unlock()
s.nc++
if s.nc >= pexTargAdded {
- s.ev = append(s.ev, s.hold...)
+ for _, e := range s.hold {
+ ne := e
+ s.append(&ne)
+ }
s.hold = s.hold[:0]
}
e := c.pexEvent(pexAdd)
- s.ev = append(s.ev, e)
c.pex.Listed = true
+ s.append(&e)
}
func (s *pexState) Drop(c *PeerConn) {
// skip connections which were not previously Added
return
}
+ s.Lock()
+ defer s.Unlock()
e := c.pexEvent(pexDrop)
s.nc--
if s.nc < pexTargAdded && len(s.hold) < pexMaxHold {
s.hold = append(s.hold, e)
} else {
- s.ev = append(s.ev, e)
+ s.append(&e)
}
}
-// Generate a PEX message based on the event feed. Also returns an index to pass to the subsequent
-// calls, producing incremental deltas.
-func (s *pexState) Genmsg(start int) (pp.PexMsg, int) {
- if start == 0 {
- return s.genmsg0()
+// Generate a PEX message based on the event feed.
+// Also returns a pointer to pass to the subsequent calls
+// to produce incremental deltas.
+func (s *pexState) Genmsg(start *pexEvent) (pp.PexMsg, *pexEvent) {
+ s.RLock()
+ defer s.RUnlock()
+ if start == nil {
+ return s.msg0.PexMsg(), s.tail
}
-
- var factory pexMsgFactory
- n := start
- for _, e := range s.ev[start:] {
- if start > 0 && factory.DeltaLen() >= pexMaxDelta {
+ var msg pexMsgFactory
+ last := start
+ for e := start.next; e != nil; e = e.next {
+ if msg.DeltaLen() >= pexMaxDelta {
break
}
- factory.addEvent(e)
- n++
- }
- return factory.PexMsg(), n
-}
-
-func (s *pexState) genmsg0() (pp.PexMsg, int) {
- s.initLock.Lock()
- for _, e := range s.ev[s.initSeq:] {
- s.initCache.addEvent(e)
- s.initSeq++
+ msg.append(*e)
+ last = e
}
- s.initLock.Unlock()
- s.initLock.RLock()
- n := s.initSeq
- msg := s.initCache.PexMsg()
- s.initLock.RUnlock()
- return msg, n
+ return msg.PexMsg(), last
}
addrs4[0],
addrs4[1],
}
- f = pp.PexOutgoingConn
)
-func TestPexAdded(t *testing.T) {
- t.Run("noHold", func(t *testing.T) {
- s := new(pexState)
- s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[0], outgoing: true}})
- targ := &pexState{
- ev: []pexEvent{
- {pexAdd, addrs[0], pp.PexOutgoingConn},
- },
- nc: 1,
- }
- require.EqualValues(t, targ, s)
- })
- t.Run("belowTarg", func(t *testing.T) {
- s := &pexState{
- hold: []pexEvent{
- {pexDrop, addrs[1], 0},
- },
- nc: 0,
- }
- s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}})
- targ := &pexState{
- hold: []pexEvent{
- {pexDrop, addrs[1], 0},
- },
- ev: []pexEvent{
- {pexAdd, addrs[0], 0},
- },
- nc: 1,
- }
- require.EqualValues(t, targ, s)
- })
- t.Run("aboveTarg", func(t *testing.T) {
- holdAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
- s := &pexState{
- hold: []pexEvent{
- {pexDrop, holdAddr, 0},
- },
- nc: pexTargAdded,
- }
- s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}})
- targ := &pexState{
- hold: []pexEvent{},
- ev: []pexEvent{
- {pexDrop, holdAddr, 0},
- {pexAdd, addrs[0], 0},
- },
- nc: pexTargAdded + 1,
- }
- require.EqualValues(t, targ, s)
- })
-}
-
-func TestPexDropped(t *testing.T) {
- t.Run("belowTarg", func(t *testing.T) {
- s := &pexState{nc: 1}
- s.Drop(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}, pex: pexConnState{Listed: true}})
- targ := &pexState{
- hold: []pexEvent{{pexDrop, addrs[0], 0}},
- nc: 0,
- }
- require.EqualValues(t, targ, s)
- })
- t.Run("aboveTarg", func(t *testing.T) {
- s := &pexState{nc: pexTargAdded + 1}
- s.Drop(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}, pex: pexConnState{Listed: true}})
- targ := &pexState{
- ev: []pexEvent{{pexDrop, addrs[0], 0}},
- nc: pexTargAdded,
- }
- require.EqualValues(t, targ, s)
- })
- t.Run("aboveTargNotListed", func(t *testing.T) {
- s := &pexState{nc: pexTargAdded + 1}
- s.Drop(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}, pex: pexConnState{Listed: false}})
- targ := &pexState{nc: pexTargAdded + 1}
- require.EqualValues(t, targ, s)
- })
-}
-
func TestPexReset(t *testing.T) {
- s := &pexState{
- hold: []pexEvent{{pexDrop, addrs[0], 0}},
- ev: []pexEvent{{pexAdd, addrs[1], 0}},
- nc: 1,
+ s := &pexState{}
+ conns := []PeerConn{
+ {Peer: Peer{RemoteAddr: addrs[0]}},
+ {Peer: Peer{RemoteAddr: addrs[1]}},
+ {Peer: Peer{RemoteAddr: addrs[2]}},
}
+ s.Add(&conns[0])
+ s.Add(&conns[1])
+ s.Drop(&conns[0])
s.Reset()
targ := new(pexState)
require.EqualValues(t, targ, s)
}
var testcases = []struct {
- name string
- in *pexState
- arg int
- targM pp.PexMsg
- targS int
+ name string
+ in *pexState
+ targ pp.PexMsg
+ update func(*pexState)
+ targ1 pp.PexMsg
}{
{
- name: "empty",
- in: &pexState{},
- arg: 0,
- targM: pp.PexMsg{},
- targS: 0,
+ name: "empty",
+ in: &pexState{},
+ targ: pp.PexMsg{},
+ },
+ {
+ name: "add0",
+ in: func() *pexState {
+ s := new(pexState)
+ nullAddr := &net.TCPAddr{}
+ s.Add(&PeerConn{Peer: Peer{RemoteAddr: nullAddr}})
+ return s
+ }(),
+ targ: pp.PexMsg{},
+ },
+ {
+ name: "drop0",
+ in: func() *pexState {
+ nullAddr := &net.TCPAddr{}
+ s := new(pexState)
+ s.Drop(&PeerConn{Peer: Peer{RemoteAddr: nullAddr}, pex: pexConnState{Listed: true}})
+ return s
+ }(),
+ targ: pp.PexMsg{},
},
{
name: "add4",
- in: &pexState{
- ev: []pexEvent{
- {pexAdd, addrs[0], f},
- {pexAdd, addrs[1], f},
- {pexAdd, addrs[2], f},
- {pexAdd, addrs[3], f},
- },
- },
- arg: 0,
- targM: pp.PexMsg{
+ in: func() *pexState {
+ s := new(pexState)
+ s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}})
+ s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[1], outgoing: true}})
+ s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[2], outgoing: true}})
+ s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[3]}})
+ return s
+ }(),
+ targ: pp.PexMsg{
Added: krpc.CompactIPv4NodeAddrs{
mustNodeAddr(addrs[2]),
mustNodeAddr(addrs[3]),
},
- AddedFlags: []pp.PexPeerFlags{f, f},
+ AddedFlags: []pp.PexPeerFlags{pp.PexOutgoingConn, 0},
Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[0]),
mustNodeAddr(addrs[1]),
},
- Added6Flags: []pp.PexPeerFlags{f, f},
+ Added6Flags: []pp.PexPeerFlags{0, pp.PexOutgoingConn},
},
- targS: 4,
},
{
name: "drop2",
- arg: 0,
- in: &pexState{
- ev: []pexEvent{
- {pexDrop, addrs[0], f},
- {pexDrop, addrs[2], f},
- },
- },
- targM: pp.PexMsg{
+ in: func() *pexState {
+ s := &pexState{nc: pexTargAdded + 2}
+ s.Drop(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}, pex: pexConnState{Listed: true}})
+ s.Drop(&PeerConn{Peer: Peer{RemoteAddr: addrs[2]}, pex: pexConnState{Listed: true}})
+ return s
+ }(),
+ targ: pp.PexMsg{
Dropped: krpc.CompactIPv4NodeAddrs{
mustNodeAddr(addrs[2]),
},
mustNodeAddr(addrs[0]),
},
},
- targS: 2,
},
{
name: "add2drop1",
- arg: 0,
- in: &pexState{
- ev: []pexEvent{
- {pexAdd, addrs[0], f},
- {pexAdd, addrs[1], f},
- {pexDrop, addrs[0], f},
- },
- },
- targM: pp.PexMsg{
+ in: func() *pexState {
+ conns := []PeerConn{
+ {Peer: Peer{RemoteAddr: addrs[0]}},
+ {Peer: Peer{RemoteAddr: addrs[1]}},
+ {Peer: Peer{RemoteAddr: addrs[2]}},
+ }
+ s := &pexState{nc: pexTargAdded}
+ s.Add(&conns[0])
+ s.Add(&conns[1])
+ s.Drop(&conns[0])
+ s.Drop(&conns[2]) // to be ignored: it wasn't added
+ return s
+ }(),
+ targ: pp.PexMsg{
Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[1]),
},
- Added6Flags: []pp.PexPeerFlags{f},
+ Added6Flags: []pp.PexPeerFlags{0},
},
- targS: 3,
},
{
name: "delayed",
- arg: 0,
- in: &pexState{
- ev: []pexEvent{
- {pexAdd, addrs[0], f},
- {pexAdd, addrs[1], f},
- {pexAdd, addrs[2], f},
- },
- hold: []pexEvent{
- {pexDrop, addrs[0], f},
- {pexDrop, addrs[2], f},
- {pexDrop, addrs[1], f},
- },
- },
- targM: pp.PexMsg{
+ in: func() *pexState {
+ conns := []PeerConn{
+ {Peer: Peer{RemoteAddr: addrs[0]}},
+ {Peer: Peer{RemoteAddr: addrs[1]}},
+ {Peer: Peer{RemoteAddr: addrs[2]}},
+ }
+ s := new(pexState)
+ s.Add(&conns[0])
+ s.Add(&conns[1])
+ s.Add(&conns[2])
+ s.Drop(&conns[0]) // on hold: s.nc < pexTargAdded
+ s.Drop(&conns[2])
+ s.Drop(&conns[1])
+ return s
+ }(),
+ targ: pp.PexMsg{
Added: krpc.CompactIPv4NodeAddrs{
mustNodeAddr(addrs[2]),
},
- AddedFlags: []pp.PexPeerFlags{f},
+ AddedFlags: []pp.PexPeerFlags{0},
Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[0]),
mustNodeAddr(addrs[1]),
},
- Added6Flags: []pp.PexPeerFlags{f, f},
+ Added6Flags: []pp.PexPeerFlags{0, 0},
+ },
+ },
+ {
+ name: "unheld",
+ in: func() *pexState {
+ conns := []PeerConn{
+ {Peer: Peer{RemoteAddr: addrs[0]}},
+ {Peer: Peer{RemoteAddr: addrs[1]}},
+ }
+ s := &pexState{nc: pexTargAdded - 1}
+ s.Add(&conns[0])
+ s.Drop(&conns[0]) // on hold: s.nc < pexTargAdded
+ s.Add(&conns[1]) // unholds the above
+ return s
+ }(),
+ targ: pp.PexMsg{
+ Added6: krpc.CompactIPv6NodeAddrs{
+ mustNodeAddr(addrs[1]),
+ },
+ Added6Flags: []pp.PexPeerFlags{0},
},
- targS: 3,
},
{
name: "followup",
- arg: 1,
- in: &pexState{
- ev: []pexEvent{
- {pexAdd, addrs[0], f},
- {pexAdd, addrs[1], f},
+ in: func() *pexState {
+ s := new(pexState)
+ s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}})
+ return s
+ }(),
+ targ: pp.PexMsg{
+ Added6: krpc.CompactIPv6NodeAddrs{
+ mustNodeAddr(addrs[0]),
},
+ Added6Flags: []pp.PexPeerFlags{0},
+ },
+ update: func(s *pexState) {
+ s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[1]}})
},
- targM: pp.PexMsg{
+ targ1: pp.PexMsg{
Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[1]),
},
- Added6Flags: []pp.PexPeerFlags{f},
+ Added6Flags: []pp.PexPeerFlags{0},
},
- targS: 2,
},
}
ac.AssertEqual(t, ec)
}
-func TestPexGenmsg(t *testing.T) {
+func TestPexGenmsg0(t *testing.T) {
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
- s := tc.in
- m, seen := s.Genmsg(tc.arg)
- assertPexMsgsEqual(t, tc.targM, m)
- require.EqualValues(t, tc.targS, seen)
+ s := *tc.in
+ m, last := s.Genmsg(nil)
+ assertPexMsgsEqual(t, tc.targ, m)
+ if tc.update != nil {
+ tc.update(&s)
+ m1, last := s.Genmsg(last)
+ assertPexMsgsEqual(t, tc.targ1, m1)
+ assert.NotNil(t, last)
+ }
})
}
}
for addr := range c {
s.Add(&PeerConn{Peer: Peer{RemoteAddr: addr}})
}
- m, seq := s.Genmsg(0)
+ m, _ := s.Genmsg(nil)
- require.EqualValues(t, n, seq)
require.EqualValues(t, n, len(m.Added))
require.EqualValues(t, n, len(m.AddedFlags))
require.EqualValues(t, 0, len(m.Added6))
c := addrgen(npeers)
for addr := range c {
s.Add(&PeerConn{Peer: Peer{RemoteAddr: addr}})
- s.Genmsg(0)
+ s.Genmsg(nil)
}
}
}
func BenchmarkPexInitial100(b *testing.B) { benchmarkPexInitialN(b, 100) }
func BenchmarkPexInitial200(b *testing.B) { benchmarkPexInitialN(b, 200) }
func BenchmarkPexInitial400(b *testing.B) { benchmarkPexInitialN(b, 400) }
-
-func TestPexAdd(t *testing.T) {
- t.Run("ipv4", func(t *testing.T) {
- addrs := addrs4
- var m pexMsgFactory
- m.addEvent(pexEvent{pexDrop, addrs[0], 0})
- m.addEvent(pexEvent{pexAdd, addrs[1], f})
- for _, addr := range addrs {
- m.addEvent(pexEvent{pexAdd, addr, f})
- }
- targ := pp.PexMsg{
- Added: krpc.CompactIPv4NodeAddrs{
- mustNodeAddr(addrs[1]),
- mustNodeAddr(addrs[2]),
- mustNodeAddr(addrs[3]),
- },
- AddedFlags: []pp.PexPeerFlags{f, f, f},
- }
- out := m.PexMsg()
- assertPexMsgsEqual(t, targ, out)
- })
- t.Run("ipv6", func(t *testing.T) {
- addrs := addrs6
- var m pexMsgFactory
- m.addEvent(pexEvent{pexDrop, addrs[0], 0})
- m.addEvent(pexEvent{pexAdd, addrs[1], f})
- for _, addr := range addrs {
- m.addEvent(pexEvent{pexAdd, addr, f})
- }
- targ := pp.PexMsg{
- Added6: krpc.CompactIPv6NodeAddrs{
- mustNodeAddr(addrs[1]),
- mustNodeAddr(addrs[2]),
- mustNodeAddr(addrs[3]),
- },
- Added6Flags: []pp.PexPeerFlags{f, f, f},
- }
- assertPexMsgsEqual(t, targ, m.PexMsg())
- })
- t.Run("empty", func(t *testing.T) {
- nullAddr := &net.TCPAddr{}
- var xm pexMsgFactory
- xm.addEvent(pexEvent{pexAdd, nullAddr, f})
- m := xm.PexMsg()
- require.EqualValues(t, 0, len(m.Added))
- require.EqualValues(t, 0, len(m.AddedFlags))
- require.EqualValues(t, 0, len(m.Added6))
- require.EqualValues(t, 0, len(m.Added6Flags))
- })
-}
-
-func TestPexDrop(t *testing.T) {
- t.Run("ipv4", func(t *testing.T) {
- addrs := addrs4
- var m pexMsgFactory
- m.addEvent(pexEvent{pexAdd, addrs[0], f})
- m.addEvent(pexEvent{pexDrop, addrs[1], 0})
- for _, addr := range addrs {
- m.addEvent(pexEvent{pexDrop, addr, 0})
- }
- targ := pp.PexMsg{
- Dropped: krpc.CompactIPv4NodeAddrs{
- mustNodeAddr(addrs[1]),
- mustNodeAddr(addrs[2]),
- mustNodeAddr(addrs[3]),
- },
- }
- assertPexMsgsEqual(t, targ, m.PexMsg())
- })
- t.Run("ipv6", func(t *testing.T) {
- addrs := addrs6
- var m pexMsgFactory
- m.addEvent(pexEvent{pexAdd, addrs[0], f})
- m.addEvent(pexEvent{pexDrop, addrs[1], 0})
- for _, addr := range addrs {
- m.addEvent(pexEvent{pexDrop, addr, 0})
- }
- targ := pp.PexMsg{
- Dropped6: krpc.CompactIPv6NodeAddrs{
- mustNodeAddr(addrs[1]),
- mustNodeAddr(addrs[2]),
- mustNodeAddr(addrs[3]),
- },
- }
- assertPexMsgsEqual(t, targ, m.PexMsg())
- })
- t.Run("empty", func(t *testing.T) {
- nullAddr := &net.TCPAddr{}
- var xm pexMsgFactory
- xm.addEvent(pexEvent{pexDrop, nullAddr, f})
- m := xm.PexMsg()
- require.EqualValues(t, 0, len(m.Dropped))
- require.EqualValues(t, 0, len(m.Dropped6))
- })
-}