From: Matt Joiner Date: Thu, 22 Oct 2020 21:58:55 +0000 (+1100) Subject: Performance improvements to PEX X-Git-Tag: v1.19.0~69 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=c1d189ed31af99cbf6f10d9ffdc2cc4e19a73b91;p=btrtrc.git Performance improvements to PEX --- diff --git a/go.sum b/go.sum index 2653e801..e87b8af2 100644 --- a/go.sum +++ b/go.sum @@ -488,6 +488,7 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasO github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.1 h1:52QO5WkIUcHGIR7EnGagH88x1bUzqGXTC5/1bDTUQ7U= github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -499,6 +500,10 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.6.2-0.20200818115829-54d05a4e1844 h1:LTTabKLNrhI/+rSSEG19b4pRD2ipmZx+DF5upbVZmrk= +github.com/stretchr/testify v1.6.2-0.20200818115829-54d05a4e1844/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/syncthing/syncthing v0.14.48-rc.4/go.mod h1:nw3siZwHPA6M8iSfjDCWQ402eqvEIasMQOE8nFOxy7M= github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA= github.com/tinylib/msgp v1.0.2 h1:DfdQrzQa7Yh2es9SuLkixqxuXS2SxsdYn0KbdrOGWD8= @@ -705,6 +710,8 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo= gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/peer_protocol/pex.go b/peer_protocol/pex.go index 91a4fdb8..bcf05f36 100644 --- a/peer_protocol/pex.go +++ b/peer_protocol/pex.go @@ -1,8 +1,6 @@ package peer_protocol import ( - "net" - "github.com/anacrolix/dht/v2/krpc" "github.com/anacrolix/torrent/bencode" ) @@ -16,87 +14,8 @@ type PexMsg struct { Dropped6 krpc.CompactIPv6NodeAddrs `bencode:"dropped6"` } -func addrEqual(a, b *krpc.NodeAddr) bool { - return a.IP.Equal(b.IP) && a.Port == b.Port -} - -func addrIndex(v []krpc.NodeAddr, a *krpc.NodeAddr) int { - for i := range v { - if addrEqual(&v[i], a) { - return i - } - } - return -1 -} - -func (m *PexMsg) Add(addr krpc.NodeAddr, f PexPeerFlags) { - if addr.IP.To4() != nil { - if addrIndex(m.Added.NodeAddrs(), &addr) >= 0 { - // already added - return - } - if i := addrIndex(m.Dropped.NodeAddrs(), &addr); i >= 0 { - // on the dropped list - cancel out - m.Dropped = append(m.Dropped[:i], m.Dropped[i+1:]...) - return - } - m.Added = append(m.Added, addr) - m.AddedFlags = append(m.AddedFlags, f) - } else if len(addr.IP) == net.IPv6len { - if addrIndex(m.Added6.NodeAddrs(), &addr) >= 0 { - // already added - return - } - if i := addrIndex(m.Dropped6.NodeAddrs(), &addr); i >= 0 { - // on the dropped list - cancel out - m.Dropped6 = append(m.Dropped6[:i], m.Dropped6[i+1:]...) - return - } - m.Added6 = append(m.Added6, addr) - m.Added6Flags = append(m.Added6Flags, f) - } -} - -func (m *PexMsg) Drop(addr krpc.NodeAddr) { - if addr.IP.To4() != nil { - if addrIndex(m.Dropped.NodeAddrs(), &addr) >= 0 { - // already dropped - return - } - if i := addrIndex(m.Added.NodeAddrs(), &addr); i >= 0 { - // on the added list - cancel out - m.Added = append(m.Added[:i], m.Added[i+1:]...) - m.AddedFlags = append(m.AddedFlags[:i], m.AddedFlags[i+1:]...) - return - } - m.Dropped = append(m.Dropped, addr) - } else if len(addr.IP) == net.IPv6len { - if addrIndex(m.Dropped6.NodeAddrs(), &addr) >= 0 { - // already dropped - return - } - if i := addrIndex(m.Added6.NodeAddrs(), &addr); i >= 0 { - // on the added list - cancel out - m.Added6 = append(m.Added6[:i], m.Added6[i+1:]...) - m.Added6Flags = append(m.Added6Flags[:i], m.Added6Flags[i+1:]...) - return - } - m.Dropped6 = append(m.Dropped6, addr) - } -} - func (m *PexMsg) Len() int { - return len(m.Added)+len(m.Added6)+len(m.Dropped)+len(m.Dropped6) -} - -// DeltaLen returns max of {added+added6, dropped+dropped6} -func (m *PexMsg) DeltaLen() int { - lenAdded := len(m.Added)+len(m.Added6) - lenDropped := len(m.Dropped)+len(m.Dropped6) - if lenAdded > lenDropped { - return lenAdded - } - return lenDropped + return len(m.Added) + len(m.Added6) + len(m.Dropped) + len(m.Dropped6) } func (m *PexMsg) Message(pexExtendedId ExtensionNumber) Message { @@ -108,15 +27,11 @@ func (m *PexMsg) Message(pexExtendedId ExtensionNumber) Message { } } -func LoadPexMsg(b []byte) (*PexMsg, error) { - m := new(PexMsg) - if err := bencode.Unmarshal(b, m); err != nil { - return nil, err - } - return m, nil +func LoadPexMsg(b []byte) (ret PexMsg, err error) { + err = bencode.Unmarshal(b, &ret) + return } - type PexPeerFlags byte func (me PexPeerFlags) Get(f PexPeerFlags) bool { diff --git a/peer_protocol/pex_test.go b/peer_protocol/pex_test.go index 7a493ead..374dfaa4 100644 --- a/peer_protocol/pex_test.go +++ b/peer_protocol/pex_test.go @@ -29,132 +29,6 @@ func TestEmptyPexMsg(t *testing.T) { require.NoError(t, bencode.Unmarshal(b, &pm)) } -func TestPexAdd(t *testing.T) { - addrs4 := []krpc.NodeAddr{ - krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}, // 0 - krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4748}, // 1 - krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4747}, // 2 - krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4748}, // 3 - } - addrs6 := []krpc.NodeAddr{ - krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}, // 0 - krpc.NodeAddr{IP: net.IPv6loopback, Port: 4748}, // 1 - krpc.NodeAddr{IP: net.IPv6loopback, Port: 4749}, // 2 - krpc.NodeAddr{IP: net.IPv6loopback, Port: 4750}, // 3 - } - f := PexPrefersEncryption | PexOutgoingConn - - t.Run("ipv4", func(t *testing.T) { - addrs := addrs4 - m := new(PexMsg) - m.Drop(addrs[0]) - m.Add(addrs[1], f) - for _, addr := range addrs { - m.Add(addr, f) - } - targ := &PexMsg{ - Added: krpc.CompactIPv4NodeAddrs{ - addrs[1], - addrs[2], - addrs[3], - }, - AddedFlags: []PexPeerFlags{f, f, f}, - Dropped: krpc.CompactIPv4NodeAddrs{}, - } - require.EqualValues(t, targ, m) - }) - t.Run("ipv6", func(t *testing.T) { - addrs := addrs6 - m := new(PexMsg) - m.Drop(addrs[0]) - m.Add(addrs[1], f) - for _, addr := range addrs { - m.Add(addr, f) - } - targ := &PexMsg{ - Added6: krpc.CompactIPv6NodeAddrs{ - addrs[1], - addrs[2], - addrs[3], - }, - Added6Flags: []PexPeerFlags{f, f, f}, - Dropped6: krpc.CompactIPv6NodeAddrs{}, - } - require.EqualValues(t, targ, m) - }) - t.Run("empty", func(t *testing.T) { - addr := krpc.NodeAddr{} - xm := new(PexMsg) - xm.Add(addr, f) - require.EqualValues(t, 0, len(xm.Added)) - require.EqualValues(t, 0, len(xm.AddedFlags)) - require.EqualValues(t, 0, len(xm.Added6)) - require.EqualValues(t, 0, len(xm.Added6Flags)) - }) -} - -func TestPexDrop(t *testing.T) { - addrs4 := []krpc.NodeAddr{ - krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}, // 0 - krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4748}, // 1 - krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4747}, // 2 - krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4748}, // 3 - } - addrs6 := []krpc.NodeAddr{ - krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}, // 0 - krpc.NodeAddr{IP: net.IPv6loopback, Port: 4748}, // 1 - krpc.NodeAddr{IP: net.IPv6loopback, Port: 4749}, // 2 - krpc.NodeAddr{IP: net.IPv6loopback, Port: 4750}, // 3 - } - f := PexPrefersEncryption | PexOutgoingConn - - t.Run("ipv4", func(t *testing.T) { - addrs := addrs4 - m := new(PexMsg) - m.Add(addrs[0], f) - m.Drop(addrs[1]) - for _, addr := range addrs { - m.Drop(addr) - } - targ := &PexMsg{ - AddedFlags: []PexPeerFlags{}, - Added: krpc.CompactIPv4NodeAddrs{}, - Dropped: krpc.CompactIPv4NodeAddrs{ - addrs[1], - addrs[2], - addrs[3], - }, - } - require.EqualValues(t, targ, m) - }) - t.Run("ipv6", func(t *testing.T) { - addrs := addrs6 - m := new(PexMsg) - m.Add(addrs[0], f) - m.Drop(addrs[1]) - for _, addr := range addrs { - m.Drop(addr) - } - targ := &PexMsg{ - Added6Flags: []PexPeerFlags{}, - Added6: krpc.CompactIPv6NodeAddrs{}, - Dropped6: krpc.CompactIPv6NodeAddrs{ - addrs[1], - addrs[2], - addrs[3], - }, - } - require.EqualValues(t, targ, m) - }) - t.Run("empty", func(t *testing.T) { - addr := krpc.NodeAddr{} - xm := new(PexMsg) - xm.Drop(addr) - require.EqualValues(t, 0, len(xm.Dropped)) - require.EqualValues(t, 0, len(xm.Dropped6)) - }) -} - func TestMarshalPexMessage(t *testing.T) { addr := krpc.NodeAddr{IP: net.IP{127, 0, 0, 1}, Port: 0x55aa} f := PexPrefersEncryption | PexOutgoingConn diff --git a/peerconn.go b/peerconn.go index c6019d29..312861ff 100644 --- a/peerconn.go +++ b/peerconn.go @@ -1598,6 +1598,8 @@ func (c *PeerConn) pexPeerFlags() pp.PexPeerFlags { return f } +// This returns the address to use if we want to dial the peer again. It incorporates the peer's +// advertised listen port. func (c *PeerConn) dialAddr() net.Addr { if !c.outgoing && c.PeerListenPort != 0 { switch addr := c.RemoteAddr.(type) { diff --git a/pex.go b/pex.go index acedd71d..b626850b 100644 --- a/pex.go +++ b/pex.go @@ -28,19 +28,142 @@ type pexEvent struct { f pp.PexPeerFlags } -// records the event into the peer protocol PEX message -func (e *pexEvent) put(m *pp.PexMsg) { - switch e.t { +// Combines the node addr, as required for pp.PexMsg. +type pexMsgAdded struct { + krpc.NodeAddr + pp.PexPeerFlags +} + +// Makes generating a PexMsg more efficient. +type pexMsgFactory struct { + added map[string]pexMsgAdded + dropped map[string]krpc.NodeAddr +} + +func (me *pexMsgFactory) DeltaLen() int { + return int(max( + int64(len(me.added)), + int64(len(me.dropped)))) +} + +// Returns the key to use to identify a given addr in the factory. Panics if we can't support the +// addr later in generating a PexMsg (since adding an unusable addr will cause DeltaLen to be out.) +func (me *pexMsgFactory) addrKey(addr krpc.NodeAddr) string { + if addr.IP.To4() != nil { + addr.IP = addr.IP.To4() + } + keyBytes, err := addr.MarshalBinary() + if err != nil { + panic(err) + } + switch len(keyBytes) { + case compactIpv4NodeAddrElemSize: + case compactIpv6NodeAddrElemSize: + default: + panic(len(keyBytes)) + } + return string(keyBytes) +} + +// Returns whether the entry was added (we can check if we're cancelling out another entry and so +// won't hit the limit consuming this event). +func (me *pexMsgFactory) Add(addr krpc.NodeAddr, flags pp.PexPeerFlags) bool { + key := me.addrKey(addr) + if _, ok := me.dropped[key]; ok { + delete(me.dropped, key) + return true + } + if me.DeltaLen() >= pexMaxDelta { + return false + } + if me.added == nil { + me.added = make(map[string]pexMsgAdded, pexMaxDelta) + } + me.added[key] = pexMsgAdded{addr, flags} + return true + +} + +// Returns whether the entry was added (we can check if we're cancelling out another entry and so +// won't hit the limit consuming this event). +func (me *pexMsgFactory) Drop(addr krpc.NodeAddr) bool { + key := me.addrKey(addr) + if _, ok := me.added[key]; ok { + delete(me.added, key) + return true + } + if me.DeltaLen() >= pexMaxDelta { + return false + } + if me.dropped == nil { + me.dropped = make(map[string]krpc.NodeAddr, pexMaxDelta) + } + me.dropped[key] = addr + return true +} + +// Returns whether the entry was added (we can check if we're cancelling out another entry and so +// won't hit the limit consuming this event). +func (me *pexMsgFactory) addEvent(event pexEvent) bool { + addr, ok := nodeAddr(event.addr) + if !ok { + return true + } + switch event.t { case pexAdd: - m.Add(nodeAddr(e.addr), e.f) + return me.Add(addr, event.f) case pexDrop: - m.Drop(nodeAddr(e.addr)) + return me.Drop(addr) + default: + panic(event.t) } } -func nodeAddr(addr net.Addr) krpc.NodeAddr { - ipport, _ := tryIpPortFromNetAddr(addr) - return krpc.NodeAddr{IP: shortestIP(ipport.IP), Port: ipport.Port} +var compactIpv4NodeAddrElemSize = krpc.CompactIPv4NodeAddrs{}.ElemSize() +var compactIpv6NodeAddrElemSize = krpc.CompactIPv6NodeAddrs{}.ElemSize() + +func (me *pexMsgFactory) PexMsg() (ret pp.PexMsg) { + for key, added := range me.added { + switch len(key) { + case compactIpv4NodeAddrElemSize: + ret.Added = append(ret.Added, added.NodeAddr) + ret.AddedFlags = append(ret.AddedFlags, added.PexPeerFlags) + case compactIpv6NodeAddrElemSize: + ret.Added6 = append(ret.Added6, added.NodeAddr) + ret.Added6Flags = append(ret.Added6Flags, added.PexPeerFlags) + default: + panic(key) + } + } + for key, addr := range me.dropped { + switch len(key) { + case compactIpv4NodeAddrElemSize: + ret.Dropped = append(ret.Dropped, addr) + case compactIpv6NodeAddrElemSize: + ret.Dropped6 = append(ret.Dropped6, addr) + default: + panic(key) + } + } + return +} + +func mustNodeAddr(addr net.Addr) krpc.NodeAddr { + ret, ok := nodeAddr(addr) + if !ok { + panic(addr) + } + return ret +} + +// Convert an arbitrary torrent peer Addr into one that can be represented by the compact addr +// format. +func nodeAddr(addr net.Addr) (_ krpc.NodeAddr, ok bool) { + ipport, ok := tryIpPortFromNetAddr(addr) + if !ok { + return + } + return krpc.NodeAddr{IP: shortestIP(ipport.IP), Port: ipport.Port}, true } // mainly for the krpc marshallers @@ -89,17 +212,16 @@ func (s *pexState) Drop(c *PeerConn) { } } -// Generate a PEX message based on the event feed. -// Also returns an index to pass to the subsequent calls, producing incremental deltas. -func (s *pexState) Genmsg(start int) (*pp.PexMsg, int) { - m := new(pp.PexMsg) +// Generate a PEX message based on the event feed. Also returns an index to pass to the subsequent +// calls, producing incremental deltas. +func (s *pexState) Genmsg(start int) (pp.PexMsg, int) { + var factory pexMsgFactory n := start for _, e := range s.ev[start:] { - if start > 0 && m.DeltaLen() >= pexMaxDelta { + if !factory.addEvent(e) { break } - e.put(m) n++ } - return m, n + return factory.PexMsg(), n } diff --git a/pex_test.go b/pex_test.go index 161ea1ba..e0023627 100644 --- a/pex_test.go +++ b/pex_test.go @@ -4,6 +4,7 @@ import ( "net" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/anacrolix/dht/v2/krpc" @@ -114,14 +115,14 @@ var testcases = []struct { name string in *pexState arg int - targM *pp.PexMsg + targM pp.PexMsg targS int }{ { name: "empty", in: &pexState{}, arg: 0, - targM: &pp.PexMsg{}, + targM: pp.PexMsg{}, targS: 0, }, { @@ -135,15 +136,15 @@ var testcases = []struct { }, }, arg: 0, - targM: &pp.PexMsg{ + targM: pp.PexMsg{ Added: krpc.CompactIPv4NodeAddrs{ - nodeAddr(addrs[2]), - nodeAddr(addrs[3]), + mustNodeAddr(addrs[2]), + mustNodeAddr(addrs[3]), }, AddedFlags: []pp.PexPeerFlags{f, f}, Added6: krpc.CompactIPv6NodeAddrs{ - nodeAddr(addrs[0]), - nodeAddr(addrs[1]), + mustNodeAddr(addrs[0]), + mustNodeAddr(addrs[1]), }, Added6Flags: []pp.PexPeerFlags{f, f}, }, @@ -158,12 +159,12 @@ var testcases = []struct { pexEvent{pexDrop, addrs[2], f}, }, }, - targM: &pp.PexMsg{ + targM: pp.PexMsg{ Dropped: krpc.CompactIPv4NodeAddrs{ - nodeAddr(addrs[2]), + mustNodeAddr(addrs[2]), }, Dropped6: krpc.CompactIPv6NodeAddrs{ - nodeAddr(addrs[0]), + mustNodeAddr(addrs[0]), }, }, targS: 2, @@ -178,9 +179,9 @@ var testcases = []struct { pexEvent{pexDrop, addrs[0], f}, }, }, - targM: &pp.PexMsg{ + targM: pp.PexMsg{ Added6: krpc.CompactIPv6NodeAddrs{ - nodeAddr(addrs[1]), + mustNodeAddr(addrs[1]), }, Added6Flags: []pp.PexPeerFlags{f}, }, @@ -201,14 +202,14 @@ var testcases = []struct { pexEvent{pexDrop, addrs[1], f}, }, }, - targM: &pp.PexMsg{ + targM: pp.PexMsg{ Added: krpc.CompactIPv4NodeAddrs{ - nodeAddr(addrs[2]), + mustNodeAddr(addrs[2]), }, AddedFlags: []pp.PexPeerFlags{f}, Added6: krpc.CompactIPv6NodeAddrs{ - nodeAddr(addrs[0]), - nodeAddr(addrs[1]), + mustNodeAddr(addrs[0]), + mustNodeAddr(addrs[1]), }, Added6Flags: []pp.PexPeerFlags{f, f}, }, @@ -223,9 +224,9 @@ var testcases = []struct { pexEvent{pexAdd, addrs[1], f}, }, }, - targM: &pp.PexMsg{ + targM: pp.PexMsg{ Added6: krpc.CompactIPv6NodeAddrs{ - nodeAddr(addrs[1]), + mustNodeAddr(addrs[1]), }, Added6Flags: []pp.PexPeerFlags{f}, }, @@ -233,13 +234,178 @@ var testcases = []struct { }, } +// Represents the contents of a PexMsg in a way that supports equivalence checking in tests. This is +// necessary because pexMsgFactory uses maps and so ordering of the resultant PexMsg isn't +// deterministic. Because the flags are in a different array, we can't just use testify's +// ElementsMatch because the ordering *does* still matter between an added addr and its flags. +type comparablePexMsg struct { + added, added6 []pexMsgAdded + dropped, dropped6 []krpc.NodeAddr +} + +func (me *comparablePexMsg) makeAdded(addrs []krpc.NodeAddr, flags []pp.PexPeerFlags) (ret []pexMsgAdded) { + for i, addr := range addrs { + ret = append(ret, pexMsgAdded{ + NodeAddr: addr, + PexPeerFlags: flags[i], + }) + } + return +} + +// Such Rust-inspired. +func (me *comparablePexMsg) From(f pp.PexMsg) { + me.added = me.makeAdded(f.Added, f.AddedFlags) + me.added6 = me.makeAdded(f.Added6, f.Added6Flags) + me.dropped = f.Dropped + me.dropped6 = f.Dropped6 +} + +// For PexMsg created by pexMsgFactory, this is as good as it can get without using data structures +// in pexMsgFactory that preserve insert ordering. +func (actual comparablePexMsg) AssertEqual(t *testing.T, expected comparablePexMsg) { + assert.ElementsMatch(t, expected.added, actual.added) + assert.ElementsMatch(t, expected.added6, actual.added6) + assert.ElementsMatch(t, expected.dropped, actual.dropped) + assert.ElementsMatch(t, expected.dropped6, actual.dropped6) +} + +func assertPexMsgsEqual(t *testing.T, expected, actual pp.PexMsg) { + var ec, ac comparablePexMsg + ec.From(expected) + ac.From(actual) + ac.AssertEqual(t, ec) +} + func TestPexGenmsg(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { s := tc.in m, seen := s.Genmsg(tc.arg) - require.EqualValues(t, tc.targM, m) + assertPexMsgsEqual(t, tc.targM, m) require.EqualValues(t, tc.targS, seen) }) } } + +func TestPexAdd(t *testing.T) { + addrs4 := []krpc.NodeAddr{ + krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}, // 0 + krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4748}, // 1 + krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4747}, // 2 + krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4748}, // 3 + } + addrs6 := []krpc.NodeAddr{ + krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}, // 0 + krpc.NodeAddr{IP: net.IPv6loopback, Port: 4748}, // 1 + krpc.NodeAddr{IP: net.IPv6loopback, Port: 4749}, // 2 + krpc.NodeAddr{IP: net.IPv6loopback, Port: 4750}, // 3 + } + f := pp.PexPrefersEncryption | pp.PexOutgoingConn + + t.Run("ipv4", func(t *testing.T) { + addrs := addrs4 + var m pexMsgFactory + m.Drop(addrs[0]) + m.Add(addrs[1], f) + for _, addr := range addrs { + m.Add(addr, f) + } + targ := pp.PexMsg{ + Added: krpc.CompactIPv4NodeAddrs{ + addrs[1], + addrs[2], + addrs[3], + }, + AddedFlags: []pp.PexPeerFlags{f, f, f}, + } + assertPexMsgsEqual(t, targ, m.PexMsg()) + }) + t.Run("ipv6", func(t *testing.T) { + addrs := addrs6 + var m pexMsgFactory + m.Drop(addrs[0]) + m.Add(addrs[1], f) + for _, addr := range addrs { + m.Add(addr, f) + } + targ := pp.PexMsg{ + Added6: krpc.CompactIPv6NodeAddrs{ + addrs[1], + addrs[2], + addrs[3], + }, + Added6Flags: []pp.PexPeerFlags{f, f, f}, + } + assertPexMsgsEqual(t, targ, m.PexMsg()) + }) + t.Run("empty", func(t *testing.T) { + addr := krpc.NodeAddr{} + var xm pexMsgFactory + assert.Panics(t, func() { xm.Add(addr, f) }) + m := xm.PexMsg() + require.EqualValues(t, 0, len(m.Added)) + require.EqualValues(t, 0, len(m.AddedFlags)) + require.EqualValues(t, 0, len(m.Added6)) + require.EqualValues(t, 0, len(m.Added6Flags)) + }) +} + +func TestPexDrop(t *testing.T) { + addrs4 := []krpc.NodeAddr{ + krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}, // 0 + krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4748}, // 1 + krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4747}, // 2 + krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4748}, // 3 + } + addrs6 := []krpc.NodeAddr{ + krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}, // 0 + krpc.NodeAddr{IP: net.IPv6loopback, Port: 4748}, // 1 + krpc.NodeAddr{IP: net.IPv6loopback, Port: 4749}, // 2 + krpc.NodeAddr{IP: net.IPv6loopback, Port: 4750}, // 3 + } + f := pp.PexPrefersEncryption | pp.PexOutgoingConn + + t.Run("ipv4", func(t *testing.T) { + addrs := addrs4 + var m pexMsgFactory + m.Add(addrs[0], f) + m.Drop(addrs[1]) + for _, addr := range addrs { + m.Drop(addr) + } + targ := pp.PexMsg{ + Dropped: krpc.CompactIPv4NodeAddrs{ + addrs[1], + addrs[2], + addrs[3], + }, + } + assertPexMsgsEqual(t, targ, m.PexMsg()) + }) + t.Run("ipv6", func(t *testing.T) { + addrs := addrs6 + var m pexMsgFactory + m.Add(addrs[0], f) + m.Drop(addrs[1]) + for _, addr := range addrs { + m.Drop(addr) + } + targ := pp.PexMsg{ + Dropped6: krpc.CompactIPv6NodeAddrs{ + addrs[1], + addrs[2], + addrs[3], + }, + } + assertPexMsgsEqual(t, targ, m.PexMsg()) + }) + t.Run("empty", func(t *testing.T) { + addr := krpc.NodeAddr{} + var xm pexMsgFactory + require.Panics(t, func() { xm.Drop(addr) }) + m := xm.PexMsg() + require.EqualValues(t, 0, len(m.Dropped)) + require.EqualValues(t, 0, len(m.Dropped6)) + }) +} diff --git a/pexconn.go b/pexconn.go index 57ec7ff5..b3719ec8 100644 --- a/pexconn.go +++ b/pexconn.go @@ -64,7 +64,7 @@ func (s *pexConnState) genmsg() *pp.PexMsg { return nil } s.seq = seq - return tx + return &tx } // Share is called from the writer goroutine if when it is woken up with the write buffers empty diff --git a/pexconn_test.go b/pexconn_test.go index 0c204263..868936f3 100644 --- a/pexconn_test.go +++ b/pexconn_test.go @@ -44,11 +44,11 @@ func TestPexConnState(t *testing.T) { x, err := pp.LoadPexMsg(out.ExtendedPayload) require.NoError(t, err) - targx := &pp.PexMsg{ + targx := pp.PexMsg{ Added: krpc.CompactIPv4NodeAddrs(nil), AddedFlags: []pp.PexPeerFlags{}, Added6: krpc.CompactIPv6NodeAddrs{ - nodeAddr(addr), + mustNodeAddr(addr), }, Added6Flags: []pp.PexPeerFlags{0}, } diff --git a/torrent.go b/torrent.go index d84e9e5c..e034933f 100644 --- a/torrent.go +++ b/torrent.go @@ -1250,8 +1250,12 @@ func (t *Torrent) deleteConnection(c *PeerConn) (ret bool) { } _, ret = t.conns[c] delete(t.conns, c) - if !t.cl.config.DisablePEX { - t.pex.Drop(c) + // Avoid adding a drop event more than once. Probably we should track whether we've generated + // the drop event against the PexConnState instead. + if ret { + if !t.cl.config.DisablePEX { + t.pex.Drop(c) + } } torrent.Add("deleted connections", 1) c.deleteAllRequests()