From 93430aa01a6ea619a00573f488b3f3dce61aa30a Mon Sep 17 00:00:00 2001 From: Yaroslav Kolomiiets Date: Tue, 31 Mar 2020 21:14:43 +0100 Subject: [PATCH] PEX: add connection tracking --- client.go | 18 ++- peer_protocol/pex.go | 79 +++++++++++-- peer_protocol/pex_test.go | 165 +++++++++++++++++--------- peerconn.go | 16 ++- pex.go | 100 ++++++++++++++++ pex_test.go | 239 ++++++++++++++++++++++++++++++++++++++ torrent.go | 43 +------ torrent_test.go | 32 ----- 8 files changed, 545 insertions(+), 147 deletions(-) create mode 100644 pex.go create mode 100644 pex_test.go diff --git a/client.go b/client.go index b5b76145..bb2bf4ae 100644 --- a/client.go +++ b/client.go @@ -935,21 +935,19 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) { } func (cl *Client) sendInitialPEX(conn *PeerConn, t *Torrent) { - peerPexExtendedId, ok := conn.PeerExtensionIDs[pp.ExtensionNamePex] + xid, ok := conn.PeerExtensionIDs[pp.ExtensionNamePex] if !ok { - // peer did not advertise support for the PEX extension - conn.logger.Printf("no PEX support - not sending initial") return } - pexMsg := t.pexInitial() - if pexMsg == nil { - // not enough peers to share — e.g. len(t.conns < 50) - conn.logger.Printf("skipping PEX initial") + m, seq := t.pex.Genmsg(0) + conn.pexSeq = seq + if m.Len() == 0 { + cl.logger.Printf("no initial PEX this time") + // FIXME see how can we schedule another initial for later return } - log.Printf("preparing PEX initial message: %v", pexMsg) - tx := pexMsg.Message(peerPexExtendedId) - conn.post(tx) + conn.logger.Printf("sending initial PEX message: %v", m) + conn.post(m.Message(xid)) } func (cl *Client) dhtPort() (ret uint16) { diff --git a/peer_protocol/pex.go b/peer_protocol/pex.go index 8d091061..52801865 100644 --- a/peer_protocol/pex.go +++ b/peer_protocol/pex.go @@ -16,26 +16,89 @@ type PexMsg struct { Dropped6 krpc.CompactIPv6NodeAddrs `bencode:"dropped6"` } -func (m *PexMsg) AppendAdded(addr krpc.NodeAddr, f PexPeerFlags) { - ip := addr.IP - if ip.To4() != nil { +func addrEqual(a, b *krpc.NodeAddr) bool { + return a.IP.Equal(b.IP) && a.Port == b.Port +} + +func addrIndex(v []krpc.NodeAddr, a *krpc.NodeAddr) int { + for i := range v { + if addrEqual(&v[i], a) { + return i + } + } + return -1 +} + +func (m *PexMsg) Add(addr krpc.NodeAddr, f PexPeerFlags) { + if addr.IP.To4() != nil { + if addrIndex(m.Added.NodeAddrs(), &addr) >= 0 { + // already added + return + } + if i := addrIndex(m.Dropped.NodeAddrs(), &addr); i >= 0 { + // on the dropped list - cancel out + m.Dropped = append(m.Dropped[:i], m.Dropped[i+1:]...) + return + } m.Added = append(m.Added, addr) m.AddedFlags = append(m.AddedFlags, f) - } else if len(ip) == net.IPv6len { + } else if len(addr.IP) == net.IPv6len { + if addrIndex(m.Added6.NodeAddrs(), &addr) >= 0 { + // already added + return + } + if i := addrIndex(m.Dropped6.NodeAddrs(), &addr); i >= 0 { + // on the dropped list - cancel out + m.Dropped6 = append(m.Dropped6[:i], m.Dropped6[i+1:]...) + return + } m.Added6 = append(m.Added6, addr) m.Added6Flags = append(m.Added6Flags, f) } } -func (m *PexMsg) AppendDropped(addr krpc.NodeAddr) { - ip := addr.IP - if ip.To4() != nil { +func (m *PexMsg) Drop(addr krpc.NodeAddr) { + if addr.IP.To4() != nil { + if addrIndex(m.Dropped.NodeAddrs(), &addr) >= 0 { + // already dropped + return + } + if i := addrIndex(m.Added.NodeAddrs(), &addr); i >= 0 { + // on the added list - cancel out + m.Added = append(m.Added[:i], m.Added[i+1:]...) + m.AddedFlags = append(m.AddedFlags[:i], m.AddedFlags[i+1:]...) + return + } m.Dropped = append(m.Dropped, addr) - } else if len(ip) == net.IPv6len { + } else if len(addr.IP) == net.IPv6len { + if addrIndex(m.Dropped6.NodeAddrs(), &addr) >= 0 { + // already dropped + return + } + if i := addrIndex(m.Added6.NodeAddrs(), &addr); i >= 0 { + // on the added list - cancel out + m.Added6 = append(m.Added6[:i], m.Added6[i+1:]...) + m.Added6Flags = append(m.Added6Flags[:i], m.Added6Flags[i+1:]...) + return + } m.Dropped6 = append(m.Dropped6, addr) } } +func (m *PexMsg) Len() int { + return len(m.Added)+len(m.Added6)+len(m.Dropped)+len(m.Dropped6) +} + +// DeltaLen returns max of {added+added6, dropped+dropped6} +func (m *PexMsg) DeltaLen() int { + lenAdded := len(m.Added)+len(m.Added6) + lenDropped := len(m.Dropped)+len(m.Dropped6) + if lenAdded > lenDropped { + return lenAdded + } + return lenDropped +} + func (pexMsg *PexMsg) Message(pexExtendedId ExtensionNumber) Message { payload := bencode.MustMarshal(pexMsg) return Message{ diff --git a/peer_protocol/pex_test.go b/peer_protocol/pex_test.go index 30e0d363..7a493ead 100644 --- a/peer_protocol/pex_test.go +++ b/peer_protocol/pex_test.go @@ -29,77 +29,138 @@ func TestEmptyPexMsg(t *testing.T) { require.NoError(t, bencode.Unmarshal(b, &pm)) } -func TestPexAppendAdded(t *testing.T) { +func TestPexAdd(t *testing.T) { + addrs4 := []krpc.NodeAddr{ + krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}, // 0 + krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4748}, // 1 + krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4747}, // 2 + krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4748}, // 3 + } + addrs6 := []krpc.NodeAddr{ + krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}, // 0 + krpc.NodeAddr{IP: net.IPv6loopback, Port: 4748}, // 1 + krpc.NodeAddr{IP: net.IPv6loopback, Port: 4749}, // 2 + krpc.NodeAddr{IP: net.IPv6loopback, Port: 4750}, // 3 + } + f := PexPrefersEncryption | PexOutgoingConn + t.Run("ipv4", func(t *testing.T) { - addr := krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747} - f := PexPrefersEncryption | PexOutgoingConn - xm := PexMsg{} - xm.AppendAdded(addr, f) - require.EqualValues(t, len(xm.Added), 1) - require.EqualValues(t, len(xm.AddedFlags), 1) - require.EqualValues(t, len(xm.Added6), 0) - require.EqualValues(t, len(xm.Added6Flags), 0) - require.True(t, xm.Added[0].IP.Equal(addr.IP), "IPs should match") - require.EqualValues(t, xm.Added[0].Port, addr.Port) - require.EqualValues(t, xm.AddedFlags[0], f) + addrs := addrs4 + m := new(PexMsg) + m.Drop(addrs[0]) + m.Add(addrs[1], f) + for _, addr := range addrs { + m.Add(addr, f) + } + targ := &PexMsg{ + Added: krpc.CompactIPv4NodeAddrs{ + addrs[1], + addrs[2], + addrs[3], + }, + AddedFlags: []PexPeerFlags{f, f, f}, + Dropped: krpc.CompactIPv4NodeAddrs{}, + } + require.EqualValues(t, targ, m) }) t.Run("ipv6", func(t *testing.T) { - addr := krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747} - f := PexPrefersEncryption | PexOutgoingConn - xm := PexMsg{} - xm.AppendAdded(addr, f) - require.EqualValues(t, len(xm.Added), 0) - require.EqualValues(t, len(xm.AddedFlags), 0) - require.EqualValues(t, len(xm.Added6), 1) - require.EqualValues(t, len(xm.Added6Flags), 1) - require.True(t, xm.Added6[0].IP.Equal(addr.IP), "IPs should match") - require.EqualValues(t, xm.Added6[0].Port, addr.Port) - require.EqualValues(t, xm.Added6Flags[0], f) + addrs := addrs6 + m := new(PexMsg) + m.Drop(addrs[0]) + m.Add(addrs[1], f) + for _, addr := range addrs { + m.Add(addr, f) + } + targ := &PexMsg{ + Added6: krpc.CompactIPv6NodeAddrs{ + addrs[1], + addrs[2], + addrs[3], + }, + Added6Flags: []PexPeerFlags{f, f, f}, + Dropped6: krpc.CompactIPv6NodeAddrs{}, + } + require.EqualValues(t, targ, m) }) - t.Run("unspecified", func(t *testing.T) { + t.Run("empty", func(t *testing.T) { addr := krpc.NodeAddr{} - xm := PexMsg{} - xm.AppendAdded(addr, 0) - require.EqualValues(t, len(xm.Added), 0) - require.EqualValues(t, len(xm.AddedFlags), 0) - require.EqualValues(t, len(xm.Added6), 0) - require.EqualValues(t, len(xm.Added6Flags), 0) + xm := new(PexMsg) + xm.Add(addr, f) + require.EqualValues(t, 0, len(xm.Added)) + require.EqualValues(t, 0, len(xm.AddedFlags)) + require.EqualValues(t, 0, len(xm.Added6)) + require.EqualValues(t, 0, len(xm.Added6Flags)) }) } -func TestPexAppendDropped(t *testing.T) { +func TestPexDrop(t *testing.T) { + addrs4 := []krpc.NodeAddr{ + krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}, // 0 + krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4748}, // 1 + krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4747}, // 2 + krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4748}, // 3 + } + addrs6 := []krpc.NodeAddr{ + krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}, // 0 + krpc.NodeAddr{IP: net.IPv6loopback, Port: 4748}, // 1 + krpc.NodeAddr{IP: net.IPv6loopback, Port: 4749}, // 2 + krpc.NodeAddr{IP: net.IPv6loopback, Port: 4750}, // 3 + } + f := PexPrefersEncryption | PexOutgoingConn + t.Run("ipv4", func(t *testing.T) { - addr := krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747} - xm := PexMsg{} - xm.AppendDropped(addr) - require.EqualValues(t, len(xm.Dropped), 1) - require.EqualValues(t, len(xm.Dropped6), 0) - require.True(t, xm.Dropped[0].IP.Equal(addr.IP), "IPs should match") - require.EqualValues(t, xm.Dropped[0].Port, addr.Port) + addrs := addrs4 + m := new(PexMsg) + m.Add(addrs[0], f) + m.Drop(addrs[1]) + for _, addr := range addrs { + m.Drop(addr) + } + targ := &PexMsg{ + AddedFlags: []PexPeerFlags{}, + Added: krpc.CompactIPv4NodeAddrs{}, + Dropped: krpc.CompactIPv4NodeAddrs{ + addrs[1], + addrs[2], + addrs[3], + }, + } + require.EqualValues(t, targ, m) }) t.Run("ipv6", func(t *testing.T) { - addr := krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747} - xm := PexMsg{} - xm.AppendDropped(addr) - require.EqualValues(t, len(xm.Dropped), 0) - require.EqualValues(t, len(xm.Dropped6), 1) - require.True(t, xm.Dropped6[0].IP.Equal(addr.IP), "IPs should match") - require.EqualValues(t, xm.Dropped6[0].Port, addr.Port) + addrs := addrs6 + m := new(PexMsg) + m.Add(addrs[0], f) + m.Drop(addrs[1]) + for _, addr := range addrs { + m.Drop(addr) + } + targ := &PexMsg{ + Added6Flags: []PexPeerFlags{}, + Added6: krpc.CompactIPv6NodeAddrs{}, + Dropped6: krpc.CompactIPv6NodeAddrs{ + addrs[1], + addrs[2], + addrs[3], + }, + } + require.EqualValues(t, targ, m) }) - t.Run("unspecified", func(t *testing.T) { + t.Run("empty", func(t *testing.T) { addr := krpc.NodeAddr{} - xm := PexMsg{} - xm.AppendDropped(addr) - require.EqualValues(t, len(xm.Dropped), 0) - require.EqualValues(t, len(xm.Dropped6), 0) + xm := new(PexMsg) + xm.Drop(addr) + require.EqualValues(t, 0, len(xm.Dropped)) + require.EqualValues(t, 0, len(xm.Dropped6)) }) } func TestMarshalPexMessage(t *testing.T) { addr := krpc.NodeAddr{IP: net.IP{127, 0, 0, 1}, Port: 0x55aa} f := PexPrefersEncryption | PexOutgoingConn - pm := PexMsg{} - pm.AppendAdded(addr, f) + pm := new(PexMsg) + pm.Added = append(pm.Added, addr) + pm.AddedFlags = append(pm.AddedFlags, f) b, err := bencode.Marshal(pm) require.NoError(t, err) diff --git a/peerconn.go b/peerconn.go index f90dfd4f..be8018d6 100644 --- a/peerconn.go +++ b/peerconn.go @@ -85,13 +85,14 @@ type PeerConn struct { // response. metadataRequests []bool sentHaves bitmap.Bitmap + pexSeq int // Stuff controlled by the remote peer. - PeerID PeerID - peerInterested bool - peerChoking bool - peerRequests map[request]struct{} - PeerExtensionBytes pp.PeerExtensionBits + PeerID PeerID + peerInterested bool + peerChoking bool + peerRequests map[request]struct{} + PeerExtensionBytes pp.PeerExtensionBits PeerPrefersEncryption bool // as indicated by 'e' field in extension handshake PeerListenPort int // The pieces the peer has claimed to have. @@ -1493,6 +1494,11 @@ func (c *PeerConn) pexPeerFlags() pp.PexPeerFlags { return f } +func (c *PeerConn) pexEvent(t pexEventType) pexEvent { + f := c.pexPeerFlags() + return pexEvent{t, c.remoteAddr, f} +} + func (c *PeerConn) String() string { return fmt.Sprintf("connection %p", c) } diff --git a/pex.go b/pex.go new file mode 100644 index 00000000..5f22e82b --- /dev/null +++ b/pex.go @@ -0,0 +1,100 @@ +package torrent + +import ( + "net" + + "github.com/anacrolix/dht/v2/krpc" + pp "github.com/anacrolix/torrent/peer_protocol" +) + +type pexEventType int + +const ( + pexAdd pexEventType = iota + pexDrop +) + +// internal, based on BEP11 +const ( + pexTargAdded = 25 // put drops on hold when the number of alive connections is lower than this + pexMaxHold = 25 // length of the drop hold-back buffer + pexMaxDelta = 50 // upper bound on added+added6 and dropped+dropped6 in a single PEX message +) + +// represents a single connection (t=pexAdd) or disconnection (t=pexDrop) event +type pexEvent struct { + t pexEventType + addr net.Addr + f pp.PexPeerFlags +} + +// records the event into the peer protocol PEX message +func (e *pexEvent) put(m *pp.PexMsg) { + switch e.t { + case pexAdd: + m.Add(nodeAddr(e.addr), e.f) + case pexDrop: + m.Drop(nodeAddr(e.addr)) + } +} + +func nodeAddr(addr net.Addr) krpc.NodeAddr { + ipport, _ := tryIpPortFromNetAddr(addr) + return krpc.NodeAddr{IP: shortestIP(ipport.IP), Port: ipport.Port} +} + +// mainly for the krpc marshallers +func shortestIP(ip net.IP) net.IP { + if ip4 := ip.To4(); ip4 != nil { + return ip4 + } + return ip +} + +// Per-torrent PEX state +type pexState struct { + ev []pexEvent // event feed, append-only + hold []pexEvent // delayed drops + nc int // net number of alive conns +} + +func (s *pexState) Reset() { + s.ev = nil + s.hold = nil + s.nc = 0 +} + +func (s *pexState) Add(c *PeerConn) { + s.nc++ + if s.nc >= pexTargAdded { + s.ev = append(s.ev, s.hold...) + s.hold = s.hold[:0] + } + e := c.pexEvent(pexAdd) + s.ev = append(s.ev, e) +} + +func (s *pexState) Drop(c *PeerConn) { + e := c.pexEvent(pexDrop) + s.nc-- + if s.nc < pexTargAdded && len(s.hold) < pexMaxHold { + s.hold = append(s.hold, e) + } else { + s.ev = append(s.ev, e) + } +} + +// Generate a PEX message based on the event feed. +// Also returns an index to pass to the subsequent calls, producing incremental deltas. +func (s *pexState) Genmsg(start int) (*pp.PexMsg, int) { + m := new(pp.PexMsg) + n := start + for _, e := range s.ev[start:] { + if start > 0 && m.DeltaLen() >= pexMaxDelta { + break + } + e.put(m) + n++ + } + return m, n +} diff --git a/pex_test.go b/pex_test.go new file mode 100644 index 00000000..c170f0d7 --- /dev/null +++ b/pex_test.go @@ -0,0 +1,239 @@ +package torrent + +import ( + "net" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/anacrolix/dht/v2/krpc" + pp "github.com/anacrolix/torrent/peer_protocol" +) + +var ( + addrs = []net.Addr{ + &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}, + &net.TCPAddr{IP: net.IPv6loopback, Port: 4748}, + &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}, + &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4748}, + } + f = pp.PexOutgoingConn +) + +func TestPexAdded(t *testing.T) { + t.Run("noHold", func(t *testing.T) { + s := new(pexState) + s.Add(&PeerConn{remoteAddr: addrs[0], outgoing: true}) + targ := &pexState{ + ev: []pexEvent{ + pexEvent{pexAdd, addrs[0], pp.PexOutgoingConn}, + }, + nc: 1, + } + require.EqualValues(t, targ, s) + }) + t.Run("belowTarg", func(t *testing.T) { + s := &pexState{ + hold: []pexEvent{ + pexEvent{pexDrop, addrs[1], 0}, + }, + nc: 0, + } + s.Add(&PeerConn{remoteAddr: addrs[0]}) + targ := &pexState{ + hold: []pexEvent{ + pexEvent{pexDrop, addrs[1], 0}, + }, + ev: []pexEvent{ + pexEvent{pexAdd, addrs[0], 0}, + }, + nc: 1, + } + require.EqualValues(t, targ, s) + }) + t.Run("aboveTarg", func(t *testing.T) { + holdAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 4848} + s := &pexState{ + hold: []pexEvent{ + pexEvent{pexDrop, holdAddr, 0}, + }, + nc: pexTargAdded, + } + s.Add(&PeerConn{remoteAddr: addrs[0]}) + targ := &pexState{ + hold: []pexEvent{}, + ev: []pexEvent{ + pexEvent{pexDrop, holdAddr, 0}, + pexEvent{pexAdd, addrs[0], 0}, + }, + nc: pexTargAdded + 1, + } + require.EqualValues(t, targ, s) + }) +} + +func TestPexDropped(t *testing.T) { + t.Run("belowTarg", func(t *testing.T) { + s := &pexState{nc: 1} + s.Drop(&PeerConn{remoteAddr: addrs[0]}) + targ := &pexState{ + hold: []pexEvent{pexEvent{pexDrop, addrs[0], 0}}, + nc: 0, + } + require.EqualValues(t, targ, s) + }) + t.Run("aboveTarg", func(t *testing.T) { + s := &pexState{nc: pexTargAdded + 1} + s.Drop(&PeerConn{remoteAddr: addrs[0]}) + targ := &pexState{ + ev: []pexEvent{pexEvent{pexDrop, addrs[0], 0}}, + nc: pexTargAdded, + } + require.EqualValues(t, targ, s) + }) +} + +func TestPexReset(t *testing.T) { + s := &pexState{ + hold: []pexEvent{pexEvent{pexDrop, addrs[0], 0}}, + ev: []pexEvent{pexEvent{pexAdd, addrs[1], 0}}, + nc: 1, + } + s.Reset() + targ := new(pexState) + require.EqualValues(t, targ, s) +} + +var testcases = []struct { + name string + in *pexState + arg int + targM *pp.PexMsg + targS int +}{ + { + name: "empty", + in: &pexState{}, + arg: 0, + targM: &pp.PexMsg{}, + targS: 0, + }, + { + name: "add4", + in: &pexState{ + ev: []pexEvent{ + pexEvent{pexAdd, addrs[0], f}, + pexEvent{pexAdd, addrs[1], f}, + pexEvent{pexAdd, addrs[2], f}, + pexEvent{pexAdd, addrs[3], f}, + }, + }, + arg: 0, + targM: &pp.PexMsg{ + Added: krpc.CompactIPv4NodeAddrs{ + nodeAddr(addrs[2]), + nodeAddr(addrs[3]), + }, + AddedFlags: []pp.PexPeerFlags{f, f}, + Added6: krpc.CompactIPv6NodeAddrs{ + nodeAddr(addrs[0]), + nodeAddr(addrs[1]), + }, + Added6Flags: []pp.PexPeerFlags{f, f}, + }, + targS: 4, + }, + { + name: "drop2", + arg: 0, + in: &pexState{ + ev: []pexEvent{ + pexEvent{pexDrop, addrs[0], f}, + pexEvent{pexDrop, addrs[2], f}, + }, + }, + targM: &pp.PexMsg{ + Dropped: krpc.CompactIPv4NodeAddrs{ + nodeAddr(addrs[2]), + }, + Dropped6: krpc.CompactIPv6NodeAddrs{ + nodeAddr(addrs[0]), + }, + }, + targS: 2, + }, + { + name: "add2drop1", + arg: 0, + in: &pexState{ + ev: []pexEvent{ + pexEvent{pexAdd, addrs[0], f}, + pexEvent{pexAdd, addrs[1], f}, + pexEvent{pexDrop, addrs[0], f}, + }, + }, + targM: &pp.PexMsg{ + Added6: krpc.CompactIPv6NodeAddrs{ + nodeAddr(addrs[1]), + }, + Added6Flags: []pp.PexPeerFlags{f}, + }, + targS: 3, + }, + { + name: "delayed", + arg: 0, + in: &pexState{ + ev: []pexEvent{ + pexEvent{pexAdd, addrs[0], f}, + pexEvent{pexAdd, addrs[1], f}, + pexEvent{pexAdd, addrs[2], f}, + }, + hold: []pexEvent{ + pexEvent{pexDrop, addrs[0], f}, + pexEvent{pexDrop, addrs[2], f}, + pexEvent{pexDrop, addrs[1], f}, + }, + }, + targM: &pp.PexMsg{ + Added: krpc.CompactIPv4NodeAddrs{ + nodeAddr(addrs[2]), + }, + AddedFlags: []pp.PexPeerFlags{f}, + Added6: krpc.CompactIPv6NodeAddrs{ + nodeAddr(addrs[0]), + nodeAddr(addrs[1]), + }, + Added6Flags: []pp.PexPeerFlags{f, f}, + }, + targS: 3, + }, + { + name: "followup", + arg: 1, + in: &pexState{ + ev: []pexEvent{ + pexEvent{pexAdd, addrs[0], f}, + pexEvent{pexAdd, addrs[1], f}, + }, + }, + targM: &pp.PexMsg{ + Added6: krpc.CompactIPv6NodeAddrs{ + nodeAddr(addrs[1]), + }, + Added6Flags: []pp.PexPeerFlags{f}, + }, + targS: 2, + }, +} + +func TestPexGenmsg(t *testing.T) { + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + s := tc.in + m, seen := s.Genmsg(tc.arg) + require.EqualValues(t, tc.targM, m) + require.EqualValues(t, tc.targS, seen) + }) + } +} diff --git a/torrent.go b/torrent.go index fc335357..3b75e255 100644 --- a/torrent.go +++ b/torrent.go @@ -7,7 +7,6 @@ import ( "fmt" "io" "math/rand" - "net" "net/url" "sync" "text/tabwriter" @@ -17,7 +16,6 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/anacrolix/dht/v2" - "github.com/anacrolix/dht/v2/krpc" "github.com/anacrolix/log" "github.com/anacrolix/missinggo" "github.com/anacrolix/missinggo/perf" @@ -707,7 +705,7 @@ func (t *Torrent) close() (err error) { for conn := range t.conns { conn.close() } - // PEX wipe state here + t.pex.Reset() t.cl.event.Broadcast() t.pieceStateChanges.Close() t.updateWantPeersEvent() @@ -1226,7 +1224,7 @@ func (t *Torrent) assertNoPendingRequests() { func (t *Torrent) dropConnection(c *PeerConn) { t.cl.event.Broadcast() if !t.cl.config.DisablePEX { - t.pex.dropped(c) + t.pex.Drop(c) } c.close() if t.deleteConnection(c) { @@ -1501,7 +1499,7 @@ func (t *Torrent) addConnection(c *PeerConn) (err error) { } t.conns[c] = struct{}{} if !t.cl.config.DisablePEX { - t.pex.added(c) + t.pex.Add(c) } return nil } @@ -1863,38 +1861,3 @@ func (t *Torrent) SetOnWriteChunkError(f func(error)) { defer t.cl.unlock() t.userOnWriteChunkErr = f } - -func nodeAddr(addr net.Addr) krpc.NodeAddr { - ipport, _ := tryIpPortFromNetAddr(addr) - ip := ipport.IP - if ip4 := ip.To4(); ip4 != nil && len(ip) != net.IPv4len { - ip = ip4 - } - return krpc.NodeAddr{IP: ip, Port: ipport.Port} -} - -func (t *Torrent) pexInitial() *pp.PexMsg { - // BUG FIXME PEX prepare 25 recently connected peers - tx := &pp.PexMsg{} - for c := range t.conns { - addr := nodeAddr(c.remoteAddr) - f := c.pexPeerFlags() - tx.AppendAdded(addr, f) - } - nc := len(tx.Added) + len(tx.Added6) - // BUG if nc < 50 { - if nc < 1 { - return nil - } - return tx -} - -type pexState struct{} - -func (s *pexState) added(c *PeerConn) { - return -} - -func (s *pexState) dropped(c *PeerConn) { - return -} diff --git a/torrent_test.go b/torrent_test.go index b8cea974..5ecd93b4 100644 --- a/torrent_test.go +++ b/torrent_test.go @@ -218,35 +218,3 @@ func TestTorrentMetainfoIncompleteMetadata(t *testing.T) { assert.False(t, tt.haveAllMetadataPieces()) assert.Nil(t, tt.Metainfo().InfoBytes) } - -func TestTorrentPexInitial(t *testing.T) { - v := []*PeerConn{ - &PeerConn{ - remoteAddr: &net.UDPAddr{IP: net.IPv4(172, 17, 0, 2), Port: 5555}, - }, - &PeerConn{ - remoteAddr: &net.UDPAddr{ - IP: net.IP{0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1}, - Port: 11111, - }, - outgoing: true, - }, - &PeerConn{ - remoteAddr: &net.UDPAddr{IP: net.IP(nil), Port: 0}, - }, - } - torrent := &Torrent{conns: make(map[*PeerConn]struct{})} - for _, conn := range v { - torrent.conns[conn] = struct{}{} - } - tx := torrent.pexInitial() - require.NotNil(t, tx) - require.EqualValues(t, 1, len(tx.Added)) - require.EqualValues(t, tx.Added[0].UDP().Network(), v[0].remoteAddr.Network()) - require.EqualValues(t, tx.Added[0].UDP().String(), v[0].remoteAddr.String()) - require.Zero(t, tx.AddedFlags[0]) - require.EqualValues(t, 1, len(tx.Added6)) - require.EqualValues(t, tx.Added6[0].UDP().Network(), v[1].remoteAddr.Network()) - require.EqualValues(t, tx.Added6[0].UDP().String(), v[1].remoteAddr.String()) - require.NotZero(t, tx.Added6Flags[0]&pp.PexOutgoingConn) -} -- 2.44.0