]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Share current connections with peers over PEX (anacrolix#341)
authorYaroslav Kolomiiets <yarikos@gmail.com>
Wed, 11 Dec 2019 11:45:04 +0000 (11:45 +0000)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 15 Apr 2020 07:24:44 +0000 (17:24 +1000)
client.go
peer_protocol/pex.go
peer_protocol/pex_test.go
peerconn.go
peerconn_test.go
torrent.go
torrent_test.go

index 8a94e021756b5ff5c8a75a6af77043b169071cc8..b5b76145cf18675311b60f63fef4985ba97a7a5a 100644 (file)
--- a/client.go
+++ b/client.go
@@ -934,6 +934,24 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
        }
 }
 
+func (cl *Client) sendInitialPEX(conn *PeerConn, t *Torrent) {
+       peerPexExtendedId, ok := conn.PeerExtensionIDs[pp.ExtensionNamePex]
+       if !ok {
+               // peer did not advertise support for the PEX extension
+               conn.logger.Printf("no PEX support -  not sending initial")
+               return
+       }
+       pexMsg := t.pexInitial()
+       if pexMsg == nil {
+               // not enough peers to share — e.g. len(t.conns < 50)
+               conn.logger.Printf("skipping PEX initial")
+               return
+       }
+       log.Printf("preparing PEX initial message: %v", pexMsg)
+       tx := pexMsg.Message(peerPexExtendedId)
+       conn.post(tx)
+}
+
 func (cl *Client) dhtPort() (ret uint16) {
        cl.eachDhtServer(func(s DhtServer) {
                ret = uint16(missinggo.AddrPort(s.Addr()))
index 2347aa3ed1227a9194441f03ece99c38226f1578..8d09106103f20f4a2678b2beb36740b660fe10a4 100644 (file)
@@ -1,6 +1,11 @@
 package peer_protocol
 
-import "github.com/anacrolix/dht/v2/krpc"
+import (
+       "net"
+
+       "github.com/anacrolix/dht/v2/krpc"
+       "github.com/anacrolix/torrent/bencode"
+)
 
 type PexMsg struct {
        Added       krpc.CompactIPv4NodeAddrs `bencode:"added"`
@@ -11,6 +16,35 @@ type PexMsg struct {
        Dropped6    krpc.CompactIPv6NodeAddrs `bencode:"dropped6"`
 }
 
+func (m *PexMsg) AppendAdded(addr krpc.NodeAddr, f PexPeerFlags) {
+       ip := addr.IP
+       if ip.To4() != nil {
+               m.Added = append(m.Added, addr)
+               m.AddedFlags = append(m.AddedFlags, f)
+       } else if len(ip) == net.IPv6len {
+               m.Added6 = append(m.Added6, addr)
+               m.Added6Flags = append(m.Added6Flags, f)
+       }
+}
+
+func (m *PexMsg) AppendDropped(addr krpc.NodeAddr) {
+       ip := addr.IP
+       if ip.To4() != nil {
+               m.Dropped = append(m.Dropped, addr)
+       } else if len(ip) == net.IPv6len {
+               m.Dropped6 = append(m.Dropped6, addr)
+       }
+}
+
+func (pexMsg *PexMsg) Message(pexExtendedId ExtensionNumber) Message {
+       payload := bencode.MustMarshal(pexMsg)
+       return Message{
+               Type:            Extended,
+               ExtendedID:      pexExtendedId,
+               ExtendedPayload: payload,
+       }
+}
+
 type PexPeerFlags byte
 
 func (me PexPeerFlags) Get(f PexPeerFlags) bool {
@@ -18,9 +52,9 @@ func (me PexPeerFlags) Get(f PexPeerFlags) bool {
 }
 
 const (
-       PexPrefersEncryption = 0x01
-       PexSeedUploadOnly    = 0x02
-       PexSupportsUtp       = 0x04
-       PexHolepunchSupport  = 0x08
-       PexOutgoingConn      = 0x10
+       PexPrefersEncryption PexPeerFlags = 1 << iota
+       PexSeedUploadOnly
+       PexSupportsUtp
+       PexHolepunchSupport
+       PexOutgoingConn
 )
index 0c2b9f9ed1489008984571bd09502da3902033b0..30e0d363a8cf381046ab662c63650bdc43652ad7 100644 (file)
@@ -1,10 +1,14 @@
 package peer_protocol
 
 import (
+       "bufio"
+       "bytes"
+       "net"
        "testing"
 
        "github.com/stretchr/testify/require"
 
+       "github.com/anacrolix/dht/v2/krpc"
        "github.com/anacrolix/torrent/bencode"
 )
 
@@ -24,3 +28,102 @@ func TestEmptyPexMsg(t *testing.T) {
        require.NoError(t, err)
        require.NoError(t, bencode.Unmarshal(b, &pm))
 }
+
+func TestPexAppendAdded(t *testing.T) {
+       t.Run("ipv4", func(t *testing.T) {
+               addr := krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}
+               f := PexPrefersEncryption | PexOutgoingConn
+               xm := PexMsg{}
+               xm.AppendAdded(addr, f)
+               require.EqualValues(t, len(xm.Added), 1)
+               require.EqualValues(t, len(xm.AddedFlags), 1)
+               require.EqualValues(t, len(xm.Added6), 0)
+               require.EqualValues(t, len(xm.Added6Flags), 0)
+               require.True(t, xm.Added[0].IP.Equal(addr.IP), "IPs should match")
+               require.EqualValues(t, xm.Added[0].Port, addr.Port)
+               require.EqualValues(t, xm.AddedFlags[0], f)
+       })
+       t.Run("ipv6", func(t *testing.T) {
+               addr := krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}
+               f := PexPrefersEncryption | PexOutgoingConn
+               xm := PexMsg{}
+               xm.AppendAdded(addr, f)
+               require.EqualValues(t, len(xm.Added), 0)
+               require.EqualValues(t, len(xm.AddedFlags), 0)
+               require.EqualValues(t, len(xm.Added6), 1)
+               require.EqualValues(t, len(xm.Added6Flags), 1)
+               require.True(t, xm.Added6[0].IP.Equal(addr.IP), "IPs should match")
+               require.EqualValues(t, xm.Added6[0].Port, addr.Port)
+               require.EqualValues(t, xm.Added6Flags[0], f)
+       })
+       t.Run("unspecified", func(t *testing.T) {
+               addr := krpc.NodeAddr{}
+               xm := PexMsg{}
+               xm.AppendAdded(addr, 0)
+               require.EqualValues(t, len(xm.Added), 0)
+               require.EqualValues(t, len(xm.AddedFlags), 0)
+               require.EqualValues(t, len(xm.Added6), 0)
+               require.EqualValues(t, len(xm.Added6Flags), 0)
+       })
+}
+
+func TestPexAppendDropped(t *testing.T) {
+       t.Run("ipv4", func(t *testing.T) {
+               addr := krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}
+               xm := PexMsg{}
+               xm.AppendDropped(addr)
+               require.EqualValues(t, len(xm.Dropped), 1)
+               require.EqualValues(t, len(xm.Dropped6), 0)
+               require.True(t, xm.Dropped[0].IP.Equal(addr.IP), "IPs should match")
+               require.EqualValues(t, xm.Dropped[0].Port, addr.Port)
+       })
+       t.Run("ipv6", func(t *testing.T) {
+               addr := krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}
+               xm := PexMsg{}
+               xm.AppendDropped(addr)
+               require.EqualValues(t, len(xm.Dropped), 0)
+               require.EqualValues(t, len(xm.Dropped6), 1)
+               require.True(t, xm.Dropped6[0].IP.Equal(addr.IP), "IPs should match")
+               require.EqualValues(t, xm.Dropped6[0].Port, addr.Port)
+       })
+       t.Run("unspecified", func(t *testing.T) {
+               addr := krpc.NodeAddr{}
+               xm := PexMsg{}
+               xm.AppendDropped(addr)
+               require.EqualValues(t, len(xm.Dropped), 0)
+               require.EqualValues(t, len(xm.Dropped6), 0)
+       })
+}
+
+func TestMarshalPexMessage(t *testing.T) {
+       addr := krpc.NodeAddr{IP: net.IP{127, 0, 0, 1}, Port: 0x55aa}
+       f := PexPrefersEncryption | PexOutgoingConn
+       pm := PexMsg{}
+       pm.AppendAdded(addr, f)
+
+       b, err := bencode.Marshal(pm)
+       require.NoError(t, err)
+
+       pexExtendedId := ExtensionNumber(7)
+       msg := pm.Message(pexExtendedId)
+       expected := []byte("\x00\x00\x00\x4c\x14\x07d5:added6:\x7f\x00\x00\x01\x55\xaa7:added.f1:\x116:added60:8:added6.f0:7:dropped0:8:dropped60:e")
+       b, err = msg.MarshalBinary()
+       require.NoError(t, err)
+       require.EqualValues(t, b, expected)
+
+       msg = Message{}
+       dec := Decoder{
+               R:         bufio.NewReader(bytes.NewBuffer(b)),
+               MaxLength: 128,
+       }
+       pmOut := PexMsg{}
+       err = dec.Decode(&msg)
+       require.NoError(t, err)
+       require.EqualValues(t, Extended, msg.Type)
+       require.EqualValues(t, pexExtendedId, msg.ExtendedID)
+       err = bencode.Unmarshal(msg.ExtendedPayload, &pmOut)
+       require.NoError(t, err)
+       require.EqualValues(t, len(pm.Added), len(pmOut.Added))
+       require.EqualValues(t, pm.Added[0].IP, pmOut.Added[0].IP)
+       require.EqualValues(t, pm.Added[0].Port, pmOut.Added[0].Port)
+}
index ee5cdfd986216ee2509098a4d7ea6c496c969cd3..f90dfd4fbd07b0b33a0a704ee89c87513a4e9bb4 100644 (file)
@@ -92,6 +92,8 @@ type PeerConn struct {
        peerChoking        bool
        peerRequests       map[request]struct{}
        PeerExtensionBytes pp.PeerExtensionBits
+       PeerPrefersEncryption bool // as indicated by 'e' field in extension handshake
+       PeerListenPort        int
        // The pieces the peer has claimed to have.
        _peerPieces bitmap.Bitmap
        // The peer has everything. This can occur due to a special message, when
@@ -1115,6 +1117,8 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err
                if c.PeerExtensionIDs == nil {
                        c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber, len(d.M))
                }
+               c.PeerListenPort = d.Port
+               c.PeerPrefersEncryption = d.Encryption
                for name, id := range d.M {
                        if _, ok := c.PeerExtensionIDs[name]; !ok {
                                torrent.Add(fmt.Sprintf("peers supporting extension %q", name), 1)
@@ -1127,6 +1131,10 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err
                        }
                }
                c.requestPendingMetadata()
+               if !cl.config.DisablePEX {
+                       cl.sendInitialPEX(c, t)
+                       // BUG no sending PEX updates yet
+               }
                return nil
        case metadataExtendedId:
                err := cl.gotMetadataExtensionMsg(payload, t, c)
@@ -1140,11 +1148,14 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err
                        // advertising that we support PEX if it's disabled.
                        return nil
                }
+               c.logger.Printf("incoming PEX message")
                var pexMsg pp.PexMsg
                err := bencode.Unmarshal(payload, &pexMsg)
                if err != nil {
                        return fmt.Errorf("error unmarshalling PEX message: %s", err)
                }
+               npeers := len(pexMsg.Added6) + len(pexMsg.Added)
+               c.logger.Printf("adding %d peers from PEX", npeers)
                torrent.Add("pex added6 peers received", int64(len(pexMsg.Added6)))
                var peers Peers
                peers.AppendFromPex(pexMsg.Added6, pexMsg.Added6Flags)
@@ -1468,6 +1479,20 @@ func (c *PeerConn) remoteIpPort() IpPort {
        return IpPort{ipa.IP, uint16(ipa.Port)}
 }
 
+func (c *PeerConn) pexPeerFlags() pp.PexPeerFlags {
+       f := pp.PexPeerFlags(0)
+       if c.PeerPrefersEncryption {
+               f |= pp.PexPrefersEncryption
+       }
+       if c.outgoing {
+               f |= pp.PexOutgoingConn
+       }
+       if c.utp() {
+               f |= pp.PexSupportsUtp
+       }
+       return f
+}
+
 func (c *PeerConn) String() string {
        return fmt.Sprintf("connection %p", c)
 }
index da7c5bb3fdc69f71670029c9f3d6c00cf1f7a572..6196bd80327630f4a214c84294ed036a66f7af8a 100644 (file)
@@ -143,3 +143,23 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
        require.NoError(b, <-mrlErr)
        require.EqualValues(b, b.N, cn._stats.ChunksReadUseful.Int64())
 }
+
+func TestPexPeerFlags(t *testing.T) {
+       var testcases = []struct {
+               conn *PeerConn
+               f    pp.PexPeerFlags
+       }{
+               {&PeerConn{outgoing: false, PeerPrefersEncryption: false}, 0},
+               {&PeerConn{outgoing: false, PeerPrefersEncryption: true}, pp.PexPrefersEncryption},
+               {&PeerConn{outgoing: true, PeerPrefersEncryption: false}, pp.PexOutgoingConn},
+               {&PeerConn{outgoing: true, PeerPrefersEncryption: true}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
+               {&PeerConn{network: "udp4"}, pp.PexSupportsUtp},
+               {&PeerConn{outgoing: true, network: "udp6"}, pp.PexOutgoingConn | pp.PexSupportsUtp},
+               {&PeerConn{outgoing: true, network: "tcp4"}, pp.PexOutgoingConn},
+               {&PeerConn{network: "tcp6"}, 0},
+       }
+       for i, tc := range testcases {
+               f := tc.conn.pexPeerFlags()
+               require.EqualValues(t, tc.f, f, i)
+       }
+}
index 511abd7192a07640a0cc982165a8c1a90960ae80..fc3353578a9f973ad0716b166dedf42745ebc992 100644 (file)
@@ -7,6 +7,7 @@ import (
        "fmt"
        "io"
        "math/rand"
+       "net"
        "net/url"
        "sync"
        "text/tabwriter"
@@ -16,6 +17,7 @@ import (
        "github.com/davecgh/go-spew/spew"
 
        "github.com/anacrolix/dht/v2"
+       "github.com/anacrolix/dht/v2/krpc"
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo"
        "github.com/anacrolix/missinggo/perf"
@@ -131,6 +133,8 @@ type Torrent struct {
 
        // Count of each request across active connections.
        pendingRequests map[request]int
+
+       pex pexState
 }
 
 func (t *Torrent) numConns() int {
@@ -257,6 +261,7 @@ func (t *Torrent) addPeer(p Peer) {
        if ipAddr, ok := tryIpPortFromNetAddr(p.Addr); ok {
                if cl.badPeerIPPort(ipAddr.IP, ipAddr.Port) {
                        torrent.Add("peers not added because of bad addr", 1)
+                       cl.logger.Printf("peers not added because of bad addr: %v", p)
                        return
                }
        }
@@ -702,6 +707,7 @@ func (t *Torrent) close() (err error) {
        for conn := range t.conns {
                conn.close()
        }
+       // PEX wipe state here
        t.cl.event.Broadcast()
        t.pieceStateChanges.Close()
        t.updateWantPeersEvent()
@@ -1219,6 +1225,9 @@ func (t *Torrent) assertNoPendingRequests() {
 
 func (t *Torrent) dropConnection(c *PeerConn) {
        t.cl.event.Broadcast()
+       if !t.cl.config.DisablePEX {
+               t.pex.dropped(c)
+       }
        c.close()
        if t.deleteConnection(c) {
                t.openNewConns()
@@ -1491,6 +1500,9 @@ func (t *Torrent) addConnection(c *PeerConn) (err error) {
                panic(len(t.conns))
        }
        t.conns[c] = struct{}{}
+       if !t.cl.config.DisablePEX {
+               t.pex.added(c)
+       }
        return nil
 }
 
@@ -1851,3 +1863,38 @@ func (t *Torrent) SetOnWriteChunkError(f func(error)) {
        defer t.cl.unlock()
        t.userOnWriteChunkErr = f
 }
+
+func nodeAddr(addr net.Addr) krpc.NodeAddr {
+       ipport, _ := tryIpPortFromNetAddr(addr)
+       ip := ipport.IP
+       if ip4 := ip.To4(); ip4 != nil && len(ip) != net.IPv4len {
+               ip = ip4
+       }
+       return krpc.NodeAddr{IP: ip, Port: ipport.Port}
+}
+
+func (t *Torrent) pexInitial() *pp.PexMsg {
+       // BUG FIXME PEX prepare 25 recently connected peers
+       tx := &pp.PexMsg{}
+       for c := range t.conns {
+               addr := nodeAddr(c.remoteAddr)
+               f := c.pexPeerFlags()
+               tx.AppendAdded(addr, f)
+       }
+       nc := len(tx.Added) + len(tx.Added6)
+       // BUG if nc < 50 {
+       if nc < 1 {
+               return nil
+       }
+       return tx
+}
+
+type pexState struct{}
+
+func (s *pexState) added(c *PeerConn) {
+       return
+}
+
+func (s *pexState) dropped(c *PeerConn) {
+       return
+}
index 5ecd93b4378b52f6e5a4b9e521f7cdffcb49b5c4..b8cea974f9ea5477a7fe6850b5312d6cfaf3366c 100644 (file)
@@ -218,3 +218,35 @@ func TestTorrentMetainfoIncompleteMetadata(t *testing.T) {
        assert.False(t, tt.haveAllMetadataPieces())
        assert.Nil(t, tt.Metainfo().InfoBytes)
 }
+
+func TestTorrentPexInitial(t *testing.T) {
+       v := []*PeerConn{
+               &PeerConn{
+                       remoteAddr: &net.UDPAddr{IP:   net.IPv4(172, 17, 0, 2), Port: 5555},
+               },
+               &PeerConn{
+                       remoteAddr: &net.UDPAddr{
+                               IP:   net.IP{0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
+                               Port: 11111,
+                       },
+                       outgoing: true,
+               },
+               &PeerConn{
+                       remoteAddr: &net.UDPAddr{IP: net.IP(nil), Port: 0},
+               },
+       }
+       torrent := &Torrent{conns: make(map[*PeerConn]struct{})}
+       for _, conn := range v {
+               torrent.conns[conn] = struct{}{}
+       }
+       tx := torrent.pexInitial()
+       require.NotNil(t, tx)
+       require.EqualValues(t, 1, len(tx.Added))
+       require.EqualValues(t, tx.Added[0].UDP().Network(), v[0].remoteAddr.Network())
+       require.EqualValues(t, tx.Added[0].UDP().String(), v[0].remoteAddr.String())
+       require.Zero(t, tx.AddedFlags[0])
+       require.EqualValues(t, 1, len(tx.Added6))
+       require.EqualValues(t, tx.Added6[0].UDP().Network(), v[1].remoteAddr.Network())
+       require.EqualValues(t, tx.Added6[0].UDP().String(), v[1].remoteAddr.String())
+       require.NotZero(t, tx.Added6Flags[0]&pp.PexOutgoingConn)
+}