]> Sergey Matveev's repositories - btrtrc.git/commitdiff
PEX: add connection tracking
authorYaroslav Kolomiiets <yarikos@gmail.com>
Tue, 31 Mar 2020 20:14:43 +0000 (21:14 +0100)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 15 Apr 2020 07:24:44 +0000 (17:24 +1000)
client.go
peer_protocol/pex.go
peer_protocol/pex_test.go
peerconn.go
pex.go [new file with mode: 0644]
pex_test.go [new file with mode: 0644]
torrent.go
torrent_test.go

index b5b76145cf18675311b60f63fef4985ba97a7a5a..bb2bf4aebc634947e5b4ee645ac4c9a5c522e170 100644 (file)
--- a/client.go
+++ b/client.go
@@ -935,21 +935,19 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
 }
 
 func (cl *Client) sendInitialPEX(conn *PeerConn, t *Torrent) {
-       peerPexExtendedId, ok := conn.PeerExtensionIDs[pp.ExtensionNamePex]
+       xid, ok := conn.PeerExtensionIDs[pp.ExtensionNamePex]
        if !ok {
-               // peer did not advertise support for the PEX extension
-               conn.logger.Printf("no PEX support -  not sending initial")
                return
        }
-       pexMsg := t.pexInitial()
-       if pexMsg == nil {
-               // not enough peers to share — e.g. len(t.conns < 50)
-               conn.logger.Printf("skipping PEX initial")
+       m, seq := t.pex.Genmsg(0)
+       conn.pexSeq = seq
+       if m.Len() == 0 {
+               cl.logger.Printf("no initial PEX this time")
+               // FIXME see how can we schedule another initial for later
                return
        }
-       log.Printf("preparing PEX initial message: %v", pexMsg)
-       tx := pexMsg.Message(peerPexExtendedId)
-       conn.post(tx)
+       conn.logger.Printf("sending initial PEX message: %v", m)
+       conn.post(m.Message(xid))
 }
 
 func (cl *Client) dhtPort() (ret uint16) {
index 8d09106103f20f4a2678b2beb36740b660fe10a4..52801865acbc66780e79fcc29ca3256fd2aa4aa7 100644 (file)
@@ -16,26 +16,89 @@ type PexMsg struct {
        Dropped6    krpc.CompactIPv6NodeAddrs `bencode:"dropped6"`
 }
 
-func (m *PexMsg) AppendAdded(addr krpc.NodeAddr, f PexPeerFlags) {
-       ip := addr.IP
-       if ip.To4() != nil {
+func addrEqual(a, b *krpc.NodeAddr) bool {
+       return a.IP.Equal(b.IP) && a.Port == b.Port
+}
+
+func addrIndex(v []krpc.NodeAddr, a *krpc.NodeAddr) int {
+       for i := range v {
+               if addrEqual(&v[i], a) {
+                       return i
+               }
+       }
+       return -1
+}
+
+func (m *PexMsg) Add(addr krpc.NodeAddr, f PexPeerFlags) {
+       if addr.IP.To4() != nil {
+               if addrIndex(m.Added.NodeAddrs(), &addr) >= 0 {
+                       // already added
+                       return
+               }
+               if i := addrIndex(m.Dropped.NodeAddrs(), &addr); i >= 0 {
+                       // on the dropped list - cancel out
+                       m.Dropped = append(m.Dropped[:i], m.Dropped[i+1:]...)
+                       return
+               }
                m.Added = append(m.Added, addr)
                m.AddedFlags = append(m.AddedFlags, f)
-       } else if len(ip) == net.IPv6len {
+       } else if len(addr.IP) == net.IPv6len {
+               if addrIndex(m.Added6.NodeAddrs(), &addr) >= 0 {
+                       // already added
+                       return
+               }
+               if i := addrIndex(m.Dropped6.NodeAddrs(), &addr); i >= 0 {
+                       // on the dropped list - cancel out
+                       m.Dropped6 = append(m.Dropped6[:i], m.Dropped6[i+1:]...)
+                       return
+               }
                m.Added6 = append(m.Added6, addr)
                m.Added6Flags = append(m.Added6Flags, f)
        }
 }
 
-func (m *PexMsg) AppendDropped(addr krpc.NodeAddr) {
-       ip := addr.IP
-       if ip.To4() != nil {
+func (m *PexMsg) Drop(addr krpc.NodeAddr) {
+       if addr.IP.To4() != nil {
+               if addrIndex(m.Dropped.NodeAddrs(), &addr) >= 0 {
+                       // already dropped
+                       return
+               }
+               if i := addrIndex(m.Added.NodeAddrs(), &addr); i >= 0 {
+                       // on the added list - cancel out
+                       m.Added = append(m.Added[:i], m.Added[i+1:]...)
+                       m.AddedFlags = append(m.AddedFlags[:i], m.AddedFlags[i+1:]...)
+                       return
+               }
                m.Dropped = append(m.Dropped, addr)
-       } else if len(ip) == net.IPv6len {
+       } else if len(addr.IP) == net.IPv6len {
+               if addrIndex(m.Dropped6.NodeAddrs(), &addr) >= 0 {
+                       // already dropped
+                       return
+               }
+               if i := addrIndex(m.Added6.NodeAddrs(), &addr); i >= 0 {
+                       // on the added list - cancel out
+                       m.Added6 = append(m.Added6[:i], m.Added6[i+1:]...)
+                       m.Added6Flags = append(m.Added6Flags[:i], m.Added6Flags[i+1:]...)
+                       return
+               }
                m.Dropped6 = append(m.Dropped6, addr)
        }
 }
 
+func (m *PexMsg) Len() int {
+       return len(m.Added)+len(m.Added6)+len(m.Dropped)+len(m.Dropped6)
+}
+
+// DeltaLen returns max of {added+added6, dropped+dropped6}
+func (m *PexMsg) DeltaLen() int {
+       lenAdded := len(m.Added)+len(m.Added6)
+       lenDropped := len(m.Dropped)+len(m.Dropped6)
+       if lenAdded > lenDropped {
+               return lenAdded
+       }
+       return lenDropped
+}
+
 func (pexMsg *PexMsg) Message(pexExtendedId ExtensionNumber) Message {
        payload := bencode.MustMarshal(pexMsg)
        return Message{
index 30e0d363a8cf381046ab662c63650bdc43652ad7..7a493ead37fa7515a966c1ba60c9b16b02ea3b36 100644 (file)
@@ -29,77 +29,138 @@ func TestEmptyPexMsg(t *testing.T) {
        require.NoError(t, bencode.Unmarshal(b, &pm))
 }
 
-func TestPexAppendAdded(t *testing.T) {
+func TestPexAdd(t *testing.T) {
+       addrs4 := []krpc.NodeAddr{
+               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}, // 0
+               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4748}, // 1
+               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4747}, // 2
+               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4748}, // 3
+       }
+       addrs6 := []krpc.NodeAddr{
+               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}, // 0
+               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4748}, // 1
+               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4749}, // 2
+               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4750}, // 3
+       }
+       f := PexPrefersEncryption | PexOutgoingConn
+
        t.Run("ipv4", func(t *testing.T) {
-               addr := krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}
-               f := PexPrefersEncryption | PexOutgoingConn
-               xm := PexMsg{}
-               xm.AppendAdded(addr, f)
-               require.EqualValues(t, len(xm.Added), 1)
-               require.EqualValues(t, len(xm.AddedFlags), 1)
-               require.EqualValues(t, len(xm.Added6), 0)
-               require.EqualValues(t, len(xm.Added6Flags), 0)
-               require.True(t, xm.Added[0].IP.Equal(addr.IP), "IPs should match")
-               require.EqualValues(t, xm.Added[0].Port, addr.Port)
-               require.EqualValues(t, xm.AddedFlags[0], f)
+               addrs := addrs4
+               m := new(PexMsg)
+               m.Drop(addrs[0])
+               m.Add(addrs[1], f)
+               for _, addr := range addrs {
+                       m.Add(addr, f)
+               }
+               targ := &PexMsg{
+                       Added: krpc.CompactIPv4NodeAddrs{
+                               addrs[1],
+                               addrs[2],
+                               addrs[3],
+                       },
+                       AddedFlags: []PexPeerFlags{f, f, f},
+                       Dropped:    krpc.CompactIPv4NodeAddrs{},
+               }
+               require.EqualValues(t, targ, m)
        })
        t.Run("ipv6", func(t *testing.T) {
-               addr := krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}
-               f := PexPrefersEncryption | PexOutgoingConn
-               xm := PexMsg{}
-               xm.AppendAdded(addr, f)
-               require.EqualValues(t, len(xm.Added), 0)
-               require.EqualValues(t, len(xm.AddedFlags), 0)
-               require.EqualValues(t, len(xm.Added6), 1)
-               require.EqualValues(t, len(xm.Added6Flags), 1)
-               require.True(t, xm.Added6[0].IP.Equal(addr.IP), "IPs should match")
-               require.EqualValues(t, xm.Added6[0].Port, addr.Port)
-               require.EqualValues(t, xm.Added6Flags[0], f)
+               addrs := addrs6
+               m := new(PexMsg)
+               m.Drop(addrs[0])
+               m.Add(addrs[1], f)
+               for _, addr := range addrs {
+                       m.Add(addr, f)
+               }
+               targ := &PexMsg{
+                       Added6: krpc.CompactIPv6NodeAddrs{
+                               addrs[1],
+                               addrs[2],
+                               addrs[3],
+                       },
+                       Added6Flags: []PexPeerFlags{f, f, f},
+                       Dropped6:    krpc.CompactIPv6NodeAddrs{},
+               }
+               require.EqualValues(t, targ, m)
        })
-       t.Run("unspecified", func(t *testing.T) {
+       t.Run("empty", func(t *testing.T) {
                addr := krpc.NodeAddr{}
-               xm := PexMsg{}
-               xm.AppendAdded(addr, 0)
-               require.EqualValues(t, len(xm.Added), 0)
-               require.EqualValues(t, len(xm.AddedFlags), 0)
-               require.EqualValues(t, len(xm.Added6), 0)
-               require.EqualValues(t, len(xm.Added6Flags), 0)
+               xm := new(PexMsg)
+               xm.Add(addr, f)
+               require.EqualValues(t, 0, len(xm.Added))
+               require.EqualValues(t, 0, len(xm.AddedFlags))
+               require.EqualValues(t, 0, len(xm.Added6))
+               require.EqualValues(t, 0, len(xm.Added6Flags))
        })
 }
 
-func TestPexAppendDropped(t *testing.T) {
+func TestPexDrop(t *testing.T) {
+       addrs4 := []krpc.NodeAddr{
+               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}, // 0
+               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4748}, // 1
+               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4747}, // 2
+               krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4748}, // 3
+       }
+       addrs6 := []krpc.NodeAddr{
+               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}, // 0
+               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4748}, // 1
+               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4749}, // 2
+               krpc.NodeAddr{IP: net.IPv6loopback, Port: 4750}, // 3
+       }
+       f := PexPrefersEncryption | PexOutgoingConn
+
        t.Run("ipv4", func(t *testing.T) {
-               addr := krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}
-               xm := PexMsg{}
-               xm.AppendDropped(addr)
-               require.EqualValues(t, len(xm.Dropped), 1)
-               require.EqualValues(t, len(xm.Dropped6), 0)
-               require.True(t, xm.Dropped[0].IP.Equal(addr.IP), "IPs should match")
-               require.EqualValues(t, xm.Dropped[0].Port, addr.Port)
+               addrs := addrs4
+               m := new(PexMsg)
+               m.Add(addrs[0], f)
+               m.Drop(addrs[1])
+               for _, addr := range addrs {
+                       m.Drop(addr)
+               }
+               targ := &PexMsg{
+                       AddedFlags: []PexPeerFlags{},
+                       Added:    krpc.CompactIPv4NodeAddrs{},
+                       Dropped: krpc.CompactIPv4NodeAddrs{
+                               addrs[1],
+                               addrs[2],
+                               addrs[3],
+                       },
+               }
+               require.EqualValues(t, targ, m)
        })
        t.Run("ipv6", func(t *testing.T) {
-               addr := krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}
-               xm := PexMsg{}
-               xm.AppendDropped(addr)
-               require.EqualValues(t, len(xm.Dropped), 0)
-               require.EqualValues(t, len(xm.Dropped6), 1)
-               require.True(t, xm.Dropped6[0].IP.Equal(addr.IP), "IPs should match")
-               require.EqualValues(t, xm.Dropped6[0].Port, addr.Port)
+               addrs := addrs6
+               m := new(PexMsg)
+               m.Add(addrs[0], f)
+               m.Drop(addrs[1])
+               for _, addr := range addrs {
+                       m.Drop(addr)
+               }
+               targ := &PexMsg{
+                       Added6Flags: []PexPeerFlags{},
+                       Added6:    krpc.CompactIPv6NodeAddrs{},
+                       Dropped6: krpc.CompactIPv6NodeAddrs{
+                               addrs[1],
+                               addrs[2],
+                               addrs[3],
+                       },
+               }
+               require.EqualValues(t, targ, m)
        })
-       t.Run("unspecified", func(t *testing.T) {
+       t.Run("empty", func(t *testing.T) {
                addr := krpc.NodeAddr{}
-               xm := PexMsg{}
-               xm.AppendDropped(addr)
-               require.EqualValues(t, len(xm.Dropped), 0)
-               require.EqualValues(t, len(xm.Dropped6), 0)
+               xm := new(PexMsg)
+               xm.Drop(addr)
+               require.EqualValues(t, 0, len(xm.Dropped))
+               require.EqualValues(t, 0, len(xm.Dropped6))
        })
 }
 
 func TestMarshalPexMessage(t *testing.T) {
        addr := krpc.NodeAddr{IP: net.IP{127, 0, 0, 1}, Port: 0x55aa}
        f := PexPrefersEncryption | PexOutgoingConn
-       pm := PexMsg{}
-       pm.AppendAdded(addr, f)
+       pm := new(PexMsg)
+       pm.Added = append(pm.Added, addr)
+       pm.AddedFlags = append(pm.AddedFlags, f)
 
        b, err := bencode.Marshal(pm)
        require.NoError(t, err)
index f90dfd4fbd07b0b33a0a704ee89c87513a4e9bb4..be8018d6e6ff7e52a2993ea438e8813e37a5df27 100644 (file)
@@ -85,13 +85,14 @@ type PeerConn struct {
        // response.
        metadataRequests []bool
        sentHaves        bitmap.Bitmap
+       pexSeq           int
 
        // Stuff controlled by the remote peer.
-       PeerID             PeerID
-       peerInterested     bool
-       peerChoking        bool
-       peerRequests       map[request]struct{}
-       PeerExtensionBytes pp.PeerExtensionBits
+       PeerID                PeerID
+       peerInterested        bool
+       peerChoking           bool
+       peerRequests          map[request]struct{}
+       PeerExtensionBytes    pp.PeerExtensionBits
        PeerPrefersEncryption bool // as indicated by 'e' field in extension handshake
        PeerListenPort        int
        // The pieces the peer has claimed to have.
@@ -1493,6 +1494,11 @@ func (c *PeerConn) pexPeerFlags() pp.PexPeerFlags {
        return f
 }
 
+func (c *PeerConn) pexEvent(t pexEventType) pexEvent {
+       f := c.pexPeerFlags()
+       return pexEvent{t, c.remoteAddr, f}
+}
+
 func (c *PeerConn) String() string {
        return fmt.Sprintf("connection %p", c)
 }
diff --git a/pex.go b/pex.go
new file mode 100644 (file)
index 0000000..5f22e82
--- /dev/null
+++ b/pex.go
@@ -0,0 +1,100 @@
+package torrent
+
+import (
+       "net"
+
+       "github.com/anacrolix/dht/v2/krpc"
+       pp "github.com/anacrolix/torrent/peer_protocol"
+)
+
+type pexEventType int
+
+const (
+       pexAdd pexEventType = iota
+       pexDrop
+)
+
+// internal, based on BEP11
+const (
+       pexTargAdded = 25 // put drops on hold when the number of alive connections is lower than this
+       pexMaxHold   = 25 // length of the drop hold-back buffer
+       pexMaxDelta  = 50 // upper bound on added+added6 and dropped+dropped6 in a single PEX message
+)
+
+// represents a single connection (t=pexAdd) or disconnection (t=pexDrop) event
+type pexEvent struct {
+       t    pexEventType
+       addr net.Addr
+       f    pp.PexPeerFlags
+}
+
+// records the event into the peer protocol PEX message
+func (e *pexEvent) put(m *pp.PexMsg) {
+       switch e.t {
+       case pexAdd:
+               m.Add(nodeAddr(e.addr), e.f)
+       case pexDrop:
+               m.Drop(nodeAddr(e.addr))
+       }
+}
+
+func nodeAddr(addr net.Addr) krpc.NodeAddr {
+       ipport, _ := tryIpPortFromNetAddr(addr)
+       return krpc.NodeAddr{IP: shortestIP(ipport.IP), Port: ipport.Port}
+}
+
+// mainly for the krpc marshallers
+func shortestIP(ip net.IP) net.IP {
+       if ip4 := ip.To4(); ip4 != nil {
+               return ip4
+       }
+       return ip
+}
+
+// Per-torrent PEX state
+type pexState struct {
+       ev   []pexEvent // event feed, append-only
+       hold []pexEvent // delayed drops
+       nc   int        // net number of alive conns
+}
+
+func (s *pexState) Reset() {
+       s.ev = nil
+       s.hold = nil
+       s.nc = 0
+}
+
+func (s *pexState) Add(c *PeerConn) {
+       s.nc++
+       if s.nc >= pexTargAdded {
+               s.ev = append(s.ev, s.hold...)
+               s.hold = s.hold[:0]
+       }
+       e := c.pexEvent(pexAdd)
+       s.ev = append(s.ev, e)
+}
+
+func (s *pexState) Drop(c *PeerConn) {
+       e := c.pexEvent(pexDrop)
+       s.nc--
+       if s.nc < pexTargAdded && len(s.hold) < pexMaxHold {
+               s.hold = append(s.hold, e)
+       } else {
+               s.ev = append(s.ev, e)
+       }
+}
+
+// Generate a PEX message based on the event feed.
+// Also returns an index to pass to the subsequent calls, producing incremental deltas.
+func (s *pexState) Genmsg(start int) (*pp.PexMsg, int) {
+       m := new(pp.PexMsg)
+       n := start
+       for _, e := range s.ev[start:] {
+               if start > 0 && m.DeltaLen() >= pexMaxDelta {
+                       break
+               }
+               e.put(m)
+               n++
+       }
+       return m, n
+}
diff --git a/pex_test.go b/pex_test.go
new file mode 100644 (file)
index 0000000..c170f0d
--- /dev/null
@@ -0,0 +1,239 @@
+package torrent
+
+import (
+       "net"
+       "testing"
+
+       "github.com/stretchr/testify/require"
+
+       "github.com/anacrolix/dht/v2/krpc"
+       pp "github.com/anacrolix/torrent/peer_protocol"
+)
+
+var (
+       addrs = []net.Addr{
+               &net.TCPAddr{IP: net.IPv6loopback, Port: 4747},
+               &net.TCPAddr{IP: net.IPv6loopback, Port: 4748},
+               &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747},
+               &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4748},
+       }
+       f = pp.PexOutgoingConn
+)
+
+func TestPexAdded(t *testing.T) {
+       t.Run("noHold", func(t *testing.T) {
+               s := new(pexState)
+               s.Add(&PeerConn{remoteAddr: addrs[0], outgoing: true})
+               targ := &pexState{
+                       ev: []pexEvent{
+                               pexEvent{pexAdd, addrs[0], pp.PexOutgoingConn},
+                       },
+                       nc: 1,
+               }
+               require.EqualValues(t, targ, s)
+       })
+       t.Run("belowTarg", func(t *testing.T) {
+               s := &pexState{
+                       hold: []pexEvent{
+                               pexEvent{pexDrop, addrs[1], 0},
+                       },
+                       nc: 0,
+               }
+               s.Add(&PeerConn{remoteAddr: addrs[0]})
+               targ := &pexState{
+                       hold: []pexEvent{
+                               pexEvent{pexDrop, addrs[1], 0},
+                       },
+                       ev: []pexEvent{
+                               pexEvent{pexAdd, addrs[0], 0},
+                       },
+                       nc: 1,
+               }
+               require.EqualValues(t, targ, s)
+       })
+       t.Run("aboveTarg", func(t *testing.T) {
+               holdAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
+               s := &pexState{
+                       hold: []pexEvent{
+                               pexEvent{pexDrop, holdAddr, 0},
+                       },
+                       nc: pexTargAdded,
+               }
+               s.Add(&PeerConn{remoteAddr: addrs[0]})
+               targ := &pexState{
+                       hold: []pexEvent{},
+                       ev: []pexEvent{
+                               pexEvent{pexDrop, holdAddr, 0},
+                               pexEvent{pexAdd, addrs[0], 0},
+                       },
+                       nc: pexTargAdded + 1,
+               }
+               require.EqualValues(t, targ, s)
+       })
+}
+
+func TestPexDropped(t *testing.T) {
+       t.Run("belowTarg", func(t *testing.T) {
+               s := &pexState{nc: 1}
+               s.Drop(&PeerConn{remoteAddr: addrs[0]})
+               targ := &pexState{
+                       hold: []pexEvent{pexEvent{pexDrop, addrs[0], 0}},
+                       nc:   0,
+               }
+               require.EqualValues(t, targ, s)
+       })
+       t.Run("aboveTarg", func(t *testing.T) {
+               s := &pexState{nc: pexTargAdded + 1}
+               s.Drop(&PeerConn{remoteAddr: addrs[0]})
+               targ := &pexState{
+                       ev: []pexEvent{pexEvent{pexDrop, addrs[0], 0}},
+                       nc: pexTargAdded,
+               }
+               require.EqualValues(t, targ, s)
+       })
+}
+
+func TestPexReset(t *testing.T) {
+       s := &pexState{
+               hold: []pexEvent{pexEvent{pexDrop, addrs[0], 0}},
+               ev:   []pexEvent{pexEvent{pexAdd, addrs[1], 0}},
+               nc:   1,
+       }
+       s.Reset()
+       targ := new(pexState)
+       require.EqualValues(t, targ, s)
+}
+
+var testcases = []struct {
+       name  string
+       in    *pexState
+       arg   int
+       targM *pp.PexMsg
+       targS int
+}{
+       {
+               name:  "empty",
+               in:    &pexState{},
+               arg:   0,
+               targM: &pp.PexMsg{},
+               targS: 0,
+       },
+       {
+               name: "add4",
+               in: &pexState{
+                       ev: []pexEvent{
+                               pexEvent{pexAdd, addrs[0], f},
+                               pexEvent{pexAdd, addrs[1], f},
+                               pexEvent{pexAdd, addrs[2], f},
+                               pexEvent{pexAdd, addrs[3], f},
+                       },
+               },
+               arg: 0,
+               targM: &pp.PexMsg{
+                       Added: krpc.CompactIPv4NodeAddrs{
+                               nodeAddr(addrs[2]),
+                               nodeAddr(addrs[3]),
+                       },
+                       AddedFlags: []pp.PexPeerFlags{f, f},
+                       Added6: krpc.CompactIPv6NodeAddrs{
+                               nodeAddr(addrs[0]),
+                               nodeAddr(addrs[1]),
+                       },
+                       Added6Flags: []pp.PexPeerFlags{f, f},
+               },
+               targS: 4,
+       },
+       {
+               name: "drop2",
+               arg:  0,
+               in: &pexState{
+                       ev: []pexEvent{
+                               pexEvent{pexDrop, addrs[0], f},
+                               pexEvent{pexDrop, addrs[2], f},
+                       },
+               },
+               targM: &pp.PexMsg{
+                       Dropped: krpc.CompactIPv4NodeAddrs{
+                               nodeAddr(addrs[2]),
+                       },
+                       Dropped6: krpc.CompactIPv6NodeAddrs{
+                               nodeAddr(addrs[0]),
+                       },
+               },
+               targS: 2,
+       },
+       {
+               name: "add2drop1",
+               arg:  0,
+               in: &pexState{
+                       ev: []pexEvent{
+                               pexEvent{pexAdd, addrs[0], f},
+                               pexEvent{pexAdd, addrs[1], f},
+                               pexEvent{pexDrop, addrs[0], f},
+                       },
+               },
+               targM: &pp.PexMsg{
+                       Added6: krpc.CompactIPv6NodeAddrs{
+                               nodeAddr(addrs[1]),
+                       },
+                       Added6Flags: []pp.PexPeerFlags{f},
+               },
+               targS: 3,
+       },
+       {
+               name: "delayed",
+               arg:  0,
+               in: &pexState{
+                       ev: []pexEvent{
+                               pexEvent{pexAdd, addrs[0], f},
+                               pexEvent{pexAdd, addrs[1], f},
+                               pexEvent{pexAdd, addrs[2], f},
+                       },
+                       hold: []pexEvent{
+                               pexEvent{pexDrop, addrs[0], f},
+                               pexEvent{pexDrop, addrs[2], f},
+                               pexEvent{pexDrop, addrs[1], f},
+                       },
+               },
+               targM: &pp.PexMsg{
+                       Added: krpc.CompactIPv4NodeAddrs{
+                               nodeAddr(addrs[2]),
+                       },
+                       AddedFlags: []pp.PexPeerFlags{f},
+                       Added6: krpc.CompactIPv6NodeAddrs{
+                               nodeAddr(addrs[0]),
+                               nodeAddr(addrs[1]),
+                       },
+                       Added6Flags: []pp.PexPeerFlags{f, f},
+               },
+               targS: 3,
+       },
+       {
+               name: "followup",
+               arg:  1,
+               in: &pexState{
+                       ev: []pexEvent{
+                               pexEvent{pexAdd, addrs[0], f},
+                               pexEvent{pexAdd, addrs[1], f},
+                       },
+               },
+               targM: &pp.PexMsg{
+                       Added6: krpc.CompactIPv6NodeAddrs{
+                               nodeAddr(addrs[1]),
+                       },
+                       Added6Flags: []pp.PexPeerFlags{f},
+               },
+               targS: 2,
+       },
+}
+
+func TestPexGenmsg(t *testing.T) {
+       for _, tc := range testcases {
+               t.Run(tc.name, func(t *testing.T) {
+                       s := tc.in
+                       m, seen := s.Genmsg(tc.arg)
+                       require.EqualValues(t, tc.targM, m)
+                       require.EqualValues(t, tc.targS, seen)
+               })
+       }
+}
index fc3353578a9f973ad0716b166dedf42745ebc992..3b75e25542c66056ea8ce1fa77b28ea4b3cddfb6 100644 (file)
@@ -7,7 +7,6 @@ import (
        "fmt"
        "io"
        "math/rand"
-       "net"
        "net/url"
        "sync"
        "text/tabwriter"
@@ -17,7 +16,6 @@ import (
        "github.com/davecgh/go-spew/spew"
 
        "github.com/anacrolix/dht/v2"
-       "github.com/anacrolix/dht/v2/krpc"
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo"
        "github.com/anacrolix/missinggo/perf"
@@ -707,7 +705,7 @@ func (t *Torrent) close() (err error) {
        for conn := range t.conns {
                conn.close()
        }
-       // PEX wipe state here
+       t.pex.Reset()
        t.cl.event.Broadcast()
        t.pieceStateChanges.Close()
        t.updateWantPeersEvent()
@@ -1226,7 +1224,7 @@ func (t *Torrent) assertNoPendingRequests() {
 func (t *Torrent) dropConnection(c *PeerConn) {
        t.cl.event.Broadcast()
        if !t.cl.config.DisablePEX {
-               t.pex.dropped(c)
+               t.pex.Drop(c)
        }
        c.close()
        if t.deleteConnection(c) {
@@ -1501,7 +1499,7 @@ func (t *Torrent) addConnection(c *PeerConn) (err error) {
        }
        t.conns[c] = struct{}{}
        if !t.cl.config.DisablePEX {
-               t.pex.added(c)
+               t.pex.Add(c)
        }
        return nil
 }
@@ -1863,38 +1861,3 @@ func (t *Torrent) SetOnWriteChunkError(f func(error)) {
        defer t.cl.unlock()
        t.userOnWriteChunkErr = f
 }
-
-func nodeAddr(addr net.Addr) krpc.NodeAddr {
-       ipport, _ := tryIpPortFromNetAddr(addr)
-       ip := ipport.IP
-       if ip4 := ip.To4(); ip4 != nil && len(ip) != net.IPv4len {
-               ip = ip4
-       }
-       return krpc.NodeAddr{IP: ip, Port: ipport.Port}
-}
-
-func (t *Torrent) pexInitial() *pp.PexMsg {
-       // BUG FIXME PEX prepare 25 recently connected peers
-       tx := &pp.PexMsg{}
-       for c := range t.conns {
-               addr := nodeAddr(c.remoteAddr)
-               f := c.pexPeerFlags()
-               tx.AppendAdded(addr, f)
-       }
-       nc := len(tx.Added) + len(tx.Added6)
-       // BUG if nc < 50 {
-       if nc < 1 {
-               return nil
-       }
-       return tx
-}
-
-type pexState struct{}
-
-func (s *pexState) added(c *PeerConn) {
-       return
-}
-
-func (s *pexState) dropped(c *PeerConn) {
-       return
-}
index b8cea974f9ea5477a7fe6850b5312d6cfaf3366c..5ecd93b4378b52f6e5a4b9e521f7cdffcb49b5c4 100644 (file)
@@ -218,35 +218,3 @@ func TestTorrentMetainfoIncompleteMetadata(t *testing.T) {
        assert.False(t, tt.haveAllMetadataPieces())
        assert.Nil(t, tt.Metainfo().InfoBytes)
 }
-
-func TestTorrentPexInitial(t *testing.T) {
-       v := []*PeerConn{
-               &PeerConn{
-                       remoteAddr: &net.UDPAddr{IP:   net.IPv4(172, 17, 0, 2), Port: 5555},
-               },
-               &PeerConn{
-                       remoteAddr: &net.UDPAddr{
-                               IP:   net.IP{0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
-                               Port: 11111,
-                       },
-                       outgoing: true,
-               },
-               &PeerConn{
-                       remoteAddr: &net.UDPAddr{IP: net.IP(nil), Port: 0},
-               },
-       }
-       torrent := &Torrent{conns: make(map[*PeerConn]struct{})}
-       for _, conn := range v {
-               torrent.conns[conn] = struct{}{}
-       }
-       tx := torrent.pexInitial()
-       require.NotNil(t, tx)
-       require.EqualValues(t, 1, len(tx.Added))
-       require.EqualValues(t, tx.Added[0].UDP().Network(), v[0].remoteAddr.Network())
-       require.EqualValues(t, tx.Added[0].UDP().String(), v[0].remoteAddr.String())
-       require.Zero(t, tx.AddedFlags[0])
-       require.EqualValues(t, 1, len(tx.Added6))
-       require.EqualValues(t, tx.Added6[0].UDP().Network(), v[1].remoteAddr.Network())
-       require.EqualValues(t, tx.Added6[0].UDP().String(), v[1].remoteAddr.String())
-       require.NotZero(t, tx.Added6Flags[0]&pp.PexOutgoingConn)
-}