}
func (cl *Client) sendInitialPEX(conn *PeerConn, t *Torrent) {
- peerPexExtendedId, ok := conn.PeerExtensionIDs[pp.ExtensionNamePex]
+ xid, ok := conn.PeerExtensionIDs[pp.ExtensionNamePex]
if !ok {
- // peer did not advertise support for the PEX extension
- conn.logger.Printf("no PEX support - not sending initial")
return
}
- pexMsg := t.pexInitial()
- if pexMsg == nil {
- // not enough peers to share — e.g. len(t.conns < 50)
- conn.logger.Printf("skipping PEX initial")
+ m, seq := t.pex.Genmsg(0)
+ conn.pexSeq = seq
+ if m.Len() == 0 {
+ cl.logger.Printf("no initial PEX this time")
+ // FIXME see how can we schedule another initial for later
return
}
- log.Printf("preparing PEX initial message: %v", pexMsg)
- tx := pexMsg.Message(peerPexExtendedId)
- conn.post(tx)
+ conn.logger.Printf("sending initial PEX message: %v", m)
+ conn.post(m.Message(xid))
}
func (cl *Client) dhtPort() (ret uint16) {
Dropped6 krpc.CompactIPv6NodeAddrs `bencode:"dropped6"`
}
-func (m *PexMsg) AppendAdded(addr krpc.NodeAddr, f PexPeerFlags) {
- ip := addr.IP
- if ip.To4() != nil {
+func addrEqual(a, b *krpc.NodeAddr) bool {
+ return a.IP.Equal(b.IP) && a.Port == b.Port
+}
+
+func addrIndex(v []krpc.NodeAddr, a *krpc.NodeAddr) int {
+ for i := range v {
+ if addrEqual(&v[i], a) {
+ return i
+ }
+ }
+ return -1
+}
+
+func (m *PexMsg) Add(addr krpc.NodeAddr, f PexPeerFlags) {
+ if addr.IP.To4() != nil {
+ if addrIndex(m.Added.NodeAddrs(), &addr) >= 0 {
+ // already added
+ return
+ }
+ if i := addrIndex(m.Dropped.NodeAddrs(), &addr); i >= 0 {
+ // on the dropped list - cancel out
+ m.Dropped = append(m.Dropped[:i], m.Dropped[i+1:]...)
+ return
+ }
m.Added = append(m.Added, addr)
m.AddedFlags = append(m.AddedFlags, f)
- } else if len(ip) == net.IPv6len {
+ } else if len(addr.IP) == net.IPv6len {
+ if addrIndex(m.Added6.NodeAddrs(), &addr) >= 0 {
+ // already added
+ return
+ }
+ if i := addrIndex(m.Dropped6.NodeAddrs(), &addr); i >= 0 {
+ // on the dropped list - cancel out
+ m.Dropped6 = append(m.Dropped6[:i], m.Dropped6[i+1:]...)
+ return
+ }
m.Added6 = append(m.Added6, addr)
m.Added6Flags = append(m.Added6Flags, f)
}
}
-func (m *PexMsg) AppendDropped(addr krpc.NodeAddr) {
- ip := addr.IP
- if ip.To4() != nil {
+func (m *PexMsg) Drop(addr krpc.NodeAddr) {
+ if addr.IP.To4() != nil {
+ if addrIndex(m.Dropped.NodeAddrs(), &addr) >= 0 {
+ // already dropped
+ return
+ }
+ if i := addrIndex(m.Added.NodeAddrs(), &addr); i >= 0 {
+ // on the added list - cancel out
+ m.Added = append(m.Added[:i], m.Added[i+1:]...)
+ m.AddedFlags = append(m.AddedFlags[:i], m.AddedFlags[i+1:]...)
+ return
+ }
m.Dropped = append(m.Dropped, addr)
- } else if len(ip) == net.IPv6len {
+ } else if len(addr.IP) == net.IPv6len {
+ if addrIndex(m.Dropped6.NodeAddrs(), &addr) >= 0 {
+ // already dropped
+ return
+ }
+ if i := addrIndex(m.Added6.NodeAddrs(), &addr); i >= 0 {
+ // on the added list - cancel out
+ m.Added6 = append(m.Added6[:i], m.Added6[i+1:]...)
+ m.Added6Flags = append(m.Added6Flags[:i], m.Added6Flags[i+1:]...)
+ return
+ }
m.Dropped6 = append(m.Dropped6, addr)
}
}
+func (m *PexMsg) Len() int {
+ return len(m.Added)+len(m.Added6)+len(m.Dropped)+len(m.Dropped6)
+}
+
+// DeltaLen returns max of {added+added6, dropped+dropped6}
+func (m *PexMsg) DeltaLen() int {
+ lenAdded := len(m.Added)+len(m.Added6)
+ lenDropped := len(m.Dropped)+len(m.Dropped6)
+ if lenAdded > lenDropped {
+ return lenAdded
+ }
+ return lenDropped
+}
+
func (pexMsg *PexMsg) Message(pexExtendedId ExtensionNumber) Message {
payload := bencode.MustMarshal(pexMsg)
return Message{
require.NoError(t, bencode.Unmarshal(b, &pm))
}
-func TestPexAppendAdded(t *testing.T) {
+func TestPexAdd(t *testing.T) {
+ addrs4 := []krpc.NodeAddr{
+ krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}, // 0
+ krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4748}, // 1
+ krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4747}, // 2
+ krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4748}, // 3
+ }
+ addrs6 := []krpc.NodeAddr{
+ krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}, // 0
+ krpc.NodeAddr{IP: net.IPv6loopback, Port: 4748}, // 1
+ krpc.NodeAddr{IP: net.IPv6loopback, Port: 4749}, // 2
+ krpc.NodeAddr{IP: net.IPv6loopback, Port: 4750}, // 3
+ }
+ f := PexPrefersEncryption | PexOutgoingConn
+
t.Run("ipv4", func(t *testing.T) {
- addr := krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}
- f := PexPrefersEncryption | PexOutgoingConn
- xm := PexMsg{}
- xm.AppendAdded(addr, f)
- require.EqualValues(t, len(xm.Added), 1)
- require.EqualValues(t, len(xm.AddedFlags), 1)
- require.EqualValues(t, len(xm.Added6), 0)
- require.EqualValues(t, len(xm.Added6Flags), 0)
- require.True(t, xm.Added[0].IP.Equal(addr.IP), "IPs should match")
- require.EqualValues(t, xm.Added[0].Port, addr.Port)
- require.EqualValues(t, xm.AddedFlags[0], f)
+ addrs := addrs4
+ m := new(PexMsg)
+ m.Drop(addrs[0])
+ m.Add(addrs[1], f)
+ for _, addr := range addrs {
+ m.Add(addr, f)
+ }
+ targ := &PexMsg{
+ Added: krpc.CompactIPv4NodeAddrs{
+ addrs[1],
+ addrs[2],
+ addrs[3],
+ },
+ AddedFlags: []PexPeerFlags{f, f, f},
+ Dropped: krpc.CompactIPv4NodeAddrs{},
+ }
+ require.EqualValues(t, targ, m)
})
t.Run("ipv6", func(t *testing.T) {
- addr := krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}
- f := PexPrefersEncryption | PexOutgoingConn
- xm := PexMsg{}
- xm.AppendAdded(addr, f)
- require.EqualValues(t, len(xm.Added), 0)
- require.EqualValues(t, len(xm.AddedFlags), 0)
- require.EqualValues(t, len(xm.Added6), 1)
- require.EqualValues(t, len(xm.Added6Flags), 1)
- require.True(t, xm.Added6[0].IP.Equal(addr.IP), "IPs should match")
- require.EqualValues(t, xm.Added6[0].Port, addr.Port)
- require.EqualValues(t, xm.Added6Flags[0], f)
+ addrs := addrs6
+ m := new(PexMsg)
+ m.Drop(addrs[0])
+ m.Add(addrs[1], f)
+ for _, addr := range addrs {
+ m.Add(addr, f)
+ }
+ targ := &PexMsg{
+ Added6: krpc.CompactIPv6NodeAddrs{
+ addrs[1],
+ addrs[2],
+ addrs[3],
+ },
+ Added6Flags: []PexPeerFlags{f, f, f},
+ Dropped6: krpc.CompactIPv6NodeAddrs{},
+ }
+ require.EqualValues(t, targ, m)
})
- t.Run("unspecified", func(t *testing.T) {
+ t.Run("empty", func(t *testing.T) {
addr := krpc.NodeAddr{}
- xm := PexMsg{}
- xm.AppendAdded(addr, 0)
- require.EqualValues(t, len(xm.Added), 0)
- require.EqualValues(t, len(xm.AddedFlags), 0)
- require.EqualValues(t, len(xm.Added6), 0)
- require.EqualValues(t, len(xm.Added6Flags), 0)
+ xm := new(PexMsg)
+ xm.Add(addr, f)
+ require.EqualValues(t, 0, len(xm.Added))
+ require.EqualValues(t, 0, len(xm.AddedFlags))
+ require.EqualValues(t, 0, len(xm.Added6))
+ require.EqualValues(t, 0, len(xm.Added6Flags))
})
}
-func TestPexAppendDropped(t *testing.T) {
+func TestPexDrop(t *testing.T) {
+ addrs4 := []krpc.NodeAddr{
+ krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}, // 0
+ krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4748}, // 1
+ krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4747}, // 2
+ krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4748}, // 3
+ }
+ addrs6 := []krpc.NodeAddr{
+ krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}, // 0
+ krpc.NodeAddr{IP: net.IPv6loopback, Port: 4748}, // 1
+ krpc.NodeAddr{IP: net.IPv6loopback, Port: 4749}, // 2
+ krpc.NodeAddr{IP: net.IPv6loopback, Port: 4750}, // 3
+ }
+ f := PexPrefersEncryption | PexOutgoingConn
+
t.Run("ipv4", func(t *testing.T) {
- addr := krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}
- xm := PexMsg{}
- xm.AppendDropped(addr)
- require.EqualValues(t, len(xm.Dropped), 1)
- require.EqualValues(t, len(xm.Dropped6), 0)
- require.True(t, xm.Dropped[0].IP.Equal(addr.IP), "IPs should match")
- require.EqualValues(t, xm.Dropped[0].Port, addr.Port)
+ addrs := addrs4
+ m := new(PexMsg)
+ m.Add(addrs[0], f)
+ m.Drop(addrs[1])
+ for _, addr := range addrs {
+ m.Drop(addr)
+ }
+ targ := &PexMsg{
+ AddedFlags: []PexPeerFlags{},
+ Added: krpc.CompactIPv4NodeAddrs{},
+ Dropped: krpc.CompactIPv4NodeAddrs{
+ addrs[1],
+ addrs[2],
+ addrs[3],
+ },
+ }
+ require.EqualValues(t, targ, m)
})
t.Run("ipv6", func(t *testing.T) {
- addr := krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}
- xm := PexMsg{}
- xm.AppendDropped(addr)
- require.EqualValues(t, len(xm.Dropped), 0)
- require.EqualValues(t, len(xm.Dropped6), 1)
- require.True(t, xm.Dropped6[0].IP.Equal(addr.IP), "IPs should match")
- require.EqualValues(t, xm.Dropped6[0].Port, addr.Port)
+ addrs := addrs6
+ m := new(PexMsg)
+ m.Add(addrs[0], f)
+ m.Drop(addrs[1])
+ for _, addr := range addrs {
+ m.Drop(addr)
+ }
+ targ := &PexMsg{
+ Added6Flags: []PexPeerFlags{},
+ Added6: krpc.CompactIPv6NodeAddrs{},
+ Dropped6: krpc.CompactIPv6NodeAddrs{
+ addrs[1],
+ addrs[2],
+ addrs[3],
+ },
+ }
+ require.EqualValues(t, targ, m)
})
- t.Run("unspecified", func(t *testing.T) {
+ t.Run("empty", func(t *testing.T) {
addr := krpc.NodeAddr{}
- xm := PexMsg{}
- xm.AppendDropped(addr)
- require.EqualValues(t, len(xm.Dropped), 0)
- require.EqualValues(t, len(xm.Dropped6), 0)
+ xm := new(PexMsg)
+ xm.Drop(addr)
+ require.EqualValues(t, 0, len(xm.Dropped))
+ require.EqualValues(t, 0, len(xm.Dropped6))
})
}
func TestMarshalPexMessage(t *testing.T) {
addr := krpc.NodeAddr{IP: net.IP{127, 0, 0, 1}, Port: 0x55aa}
f := PexPrefersEncryption | PexOutgoingConn
- pm := PexMsg{}
- pm.AppendAdded(addr, f)
+ pm := new(PexMsg)
+ pm.Added = append(pm.Added, addr)
+ pm.AddedFlags = append(pm.AddedFlags, f)
b, err := bencode.Marshal(pm)
require.NoError(t, err)
// response.
metadataRequests []bool
sentHaves bitmap.Bitmap
+ pexSeq int
// Stuff controlled by the remote peer.
- PeerID PeerID
- peerInterested bool
- peerChoking bool
- peerRequests map[request]struct{}
- PeerExtensionBytes pp.PeerExtensionBits
+ PeerID PeerID
+ peerInterested bool
+ peerChoking bool
+ peerRequests map[request]struct{}
+ PeerExtensionBytes pp.PeerExtensionBits
PeerPrefersEncryption bool // as indicated by 'e' field in extension handshake
PeerListenPort int
// The pieces the peer has claimed to have.
return f
}
+func (c *PeerConn) pexEvent(t pexEventType) pexEvent {
+ f := c.pexPeerFlags()
+ return pexEvent{t, c.remoteAddr, f}
+}
+
func (c *PeerConn) String() string {
return fmt.Sprintf("connection %p", c)
}
--- /dev/null
+package torrent
+
+import (
+ "net"
+
+ "github.com/anacrolix/dht/v2/krpc"
+ pp "github.com/anacrolix/torrent/peer_protocol"
+)
+
+type pexEventType int
+
+const (
+ pexAdd pexEventType = iota
+ pexDrop
+)
+
+// internal, based on BEP11
+const (
+ pexTargAdded = 25 // put drops on hold when the number of alive connections is lower than this
+ pexMaxHold = 25 // length of the drop hold-back buffer
+ pexMaxDelta = 50 // upper bound on added+added6 and dropped+dropped6 in a single PEX message
+)
+
+// represents a single connection (t=pexAdd) or disconnection (t=pexDrop) event
+type pexEvent struct {
+ t pexEventType
+ addr net.Addr
+ f pp.PexPeerFlags
+}
+
+// records the event into the peer protocol PEX message
+func (e *pexEvent) put(m *pp.PexMsg) {
+ switch e.t {
+ case pexAdd:
+ m.Add(nodeAddr(e.addr), e.f)
+ case pexDrop:
+ m.Drop(nodeAddr(e.addr))
+ }
+}
+
+func nodeAddr(addr net.Addr) krpc.NodeAddr {
+ ipport, _ := tryIpPortFromNetAddr(addr)
+ return krpc.NodeAddr{IP: shortestIP(ipport.IP), Port: ipport.Port}
+}
+
+// mainly for the krpc marshallers
+func shortestIP(ip net.IP) net.IP {
+ if ip4 := ip.To4(); ip4 != nil {
+ return ip4
+ }
+ return ip
+}
+
+// Per-torrent PEX state
+type pexState struct {
+ ev []pexEvent // event feed, append-only
+ hold []pexEvent // delayed drops
+ nc int // net number of alive conns
+}
+
+func (s *pexState) Reset() {
+ s.ev = nil
+ s.hold = nil
+ s.nc = 0
+}
+
+func (s *pexState) Add(c *PeerConn) {
+ s.nc++
+ if s.nc >= pexTargAdded {
+ s.ev = append(s.ev, s.hold...)
+ s.hold = s.hold[:0]
+ }
+ e := c.pexEvent(pexAdd)
+ s.ev = append(s.ev, e)
+}
+
+func (s *pexState) Drop(c *PeerConn) {
+ e := c.pexEvent(pexDrop)
+ s.nc--
+ if s.nc < pexTargAdded && len(s.hold) < pexMaxHold {
+ s.hold = append(s.hold, e)
+ } else {
+ s.ev = append(s.ev, e)
+ }
+}
+
+// Generate a PEX message based on the event feed.
+// Also returns an index to pass to the subsequent calls, producing incremental deltas.
+func (s *pexState) Genmsg(start int) (*pp.PexMsg, int) {
+ m := new(pp.PexMsg)
+ n := start
+ for _, e := range s.ev[start:] {
+ if start > 0 && m.DeltaLen() >= pexMaxDelta {
+ break
+ }
+ e.put(m)
+ n++
+ }
+ return m, n
+}
--- /dev/null
+package torrent
+
+import (
+ "net"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/anacrolix/dht/v2/krpc"
+ pp "github.com/anacrolix/torrent/peer_protocol"
+)
+
+var (
+ addrs = []net.Addr{
+ &net.TCPAddr{IP: net.IPv6loopback, Port: 4747},
+ &net.TCPAddr{IP: net.IPv6loopback, Port: 4748},
+ &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747},
+ &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4748},
+ }
+ f = pp.PexOutgoingConn
+)
+
+func TestPexAdded(t *testing.T) {
+ t.Run("noHold", func(t *testing.T) {
+ s := new(pexState)
+ s.Add(&PeerConn{remoteAddr: addrs[0], outgoing: true})
+ targ := &pexState{
+ ev: []pexEvent{
+ pexEvent{pexAdd, addrs[0], pp.PexOutgoingConn},
+ },
+ nc: 1,
+ }
+ require.EqualValues(t, targ, s)
+ })
+ t.Run("belowTarg", func(t *testing.T) {
+ s := &pexState{
+ hold: []pexEvent{
+ pexEvent{pexDrop, addrs[1], 0},
+ },
+ nc: 0,
+ }
+ s.Add(&PeerConn{remoteAddr: addrs[0]})
+ targ := &pexState{
+ hold: []pexEvent{
+ pexEvent{pexDrop, addrs[1], 0},
+ },
+ ev: []pexEvent{
+ pexEvent{pexAdd, addrs[0], 0},
+ },
+ nc: 1,
+ }
+ require.EqualValues(t, targ, s)
+ })
+ t.Run("aboveTarg", func(t *testing.T) {
+ holdAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
+ s := &pexState{
+ hold: []pexEvent{
+ pexEvent{pexDrop, holdAddr, 0},
+ },
+ nc: pexTargAdded,
+ }
+ s.Add(&PeerConn{remoteAddr: addrs[0]})
+ targ := &pexState{
+ hold: []pexEvent{},
+ ev: []pexEvent{
+ pexEvent{pexDrop, holdAddr, 0},
+ pexEvent{pexAdd, addrs[0], 0},
+ },
+ nc: pexTargAdded + 1,
+ }
+ require.EqualValues(t, targ, s)
+ })
+}
+
+func TestPexDropped(t *testing.T) {
+ t.Run("belowTarg", func(t *testing.T) {
+ s := &pexState{nc: 1}
+ s.Drop(&PeerConn{remoteAddr: addrs[0]})
+ targ := &pexState{
+ hold: []pexEvent{pexEvent{pexDrop, addrs[0], 0}},
+ nc: 0,
+ }
+ require.EqualValues(t, targ, s)
+ })
+ t.Run("aboveTarg", func(t *testing.T) {
+ s := &pexState{nc: pexTargAdded + 1}
+ s.Drop(&PeerConn{remoteAddr: addrs[0]})
+ targ := &pexState{
+ ev: []pexEvent{pexEvent{pexDrop, addrs[0], 0}},
+ nc: pexTargAdded,
+ }
+ require.EqualValues(t, targ, s)
+ })
+}
+
+func TestPexReset(t *testing.T) {
+ s := &pexState{
+ hold: []pexEvent{pexEvent{pexDrop, addrs[0], 0}},
+ ev: []pexEvent{pexEvent{pexAdd, addrs[1], 0}},
+ nc: 1,
+ }
+ s.Reset()
+ targ := new(pexState)
+ require.EqualValues(t, targ, s)
+}
+
+var testcases = []struct {
+ name string
+ in *pexState
+ arg int
+ targM *pp.PexMsg
+ targS int
+}{
+ {
+ name: "empty",
+ in: &pexState{},
+ arg: 0,
+ targM: &pp.PexMsg{},
+ targS: 0,
+ },
+ {
+ name: "add4",
+ in: &pexState{
+ ev: []pexEvent{
+ pexEvent{pexAdd, addrs[0], f},
+ pexEvent{pexAdd, addrs[1], f},
+ pexEvent{pexAdd, addrs[2], f},
+ pexEvent{pexAdd, addrs[3], f},
+ },
+ },
+ arg: 0,
+ targM: &pp.PexMsg{
+ Added: krpc.CompactIPv4NodeAddrs{
+ nodeAddr(addrs[2]),
+ nodeAddr(addrs[3]),
+ },
+ AddedFlags: []pp.PexPeerFlags{f, f},
+ Added6: krpc.CompactIPv6NodeAddrs{
+ nodeAddr(addrs[0]),
+ nodeAddr(addrs[1]),
+ },
+ Added6Flags: []pp.PexPeerFlags{f, f},
+ },
+ targS: 4,
+ },
+ {
+ name: "drop2",
+ arg: 0,
+ in: &pexState{
+ ev: []pexEvent{
+ pexEvent{pexDrop, addrs[0], f},
+ pexEvent{pexDrop, addrs[2], f},
+ },
+ },
+ targM: &pp.PexMsg{
+ Dropped: krpc.CompactIPv4NodeAddrs{
+ nodeAddr(addrs[2]),
+ },
+ Dropped6: krpc.CompactIPv6NodeAddrs{
+ nodeAddr(addrs[0]),
+ },
+ },
+ targS: 2,
+ },
+ {
+ name: "add2drop1",
+ arg: 0,
+ in: &pexState{
+ ev: []pexEvent{
+ pexEvent{pexAdd, addrs[0], f},
+ pexEvent{pexAdd, addrs[1], f},
+ pexEvent{pexDrop, addrs[0], f},
+ },
+ },
+ targM: &pp.PexMsg{
+ Added6: krpc.CompactIPv6NodeAddrs{
+ nodeAddr(addrs[1]),
+ },
+ Added6Flags: []pp.PexPeerFlags{f},
+ },
+ targS: 3,
+ },
+ {
+ name: "delayed",
+ arg: 0,
+ in: &pexState{
+ ev: []pexEvent{
+ pexEvent{pexAdd, addrs[0], f},
+ pexEvent{pexAdd, addrs[1], f},
+ pexEvent{pexAdd, addrs[2], f},
+ },
+ hold: []pexEvent{
+ pexEvent{pexDrop, addrs[0], f},
+ pexEvent{pexDrop, addrs[2], f},
+ pexEvent{pexDrop, addrs[1], f},
+ },
+ },
+ targM: &pp.PexMsg{
+ Added: krpc.CompactIPv4NodeAddrs{
+ nodeAddr(addrs[2]),
+ },
+ AddedFlags: []pp.PexPeerFlags{f},
+ Added6: krpc.CompactIPv6NodeAddrs{
+ nodeAddr(addrs[0]),
+ nodeAddr(addrs[1]),
+ },
+ Added6Flags: []pp.PexPeerFlags{f, f},
+ },
+ targS: 3,
+ },
+ {
+ name: "followup",
+ arg: 1,
+ in: &pexState{
+ ev: []pexEvent{
+ pexEvent{pexAdd, addrs[0], f},
+ pexEvent{pexAdd, addrs[1], f},
+ },
+ },
+ targM: &pp.PexMsg{
+ Added6: krpc.CompactIPv6NodeAddrs{
+ nodeAddr(addrs[1]),
+ },
+ Added6Flags: []pp.PexPeerFlags{f},
+ },
+ targS: 2,
+ },
+}
+
+func TestPexGenmsg(t *testing.T) {
+ for _, tc := range testcases {
+ t.Run(tc.name, func(t *testing.T) {
+ s := tc.in
+ m, seen := s.Genmsg(tc.arg)
+ require.EqualValues(t, tc.targM, m)
+ require.EqualValues(t, tc.targS, seen)
+ })
+ }
+}
"fmt"
"io"
"math/rand"
- "net"
"net/url"
"sync"
"text/tabwriter"
"github.com/davecgh/go-spew/spew"
"github.com/anacrolix/dht/v2"
- "github.com/anacrolix/dht/v2/krpc"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/perf"
for conn := range t.conns {
conn.close()
}
- // PEX wipe state here
+ t.pex.Reset()
t.cl.event.Broadcast()
t.pieceStateChanges.Close()
t.updateWantPeersEvent()
func (t *Torrent) dropConnection(c *PeerConn) {
t.cl.event.Broadcast()
if !t.cl.config.DisablePEX {
- t.pex.dropped(c)
+ t.pex.Drop(c)
}
c.close()
if t.deleteConnection(c) {
}
t.conns[c] = struct{}{}
if !t.cl.config.DisablePEX {
- t.pex.added(c)
+ t.pex.Add(c)
}
return nil
}
defer t.cl.unlock()
t.userOnWriteChunkErr = f
}
-
-func nodeAddr(addr net.Addr) krpc.NodeAddr {
- ipport, _ := tryIpPortFromNetAddr(addr)
- ip := ipport.IP
- if ip4 := ip.To4(); ip4 != nil && len(ip) != net.IPv4len {
- ip = ip4
- }
- return krpc.NodeAddr{IP: ip, Port: ipport.Port}
-}
-
-func (t *Torrent) pexInitial() *pp.PexMsg {
- // BUG FIXME PEX prepare 25 recently connected peers
- tx := &pp.PexMsg{}
- for c := range t.conns {
- addr := nodeAddr(c.remoteAddr)
- f := c.pexPeerFlags()
- tx.AppendAdded(addr, f)
- }
- nc := len(tx.Added) + len(tx.Added6)
- // BUG if nc < 50 {
- if nc < 1 {
- return nil
- }
- return tx
-}
-
-type pexState struct{}
-
-func (s *pexState) added(c *PeerConn) {
- return
-}
-
-func (s *pexState) dropped(c *PeerConn) {
- return
-}
assert.False(t, tt.haveAllMetadataPieces())
assert.Nil(t, tt.Metainfo().InfoBytes)
}
-
-func TestTorrentPexInitial(t *testing.T) {
- v := []*PeerConn{
- &PeerConn{
- remoteAddr: &net.UDPAddr{IP: net.IPv4(172, 17, 0, 2), Port: 5555},
- },
- &PeerConn{
- remoteAddr: &net.UDPAddr{
- IP: net.IP{0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
- Port: 11111,
- },
- outgoing: true,
- },
- &PeerConn{
- remoteAddr: &net.UDPAddr{IP: net.IP(nil), Port: 0},
- },
- }
- torrent := &Torrent{conns: make(map[*PeerConn]struct{})}
- for _, conn := range v {
- torrent.conns[conn] = struct{}{}
- }
- tx := torrent.pexInitial()
- require.NotNil(t, tx)
- require.EqualValues(t, 1, len(tx.Added))
- require.EqualValues(t, tx.Added[0].UDP().Network(), v[0].remoteAddr.Network())
- require.EqualValues(t, tx.Added[0].UDP().String(), v[0].remoteAddr.String())
- require.Zero(t, tx.AddedFlags[0])
- require.EqualValues(t, 1, len(tx.Added6))
- require.EqualValues(t, tx.Added6[0].UDP().Network(), v[1].remoteAddr.Network())
- require.EqualValues(t, tx.Added6[0].UDP().String(), v[1].remoteAddr.String())
- require.NotZero(t, tx.Added6Flags[0]&pp.PexOutgoingConn)
-}