From 634edd2875225845e35abe935153a58a6e6c8294 Mon Sep 17 00:00:00 2001 From: Yaroslav Kolomiiets Date: Wed, 11 Dec 2019 11:45:04 +0000 Subject: [PATCH] Share current connections with peers over PEX (anacrolix#341) --- client.go | 18 +++++++ peer_protocol/pex.go | 46 ++++++++++++++--- peer_protocol/pex_test.go | 103 ++++++++++++++++++++++++++++++++++++++ peerconn.go | 25 +++++++++ peerconn_test.go | 20 ++++++++ torrent.go | 47 +++++++++++++++++ torrent_test.go | 32 ++++++++++++ 7 files changed, 285 insertions(+), 6 deletions(-) diff --git a/client.go b/client.go index 8a94e021..b5b76145 100644 --- a/client.go +++ b/client.go @@ -934,6 +934,24 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) { } } +func (cl *Client) sendInitialPEX(conn *PeerConn, t *Torrent) { + peerPexExtendedId, ok := conn.PeerExtensionIDs[pp.ExtensionNamePex] + if !ok { + // peer did not advertise support for the PEX extension + conn.logger.Printf("no PEX support - not sending initial") + return + } + pexMsg := t.pexInitial() + if pexMsg == nil { + // not enough peers to share — e.g. len(t.conns < 50) + conn.logger.Printf("skipping PEX initial") + return + } + log.Printf("preparing PEX initial message: %v", pexMsg) + tx := pexMsg.Message(peerPexExtendedId) + conn.post(tx) +} + func (cl *Client) dhtPort() (ret uint16) { cl.eachDhtServer(func(s DhtServer) { ret = uint16(missinggo.AddrPort(s.Addr())) diff --git a/peer_protocol/pex.go b/peer_protocol/pex.go index 2347aa3e..8d091061 100644 --- a/peer_protocol/pex.go +++ b/peer_protocol/pex.go @@ -1,6 +1,11 @@ package peer_protocol -import "github.com/anacrolix/dht/v2/krpc" +import ( + "net" + + "github.com/anacrolix/dht/v2/krpc" + "github.com/anacrolix/torrent/bencode" +) type PexMsg struct { Added krpc.CompactIPv4NodeAddrs `bencode:"added"` @@ -11,6 +16,35 @@ type PexMsg struct { Dropped6 krpc.CompactIPv6NodeAddrs `bencode:"dropped6"` } +func (m *PexMsg) AppendAdded(addr krpc.NodeAddr, f PexPeerFlags) { + ip := addr.IP + if ip.To4() != nil { + m.Added = append(m.Added, addr) + m.AddedFlags = append(m.AddedFlags, f) + } else if len(ip) == net.IPv6len { + m.Added6 = append(m.Added6, addr) + m.Added6Flags = append(m.Added6Flags, f) + } +} + +func (m *PexMsg) AppendDropped(addr krpc.NodeAddr) { + ip := addr.IP + if ip.To4() != nil { + m.Dropped = append(m.Dropped, addr) + } else if len(ip) == net.IPv6len { + m.Dropped6 = append(m.Dropped6, addr) + } +} + +func (pexMsg *PexMsg) Message(pexExtendedId ExtensionNumber) Message { + payload := bencode.MustMarshal(pexMsg) + return Message{ + Type: Extended, + ExtendedID: pexExtendedId, + ExtendedPayload: payload, + } +} + type PexPeerFlags byte func (me PexPeerFlags) Get(f PexPeerFlags) bool { @@ -18,9 +52,9 @@ func (me PexPeerFlags) Get(f PexPeerFlags) bool { } const ( - PexPrefersEncryption = 0x01 - PexSeedUploadOnly = 0x02 - PexSupportsUtp = 0x04 - PexHolepunchSupport = 0x08 - PexOutgoingConn = 0x10 + PexPrefersEncryption PexPeerFlags = 1 << iota + PexSeedUploadOnly + PexSupportsUtp + PexHolepunchSupport + PexOutgoingConn ) diff --git a/peer_protocol/pex_test.go b/peer_protocol/pex_test.go index 0c2b9f9e..30e0d363 100644 --- a/peer_protocol/pex_test.go +++ b/peer_protocol/pex_test.go @@ -1,10 +1,14 @@ package peer_protocol import ( + "bufio" + "bytes" + "net" "testing" "github.com/stretchr/testify/require" + "github.com/anacrolix/dht/v2/krpc" "github.com/anacrolix/torrent/bencode" ) @@ -24,3 +28,102 @@ func TestEmptyPexMsg(t *testing.T) { require.NoError(t, err) require.NoError(t, bencode.Unmarshal(b, &pm)) } + +func TestPexAppendAdded(t *testing.T) { + t.Run("ipv4", func(t *testing.T) { + addr := krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747} + f := PexPrefersEncryption | PexOutgoingConn + xm := PexMsg{} + xm.AppendAdded(addr, f) + require.EqualValues(t, len(xm.Added), 1) + require.EqualValues(t, len(xm.AddedFlags), 1) + require.EqualValues(t, len(xm.Added6), 0) + require.EqualValues(t, len(xm.Added6Flags), 0) + require.True(t, xm.Added[0].IP.Equal(addr.IP), "IPs should match") + require.EqualValues(t, xm.Added[0].Port, addr.Port) + require.EqualValues(t, xm.AddedFlags[0], f) + }) + t.Run("ipv6", func(t *testing.T) { + addr := krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747} + f := PexPrefersEncryption | PexOutgoingConn + xm := PexMsg{} + xm.AppendAdded(addr, f) + require.EqualValues(t, len(xm.Added), 0) + require.EqualValues(t, len(xm.AddedFlags), 0) + require.EqualValues(t, len(xm.Added6), 1) + require.EqualValues(t, len(xm.Added6Flags), 1) + require.True(t, xm.Added6[0].IP.Equal(addr.IP), "IPs should match") + require.EqualValues(t, xm.Added6[0].Port, addr.Port) + require.EqualValues(t, xm.Added6Flags[0], f) + }) + t.Run("unspecified", func(t *testing.T) { + addr := krpc.NodeAddr{} + xm := PexMsg{} + xm.AppendAdded(addr, 0) + require.EqualValues(t, len(xm.Added), 0) + require.EqualValues(t, len(xm.AddedFlags), 0) + require.EqualValues(t, len(xm.Added6), 0) + require.EqualValues(t, len(xm.Added6Flags), 0) + }) +} + +func TestPexAppendDropped(t *testing.T) { + t.Run("ipv4", func(t *testing.T) { + addr := krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747} + xm := PexMsg{} + xm.AppendDropped(addr) + require.EqualValues(t, len(xm.Dropped), 1) + require.EqualValues(t, len(xm.Dropped6), 0) + require.True(t, xm.Dropped[0].IP.Equal(addr.IP), "IPs should match") + require.EqualValues(t, xm.Dropped[0].Port, addr.Port) + }) + t.Run("ipv6", func(t *testing.T) { + addr := krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747} + xm := PexMsg{} + xm.AppendDropped(addr) + require.EqualValues(t, len(xm.Dropped), 0) + require.EqualValues(t, len(xm.Dropped6), 1) + require.True(t, xm.Dropped6[0].IP.Equal(addr.IP), "IPs should match") + require.EqualValues(t, xm.Dropped6[0].Port, addr.Port) + }) + t.Run("unspecified", func(t *testing.T) { + addr := krpc.NodeAddr{} + xm := PexMsg{} + xm.AppendDropped(addr) + require.EqualValues(t, len(xm.Dropped), 0) + require.EqualValues(t, len(xm.Dropped6), 0) + }) +} + +func TestMarshalPexMessage(t *testing.T) { + addr := krpc.NodeAddr{IP: net.IP{127, 0, 0, 1}, Port: 0x55aa} + f := PexPrefersEncryption | PexOutgoingConn + pm := PexMsg{} + pm.AppendAdded(addr, f) + + b, err := bencode.Marshal(pm) + require.NoError(t, err) + + pexExtendedId := ExtensionNumber(7) + msg := pm.Message(pexExtendedId) + expected := []byte("\x00\x00\x00\x4c\x14\x07d5:added6:\x7f\x00\x00\x01\x55\xaa7:added.f1:\x116:added60:8:added6.f0:7:dropped0:8:dropped60:e") + b, err = msg.MarshalBinary() + require.NoError(t, err) + require.EqualValues(t, b, expected) + + msg = Message{} + dec := Decoder{ + R: bufio.NewReader(bytes.NewBuffer(b)), + MaxLength: 128, + } + pmOut := PexMsg{} + err = dec.Decode(&msg) + require.NoError(t, err) + require.EqualValues(t, Extended, msg.Type) + require.EqualValues(t, pexExtendedId, msg.ExtendedID) + err = bencode.Unmarshal(msg.ExtendedPayload, &pmOut) + require.NoError(t, err) + require.EqualValues(t, len(pm.Added), len(pmOut.Added)) + require.EqualValues(t, pm.Added[0].IP, pmOut.Added[0].IP) + require.EqualValues(t, pm.Added[0].Port, pmOut.Added[0].Port) +} diff --git a/peerconn.go b/peerconn.go index ee5cdfd9..f90dfd4f 100644 --- a/peerconn.go +++ b/peerconn.go @@ -92,6 +92,8 @@ type PeerConn struct { peerChoking bool peerRequests map[request]struct{} PeerExtensionBytes pp.PeerExtensionBits + PeerPrefersEncryption bool // as indicated by 'e' field in extension handshake + PeerListenPort int // The pieces the peer has claimed to have. _peerPieces bitmap.Bitmap // The peer has everything. This can occur due to a special message, when @@ -1115,6 +1117,8 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err if c.PeerExtensionIDs == nil { c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber, len(d.M)) } + c.PeerListenPort = d.Port + c.PeerPrefersEncryption = d.Encryption for name, id := range d.M { if _, ok := c.PeerExtensionIDs[name]; !ok { torrent.Add(fmt.Sprintf("peers supporting extension %q", name), 1) @@ -1127,6 +1131,10 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err } } c.requestPendingMetadata() + if !cl.config.DisablePEX { + cl.sendInitialPEX(c, t) + // BUG no sending PEX updates yet + } return nil case metadataExtendedId: err := cl.gotMetadataExtensionMsg(payload, t, c) @@ -1140,11 +1148,14 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err // advertising that we support PEX if it's disabled. return nil } + c.logger.Printf("incoming PEX message") var pexMsg pp.PexMsg err := bencode.Unmarshal(payload, &pexMsg) if err != nil { return fmt.Errorf("error unmarshalling PEX message: %s", err) } + npeers := len(pexMsg.Added6) + len(pexMsg.Added) + c.logger.Printf("adding %d peers from PEX", npeers) torrent.Add("pex added6 peers received", int64(len(pexMsg.Added6))) var peers Peers peers.AppendFromPex(pexMsg.Added6, pexMsg.Added6Flags) @@ -1468,6 +1479,20 @@ func (c *PeerConn) remoteIpPort() IpPort { return IpPort{ipa.IP, uint16(ipa.Port)} } +func (c *PeerConn) pexPeerFlags() pp.PexPeerFlags { + f := pp.PexPeerFlags(0) + if c.PeerPrefersEncryption { + f |= pp.PexPrefersEncryption + } + if c.outgoing { + f |= pp.PexOutgoingConn + } + if c.utp() { + f |= pp.PexSupportsUtp + } + return f +} + func (c *PeerConn) String() string { return fmt.Sprintf("connection %p", c) } diff --git a/peerconn_test.go b/peerconn_test.go index da7c5bb3..6196bd80 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -143,3 +143,23 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) { require.NoError(b, <-mrlErr) require.EqualValues(b, b.N, cn._stats.ChunksReadUseful.Int64()) } + +func TestPexPeerFlags(t *testing.T) { + var testcases = []struct { + conn *PeerConn + f pp.PexPeerFlags + }{ + {&PeerConn{outgoing: false, PeerPrefersEncryption: false}, 0}, + {&PeerConn{outgoing: false, PeerPrefersEncryption: true}, pp.PexPrefersEncryption}, + {&PeerConn{outgoing: true, PeerPrefersEncryption: false}, pp.PexOutgoingConn}, + {&PeerConn{outgoing: true, PeerPrefersEncryption: true}, pp.PexOutgoingConn | pp.PexPrefersEncryption}, + {&PeerConn{network: "udp4"}, pp.PexSupportsUtp}, + {&PeerConn{outgoing: true, network: "udp6"}, pp.PexOutgoingConn | pp.PexSupportsUtp}, + {&PeerConn{outgoing: true, network: "tcp4"}, pp.PexOutgoingConn}, + {&PeerConn{network: "tcp6"}, 0}, + } + for i, tc := range testcases { + f := tc.conn.pexPeerFlags() + require.EqualValues(t, tc.f, f, i) + } +} diff --git a/torrent.go b/torrent.go index 511abd71..fc335357 100644 --- a/torrent.go +++ b/torrent.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "math/rand" + "net" "net/url" "sync" "text/tabwriter" @@ -16,6 +17,7 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/anacrolix/dht/v2" + "github.com/anacrolix/dht/v2/krpc" "github.com/anacrolix/log" "github.com/anacrolix/missinggo" "github.com/anacrolix/missinggo/perf" @@ -131,6 +133,8 @@ type Torrent struct { // Count of each request across active connections. pendingRequests map[request]int + + pex pexState } func (t *Torrent) numConns() int { @@ -257,6 +261,7 @@ func (t *Torrent) addPeer(p Peer) { if ipAddr, ok := tryIpPortFromNetAddr(p.Addr); ok { if cl.badPeerIPPort(ipAddr.IP, ipAddr.Port) { torrent.Add("peers not added because of bad addr", 1) + cl.logger.Printf("peers not added because of bad addr: %v", p) return } } @@ -702,6 +707,7 @@ func (t *Torrent) close() (err error) { for conn := range t.conns { conn.close() } + // PEX wipe state here t.cl.event.Broadcast() t.pieceStateChanges.Close() t.updateWantPeersEvent() @@ -1219,6 +1225,9 @@ func (t *Torrent) assertNoPendingRequests() { func (t *Torrent) dropConnection(c *PeerConn) { t.cl.event.Broadcast() + if !t.cl.config.DisablePEX { + t.pex.dropped(c) + } c.close() if t.deleteConnection(c) { t.openNewConns() @@ -1491,6 +1500,9 @@ func (t *Torrent) addConnection(c *PeerConn) (err error) { panic(len(t.conns)) } t.conns[c] = struct{}{} + if !t.cl.config.DisablePEX { + t.pex.added(c) + } return nil } @@ -1851,3 +1863,38 @@ func (t *Torrent) SetOnWriteChunkError(f func(error)) { defer t.cl.unlock() t.userOnWriteChunkErr = f } + +func nodeAddr(addr net.Addr) krpc.NodeAddr { + ipport, _ := tryIpPortFromNetAddr(addr) + ip := ipport.IP + if ip4 := ip.To4(); ip4 != nil && len(ip) != net.IPv4len { + ip = ip4 + } + return krpc.NodeAddr{IP: ip, Port: ipport.Port} +} + +func (t *Torrent) pexInitial() *pp.PexMsg { + // BUG FIXME PEX prepare 25 recently connected peers + tx := &pp.PexMsg{} + for c := range t.conns { + addr := nodeAddr(c.remoteAddr) + f := c.pexPeerFlags() + tx.AppendAdded(addr, f) + } + nc := len(tx.Added) + len(tx.Added6) + // BUG if nc < 50 { + if nc < 1 { + return nil + } + return tx +} + +type pexState struct{} + +func (s *pexState) added(c *PeerConn) { + return +} + +func (s *pexState) dropped(c *PeerConn) { + return +} diff --git a/torrent_test.go b/torrent_test.go index 5ecd93b4..b8cea974 100644 --- a/torrent_test.go +++ b/torrent_test.go @@ -218,3 +218,35 @@ func TestTorrentMetainfoIncompleteMetadata(t *testing.T) { assert.False(t, tt.haveAllMetadataPieces()) assert.Nil(t, tt.Metainfo().InfoBytes) } + +func TestTorrentPexInitial(t *testing.T) { + v := []*PeerConn{ + &PeerConn{ + remoteAddr: &net.UDPAddr{IP: net.IPv4(172, 17, 0, 2), Port: 5555}, + }, + &PeerConn{ + remoteAddr: &net.UDPAddr{ + IP: net.IP{0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1}, + Port: 11111, + }, + outgoing: true, + }, + &PeerConn{ + remoteAddr: &net.UDPAddr{IP: net.IP(nil), Port: 0}, + }, + } + torrent := &Torrent{conns: make(map[*PeerConn]struct{})} + for _, conn := range v { + torrent.conns[conn] = struct{}{} + } + tx := torrent.pexInitial() + require.NotNil(t, tx) + require.EqualValues(t, 1, len(tx.Added)) + require.EqualValues(t, tx.Added[0].UDP().Network(), v[0].remoteAddr.Network()) + require.EqualValues(t, tx.Added[0].UDP().String(), v[0].remoteAddr.String()) + require.Zero(t, tx.AddedFlags[0]) + require.EqualValues(t, 1, len(tx.Added6)) + require.EqualValues(t, tx.Added6[0].UDP().Network(), v[1].remoteAddr.Network()) + require.EqualValues(t, tx.Added6[0].UDP().String(), v[1].remoteAddr.String()) + require.NotZero(t, tx.Added6Flags[0]&pp.PexOutgoingConn) +} -- 2.48.1