]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Performance improvements to PEX
authorMatt Joiner <anacrolix@gmail.com>
Thu, 22 Oct 2020 21:58:55 +0000 (08:58 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 22 Oct 2020 21:58:55 +0000 (08:58 +1100)
go.sum
peer_protocol/pex.go
peer_protocol/pex_test.go
peerconn.go
pex.go
pex_test.go
pexconn.go
pexconn_test.go
torrent.go

diff --git a/go.sum b/go.sum
index 2653e801c5eb90c64970acda83947f4fcda6f6b5..e87b8af27484387ec2d40f316de1d8e6f433ca30 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -488,6 +488,7 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasO
 github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
 github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
 github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.2.1 h1:52QO5WkIUcHGIR7EnGagH88x1bUzqGXTC5/1bDTUQ7U=
 github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
@@ -499,6 +500,10 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
 github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
+github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
+github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.6.2-0.20200818115829-54d05a4e1844 h1:LTTabKLNrhI/+rSSEG19b4pRD2ipmZx+DF5upbVZmrk=
+github.com/stretchr/testify v1.6.2-0.20200818115829-54d05a4e1844/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/syncthing/syncthing v0.14.48-rc.4/go.mod h1:nw3siZwHPA6M8iSfjDCWQ402eqvEIasMQOE8nFOxy7M=
 github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA=
 github.com/tinylib/msgp v1.0.2 h1:DfdQrzQa7Yh2es9SuLkixqxuXS2SxsdYn0KbdrOGWD8=
@@ -705,6 +710,8 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
 gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o=
 honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
index 91a4fdb8a3685374554bb6b8d862f33a98d0702d..bcf05f3606f6dd253797d3b8756ff353ad001e73 100644 (file)
@@ -1,8 +1,6 @@
 package peer_protocol
 
 import (
-       "net"
-
        "github.com/anacrolix/dht/v2/krpc"
        "github.com/anacrolix/torrent/bencode"
 )
@@ -16,87 +14,8 @@ type PexMsg struct {
        Dropped6    krpc.CompactIPv6NodeAddrs `bencode:"dropped6"`
 }
 
-func addrEqual(a, b *krpc.NodeAddr) bool {
-       return a.IP.Equal(b.IP) && a.Port == b.Port
-}
-
-func addrIndex(v []krpc.NodeAddr, a *krpc.NodeAddr) int {
-       for i := range v {
-               if addrEqual(&v[i], a) {
-                       return i
-               }
-       }
-       return -1
-}
-
-func (m *PexMsg) Add(addr krpc.NodeAddr, f PexPeerFlags) {
-       if addr.IP.To4() != nil {
-               if addrIndex(m.Added.NodeAddrs(), &addr) >= 0 {
-                       // already added
-                       return
-               }
-               if i := addrIndex(m.Dropped.NodeAddrs(), &addr); i >= 0 {
-                       // on the dropped list - cancel out
-                       m.Dropped = append(m.Dropped[:i], m.Dropped[i+1:]...)
-                       return
-               }
-               m.Added = append(m.Added, addr)
-               m.AddedFlags = append(m.AddedFlags, f)
-       } else if len(addr.IP) == net.IPv6len {
-               if addrIndex(m.Added6.NodeAddrs(), &addr) >= 0 {
-                       // already added
-                       return
-               }
-               if i := addrIndex(m.Dropped6.NodeAddrs(), &addr); i >= 0 {
-                       // on the dropped list - cancel out
-                       m.Dropped6 = append(m.Dropped6[:i], m.Dropped6[i+1:]...)
-                       return
-               }
-               m.Added6 = append(m.Added6, addr)
-               m.Added6Flags = append(m.Added6Flags, f)
-       }
-}
-
-func (m *PexMsg) Drop(addr krpc.NodeAddr) {
-       if addr.IP.To4() != nil {
-               if addrIndex(m.Dropped.NodeAddrs(), &addr) >= 0 {
-                       // already dropped
-                       return
-               }
-               if i := addrIndex(m.Added.NodeAddrs(), &addr); i >= 0 {
-                       // on the added list - cancel out
-                       m.Added = append(m.Added[:i], m.Added[i+1:]...)
-                       m.AddedFlags = append(m.AddedFlags[:i], m.AddedFlags[i+1:]...)
-                       return
-               }
-               m.Dropped = append(m.Dropped, addr)
-       } else if len(addr.IP) == net.IPv6len {
-               if addrIndex(m.Dropped6.NodeAddrs(), &addr) >= 0 {
-                       // already dropped
-                       return
-               }
-               if i := addrIndex(m.Added6.NodeAddrs(), &addr); i >= 0 {
-                       // on the added list - cancel out
-                       m.Added6 = append(m.Added6[:i], m.Added6[i+1:]...)
-                       m.Added6Flags = append(m.Added6Flags[:i], m.Added6Flags[i+1:]...)
-                       return
-               }
-               m.Dropped6 = append(m.Dropped6, addr)
-       }
-}
-
 func (m *PexMsg) Len() int {
-       return len(m.Added)+len(m.Added6)+len(m.Dropped)+len(m.Dropped6)
-}
-
-// DeltaLen returns max of {added+added6, dropped+dropped6}
-func (m *PexMsg) DeltaLen() int {
-       lenAdded := len(m.Added)+len(m.Added6)
-       lenDropped := len(m.Dropped)+len(m.Dropped6)
-       if lenAdded > lenDropped {
-               return lenAdded
-       }
-       return lenDropped
+       return len(m.Added) + len(m.Added6) + len(m.Dropped) + len(m.Dropped6)
 }
 
 func (m *PexMsg) Message(pexExtendedId ExtensionNumber) Message {
@@ -108,15 +27,11 @@ func (m *PexMsg) Message(pexExtendedId ExtensionNumber) Message {
        }
 }
 
-func LoadPexMsg(b []byte) (*PexMsg, error) {
-       m := new(PexMsg) 
-       if err := bencode.Unmarshal(b, m); err != nil {
-               return nil, err
-       }
-       return m, nil
+func LoadPexMsg(b []byte) (ret PexMsg, err error) {
+       err = bencode.Unmarshal(b, &ret)
+       return
 }
 
-
 type PexPeerFlags byte
 
 func (me PexPeerFlags) Get(f PexPeerFlags) bool {
index 7a493ead37fa7515a966c1ba60c9b16b02ea3b36..374dfaa4be370b8944781991c53361c6fd93626d 100644 (file)
@@ -29,132 +29,6 @@ func TestEmptyPexMsg(t *testing.T) {
        require.NoError(t, bencode.Unmarshal(b, &pm))
 }
 
-func TestPexAdd(t *testing.T) {
-       addrs4 := []krpc.NodeAddr{
-               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}, // 0
-               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4748}, // 1
-               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4747}, // 2
-               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4748}, // 3
-       }
-       addrs6 := []krpc.NodeAddr{
-               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}, // 0
-               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4748}, // 1
-               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4749}, // 2
-               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4750}, // 3
-       }
-       f := PexPrefersEncryption | PexOutgoingConn
-
-       t.Run("ipv4", func(t *testing.T) {
-               addrs := addrs4
-               m := new(PexMsg)
-               m.Drop(addrs[0])
-               m.Add(addrs[1], f)
-               for _, addr := range addrs {
-                       m.Add(addr, f)
-               }
-               targ := &PexMsg{
-                       Added: krpc.CompactIPv4NodeAddrs{
-                               addrs[1],
-                               addrs[2],
-                               addrs[3],
-                       },
-                       AddedFlags: []PexPeerFlags{f, f, f},
-                       Dropped:    krpc.CompactIPv4NodeAddrs{},
-               }
-               require.EqualValues(t, targ, m)
-       })
-       t.Run("ipv6", func(t *testing.T) {
-               addrs := addrs6
-               m := new(PexMsg)
-               m.Drop(addrs[0])
-               m.Add(addrs[1], f)
-               for _, addr := range addrs {
-                       m.Add(addr, f)
-               }
-               targ := &PexMsg{
-                       Added6: krpc.CompactIPv6NodeAddrs{
-                               addrs[1],
-                               addrs[2],
-                               addrs[3],
-                       },
-                       Added6Flags: []PexPeerFlags{f, f, f},
-                       Dropped6:    krpc.CompactIPv6NodeAddrs{},
-               }
-               require.EqualValues(t, targ, m)
-       })
-       t.Run("empty", func(t *testing.T) {
-               addr := krpc.NodeAddr{}
-               xm := new(PexMsg)
-               xm.Add(addr, f)
-               require.EqualValues(t, 0, len(xm.Added))
-               require.EqualValues(t, 0, len(xm.AddedFlags))
-               require.EqualValues(t, 0, len(xm.Added6))
-               require.EqualValues(t, 0, len(xm.Added6Flags))
-       })
-}
-
-func TestPexDrop(t *testing.T) {
-       addrs4 := []krpc.NodeAddr{
-               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}, // 0
-               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4748}, // 1
-               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4747}, // 2
-               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4748}, // 3
-       }
-       addrs6 := []krpc.NodeAddr{
-               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}, // 0
-               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4748}, // 1
-               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4749}, // 2
-               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4750}, // 3
-       }
-       f := PexPrefersEncryption | PexOutgoingConn
-
-       t.Run("ipv4", func(t *testing.T) {
-               addrs := addrs4
-               m := new(PexMsg)
-               m.Add(addrs[0], f)
-               m.Drop(addrs[1])
-               for _, addr := range addrs {
-                       m.Drop(addr)
-               }
-               targ := &PexMsg{
-                       AddedFlags: []PexPeerFlags{},
-                       Added:    krpc.CompactIPv4NodeAddrs{},
-                       Dropped: krpc.CompactIPv4NodeAddrs{
-                               addrs[1],
-                               addrs[2],
-                               addrs[3],
-                       },
-               }
-               require.EqualValues(t, targ, m)
-       })
-       t.Run("ipv6", func(t *testing.T) {
-               addrs := addrs6
-               m := new(PexMsg)
-               m.Add(addrs[0], f)
-               m.Drop(addrs[1])
-               for _, addr := range addrs {
-                       m.Drop(addr)
-               }
-               targ := &PexMsg{
-                       Added6Flags: []PexPeerFlags{},
-                       Added6:    krpc.CompactIPv6NodeAddrs{},
-                       Dropped6: krpc.CompactIPv6NodeAddrs{
-                               addrs[1],
-                               addrs[2],
-                               addrs[3],
-                       },
-               }
-               require.EqualValues(t, targ, m)
-       })
-       t.Run("empty", func(t *testing.T) {
-               addr := krpc.NodeAddr{}
-               xm := new(PexMsg)
-               xm.Drop(addr)
-               require.EqualValues(t, 0, len(xm.Dropped))
-               require.EqualValues(t, 0, len(xm.Dropped6))
-       })
-}
-
 func TestMarshalPexMessage(t *testing.T) {
        addr := krpc.NodeAddr{IP: net.IP{127, 0, 0, 1}, Port: 0x55aa}
        f := PexPrefersEncryption | PexOutgoingConn
index c6019d29f078630c76a6e9c87a49e1b1a3333472..312861ff75d60f24b12974d3da11ea88335708de 100644 (file)
@@ -1598,6 +1598,8 @@ func (c *PeerConn) pexPeerFlags() pp.PexPeerFlags {
        return f
 }
 
+// This returns the address to use if we want to dial the peer again. It incorporates the peer's
+// advertised listen port.
 func (c *PeerConn) dialAddr() net.Addr {
        if !c.outgoing && c.PeerListenPort != 0 {
                switch addr := c.RemoteAddr.(type) {
diff --git a/pex.go b/pex.go
index acedd71d05bcdb85fc886de09cd8acf3c5d069f6..b626850be4cdc3d173220128247314529ee585cb 100644 (file)
--- a/pex.go
+++ b/pex.go
@@ -28,19 +28,142 @@ type pexEvent struct {
        f    pp.PexPeerFlags
 }
 
-// records the event into the peer protocol PEX message
-func (e *pexEvent) put(m *pp.PexMsg) {
-       switch e.t {
+// Combines the node addr, as required for pp.PexMsg.
+type pexMsgAdded struct {
+       krpc.NodeAddr
+       pp.PexPeerFlags
+}
+
+// Makes generating a PexMsg more efficient.
+type pexMsgFactory struct {
+       added   map[string]pexMsgAdded
+       dropped map[string]krpc.NodeAddr
+}
+
+func (me *pexMsgFactory) DeltaLen() int {
+       return int(max(
+               int64(len(me.added)),
+               int64(len(me.dropped))))
+}
+
+// Returns the key to use to identify a given addr in the factory. Panics if we can't support the
+// addr later in generating a PexMsg (since adding an unusable addr will cause DeltaLen to be out.)
+func (me *pexMsgFactory) addrKey(addr krpc.NodeAddr) string {
+       if addr.IP.To4() != nil {
+               addr.IP = addr.IP.To4()
+       }
+       keyBytes, err := addr.MarshalBinary()
+       if err != nil {
+               panic(err)
+       }
+       switch len(keyBytes) {
+       case compactIpv4NodeAddrElemSize:
+       case compactIpv6NodeAddrElemSize:
+       default:
+               panic(len(keyBytes))
+       }
+       return string(keyBytes)
+}
+
+// Returns whether the entry was added (we can check if we're cancelling out another entry and so
+// won't hit the limit consuming this event).
+func (me *pexMsgFactory) Add(addr krpc.NodeAddr, flags pp.PexPeerFlags) bool {
+       key := me.addrKey(addr)
+       if _, ok := me.dropped[key]; ok {
+               delete(me.dropped, key)
+               return true
+       }
+       if me.DeltaLen() >= pexMaxDelta {
+               return false
+       }
+       if me.added == nil {
+               me.added = make(map[string]pexMsgAdded, pexMaxDelta)
+       }
+       me.added[key] = pexMsgAdded{addr, flags}
+       return true
+
+}
+
+// Returns whether the entry was added (we can check if we're cancelling out another entry and so
+// won't hit the limit consuming this event).
+func (me *pexMsgFactory) Drop(addr krpc.NodeAddr) bool {
+       key := me.addrKey(addr)
+       if _, ok := me.added[key]; ok {
+               delete(me.added, key)
+               return true
+       }
+       if me.DeltaLen() >= pexMaxDelta {
+               return false
+       }
+       if me.dropped == nil {
+               me.dropped = make(map[string]krpc.NodeAddr, pexMaxDelta)
+       }
+       me.dropped[key] = addr
+       return true
+}
+
+// Returns whether the entry was added (we can check if we're cancelling out another entry and so
+// won't hit the limit consuming this event).
+func (me *pexMsgFactory) addEvent(event pexEvent) bool {
+       addr, ok := nodeAddr(event.addr)
+       if !ok {
+               return true
+       }
+       switch event.t {
        case pexAdd:
-               m.Add(nodeAddr(e.addr), e.f)
+               return me.Add(addr, event.f)
        case pexDrop:
-               m.Drop(nodeAddr(e.addr))
+               return me.Drop(addr)
+       default:
+               panic(event.t)
        }
 }
 
-func nodeAddr(addr net.Addr) krpc.NodeAddr {
-       ipport, _ := tryIpPortFromNetAddr(addr)
-       return krpc.NodeAddr{IP: shortestIP(ipport.IP), Port: ipport.Port}
+var compactIpv4NodeAddrElemSize = krpc.CompactIPv4NodeAddrs{}.ElemSize()
+var compactIpv6NodeAddrElemSize = krpc.CompactIPv6NodeAddrs{}.ElemSize()
+
+func (me *pexMsgFactory) PexMsg() (ret pp.PexMsg) {
+       for key, added := range me.added {
+               switch len(key) {
+               case compactIpv4NodeAddrElemSize:
+                       ret.Added = append(ret.Added, added.NodeAddr)
+                       ret.AddedFlags = append(ret.AddedFlags, added.PexPeerFlags)
+               case compactIpv6NodeAddrElemSize:
+                       ret.Added6 = append(ret.Added6, added.NodeAddr)
+                       ret.Added6Flags = append(ret.Added6Flags, added.PexPeerFlags)
+               default:
+                       panic(key)
+               }
+       }
+       for key, addr := range me.dropped {
+               switch len(key) {
+               case compactIpv4NodeAddrElemSize:
+                       ret.Dropped = append(ret.Dropped, addr)
+               case compactIpv6NodeAddrElemSize:
+                       ret.Dropped6 = append(ret.Dropped6, addr)
+               default:
+                       panic(key)
+               }
+       }
+       return
+}
+
+func mustNodeAddr(addr net.Addr) krpc.NodeAddr {
+       ret, ok := nodeAddr(addr)
+       if !ok {
+               panic(addr)
+       }
+       return ret
+}
+
+// Convert an arbitrary torrent peer Addr into one that can be represented by the compact addr
+// format.
+func nodeAddr(addr net.Addr) (_ krpc.NodeAddr, ok bool) {
+       ipport, ok := tryIpPortFromNetAddr(addr)
+       if !ok {
+               return
+       }
+       return krpc.NodeAddr{IP: shortestIP(ipport.IP), Port: ipport.Port}, true
 }
 
 // mainly for the krpc marshallers
@@ -89,17 +212,16 @@ func (s *pexState) Drop(c *PeerConn) {
        }
 }
 
-// Generate a PEX message based on the event feed.
-// Also returns an index to pass to the subsequent calls, producing incremental deltas.
-func (s *pexState) Genmsg(start int) (*pp.PexMsg, int) {
-       m := new(pp.PexMsg)
+// Generate a PEX message based on the event feed. Also returns an index to pass to the subsequent
+// calls, producing incremental deltas.
+func (s *pexState) Genmsg(start int) (pp.PexMsg, int) {
+       var factory pexMsgFactory
        n := start
        for _, e := range s.ev[start:] {
-               if start > 0 && m.DeltaLen() >= pexMaxDelta {
+               if !factory.addEvent(e) {
                        break
                }
-               e.put(m)
                n++
        }
-       return m, n
+       return factory.PexMsg(), n
 }
index 161ea1baa67d6b61e33732607a65a9a2b769e1e0..e002362717110360fbb14f4cb895a3fe7a48d345 100644 (file)
@@ -4,6 +4,7 @@ import (
        "net"
        "testing"
 
+       "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
 
        "github.com/anacrolix/dht/v2/krpc"
@@ -114,14 +115,14 @@ var testcases = []struct {
        name  string
        in    *pexState
        arg   int
-       targM *pp.PexMsg
+       targM pp.PexMsg
        targS int
 }{
        {
                name:  "empty",
                in:    &pexState{},
                arg:   0,
-               targM: &pp.PexMsg{},
+               targM: pp.PexMsg{},
                targS: 0,
        },
        {
@@ -135,15 +136,15 @@ var testcases = []struct {
                        },
                },
                arg: 0,
-               targM: &pp.PexMsg{
+               targM: pp.PexMsg{
                        Added: krpc.CompactIPv4NodeAddrs{
-                               nodeAddr(addrs[2]),
-                               nodeAddr(addrs[3]),
+                               mustNodeAddr(addrs[2]),
+                               mustNodeAddr(addrs[3]),
                        },
                        AddedFlags: []pp.PexPeerFlags{f, f},
                        Added6: krpc.CompactIPv6NodeAddrs{
-                               nodeAddr(addrs[0]),
-                               nodeAddr(addrs[1]),
+                               mustNodeAddr(addrs[0]),
+                               mustNodeAddr(addrs[1]),
                        },
                        Added6Flags: []pp.PexPeerFlags{f, f},
                },
@@ -158,12 +159,12 @@ var testcases = []struct {
                                pexEvent{pexDrop, addrs[2], f},
                        },
                },
-               targM: &pp.PexMsg{
+               targM: pp.PexMsg{
                        Dropped: krpc.CompactIPv4NodeAddrs{
-                               nodeAddr(addrs[2]),
+                               mustNodeAddr(addrs[2]),
                        },
                        Dropped6: krpc.CompactIPv6NodeAddrs{
-                               nodeAddr(addrs[0]),
+                               mustNodeAddr(addrs[0]),
                        },
                },
                targS: 2,
@@ -178,9 +179,9 @@ var testcases = []struct {
                                pexEvent{pexDrop, addrs[0], f},
                        },
                },
-               targM: &pp.PexMsg{
+               targM: pp.PexMsg{
                        Added6: krpc.CompactIPv6NodeAddrs{
-                               nodeAddr(addrs[1]),
+                               mustNodeAddr(addrs[1]),
                        },
                        Added6Flags: []pp.PexPeerFlags{f},
                },
@@ -201,14 +202,14 @@ var testcases = []struct {
                                pexEvent{pexDrop, addrs[1], f},
                        },
                },
-               targM: &pp.PexMsg{
+               targM: pp.PexMsg{
                        Added: krpc.CompactIPv4NodeAddrs{
-                               nodeAddr(addrs[2]),
+                               mustNodeAddr(addrs[2]),
                        },
                        AddedFlags: []pp.PexPeerFlags{f},
                        Added6: krpc.CompactIPv6NodeAddrs{
-                               nodeAddr(addrs[0]),
-                               nodeAddr(addrs[1]),
+                               mustNodeAddr(addrs[0]),
+                               mustNodeAddr(addrs[1]),
                        },
                        Added6Flags: []pp.PexPeerFlags{f, f},
                },
@@ -223,9 +224,9 @@ var testcases = []struct {
                                pexEvent{pexAdd, addrs[1], f},
                        },
                },
-               targM: &pp.PexMsg{
+               targM: pp.PexMsg{
                        Added6: krpc.CompactIPv6NodeAddrs{
-                               nodeAddr(addrs[1]),
+                               mustNodeAddr(addrs[1]),
                        },
                        Added6Flags: []pp.PexPeerFlags{f},
                },
@@ -233,13 +234,178 @@ var testcases = []struct {
        },
 }
 
+// Represents the contents of a PexMsg in a way that supports equivalence checking in tests. This is
+// necessary because pexMsgFactory uses maps and so ordering of the resultant PexMsg isn't
+// deterministic. Because the flags are in a different array, we can't just use testify's
+// ElementsMatch because the ordering *does* still matter between an added addr and its flags.
+type comparablePexMsg struct {
+       added, added6     []pexMsgAdded
+       dropped, dropped6 []krpc.NodeAddr
+}
+
+func (me *comparablePexMsg) makeAdded(addrs []krpc.NodeAddr, flags []pp.PexPeerFlags) (ret []pexMsgAdded) {
+       for i, addr := range addrs {
+               ret = append(ret, pexMsgAdded{
+                       NodeAddr:     addr,
+                       PexPeerFlags: flags[i],
+               })
+       }
+       return
+}
+
+// Such Rust-inspired.
+func (me *comparablePexMsg) From(f pp.PexMsg) {
+       me.added = me.makeAdded(f.Added, f.AddedFlags)
+       me.added6 = me.makeAdded(f.Added6, f.Added6Flags)
+       me.dropped = f.Dropped
+       me.dropped6 = f.Dropped6
+}
+
+// For PexMsg created by pexMsgFactory, this is as good as it can get without using data structures
+// in pexMsgFactory that preserve insert ordering.
+func (actual comparablePexMsg) AssertEqual(t *testing.T, expected comparablePexMsg) {
+       assert.ElementsMatch(t, expected.added, actual.added)
+       assert.ElementsMatch(t, expected.added6, actual.added6)
+       assert.ElementsMatch(t, expected.dropped, actual.dropped)
+       assert.ElementsMatch(t, expected.dropped6, actual.dropped6)
+}
+
+func assertPexMsgsEqual(t *testing.T, expected, actual pp.PexMsg) {
+       var ec, ac comparablePexMsg
+       ec.From(expected)
+       ac.From(actual)
+       ac.AssertEqual(t, ec)
+}
+
 func TestPexGenmsg(t *testing.T) {
        for _, tc := range testcases {
                t.Run(tc.name, func(t *testing.T) {
                        s := tc.in
                        m, seen := s.Genmsg(tc.arg)
-                       require.EqualValues(t, tc.targM, m)
+                       assertPexMsgsEqual(t, tc.targM, m)
                        require.EqualValues(t, tc.targS, seen)
                })
        }
 }
+
+func TestPexAdd(t *testing.T) {
+       addrs4 := []krpc.NodeAddr{
+               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}, // 0
+               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4748}, // 1
+               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4747}, // 2
+               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4748}, // 3
+       }
+       addrs6 := []krpc.NodeAddr{
+               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}, // 0
+               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4748}, // 1
+               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4749}, // 2
+               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4750}, // 3
+       }
+       f := pp.PexPrefersEncryption | pp.PexOutgoingConn
+
+       t.Run("ipv4", func(t *testing.T) {
+               addrs := addrs4
+               var m pexMsgFactory
+               m.Drop(addrs[0])
+               m.Add(addrs[1], f)
+               for _, addr := range addrs {
+                       m.Add(addr, f)
+               }
+               targ := pp.PexMsg{
+                       Added: krpc.CompactIPv4NodeAddrs{
+                               addrs[1],
+                               addrs[2],
+                               addrs[3],
+                       },
+                       AddedFlags: []pp.PexPeerFlags{f, f, f},
+               }
+               assertPexMsgsEqual(t, targ, m.PexMsg())
+       })
+       t.Run("ipv6", func(t *testing.T) {
+               addrs := addrs6
+               var m pexMsgFactory
+               m.Drop(addrs[0])
+               m.Add(addrs[1], f)
+               for _, addr := range addrs {
+                       m.Add(addr, f)
+               }
+               targ := pp.PexMsg{
+                       Added6: krpc.CompactIPv6NodeAddrs{
+                               addrs[1],
+                               addrs[2],
+                               addrs[3],
+                       },
+                       Added6Flags: []pp.PexPeerFlags{f, f, f},
+               }
+               assertPexMsgsEqual(t, targ, m.PexMsg())
+       })
+       t.Run("empty", func(t *testing.T) {
+               addr := krpc.NodeAddr{}
+               var xm pexMsgFactory
+               assert.Panics(t, func() { xm.Add(addr, f) })
+               m := xm.PexMsg()
+               require.EqualValues(t, 0, len(m.Added))
+               require.EqualValues(t, 0, len(m.AddedFlags))
+               require.EqualValues(t, 0, len(m.Added6))
+               require.EqualValues(t, 0, len(m.Added6Flags))
+       })
+}
+
+func TestPexDrop(t *testing.T) {
+       addrs4 := []krpc.NodeAddr{
+               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}, // 0
+               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4748}, // 1
+               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4747}, // 2
+               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4748}, // 3
+       }
+       addrs6 := []krpc.NodeAddr{
+               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}, // 0
+               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4748}, // 1
+               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4749}, // 2
+               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4750}, // 3
+       }
+       f := pp.PexPrefersEncryption | pp.PexOutgoingConn
+
+       t.Run("ipv4", func(t *testing.T) {
+               addrs := addrs4
+               var m pexMsgFactory
+               m.Add(addrs[0], f)
+               m.Drop(addrs[1])
+               for _, addr := range addrs {
+                       m.Drop(addr)
+               }
+               targ := pp.PexMsg{
+                       Dropped: krpc.CompactIPv4NodeAddrs{
+                               addrs[1],
+                               addrs[2],
+                               addrs[3],
+                       },
+               }
+               assertPexMsgsEqual(t, targ, m.PexMsg())
+       })
+       t.Run("ipv6", func(t *testing.T) {
+               addrs := addrs6
+               var m pexMsgFactory
+               m.Add(addrs[0], f)
+               m.Drop(addrs[1])
+               for _, addr := range addrs {
+                       m.Drop(addr)
+               }
+               targ := pp.PexMsg{
+                       Dropped6: krpc.CompactIPv6NodeAddrs{
+                               addrs[1],
+                               addrs[2],
+                               addrs[3],
+                       },
+               }
+               assertPexMsgsEqual(t, targ, m.PexMsg())
+       })
+       t.Run("empty", func(t *testing.T) {
+               addr := krpc.NodeAddr{}
+               var xm pexMsgFactory
+               require.Panics(t, func() { xm.Drop(addr) })
+               m := xm.PexMsg()
+               require.EqualValues(t, 0, len(m.Dropped))
+               require.EqualValues(t, 0, len(m.Dropped6))
+       })
+}
index 57ec7ff52e5442171e0f32d883cc63fe30b04b63..b3719ec869f2a24618b3ec75934c11b17ca7fd09 100644 (file)
@@ -64,7 +64,7 @@ func (s *pexConnState) genmsg() *pp.PexMsg {
                return nil
        }
        s.seq = seq
-       return tx
+       return &tx
 }
 
 // Share is called from the writer goroutine if when it is woken up with the write buffers empty
index 0c204263e48cf00c5e2475474fc6dc87d34c4751..868936f3ddcc00a0782057ba784257cca1410d8e 100644 (file)
@@ -44,11 +44,11 @@ func TestPexConnState(t *testing.T) {
 
        x, err := pp.LoadPexMsg(out.ExtendedPayload)
        require.NoError(t, err)
-       targx := &pp.PexMsg{
+       targx := pp.PexMsg{
                Added:      krpc.CompactIPv4NodeAddrs(nil),
                AddedFlags: []pp.PexPeerFlags{},
                Added6: krpc.CompactIPv6NodeAddrs{
-                       nodeAddr(addr),
+                       mustNodeAddr(addr),
                },
                Added6Flags: []pp.PexPeerFlags{0},
        }
index d84e9e5c3e83ac74e759bc79c5112dfd4a0282a7..e034933fba17bd9f1c18ec0f8f8afb85515155d6 100644 (file)
@@ -1250,8 +1250,12 @@ func (t *Torrent) deleteConnection(c *PeerConn) (ret bool) {
        }
        _, ret = t.conns[c]
        delete(t.conns, c)
-       if !t.cl.config.DisablePEX {
-               t.pex.Drop(c)
+       // Avoid adding a drop event more than once. Probably we should track whether we've generated
+       // the drop event against the PexConnState instead.
+       if ret {
+               if !t.cl.config.DisablePEX {
+                       t.pex.Drop(c)
+               }
        }
        torrent.Add("deleted connections", 1)
        c.deleteAllRequests()