]> Sergey Matveev's repositories - btrtrc.git/commitdiff
PEX: add periodic deltas
authorYaroslav Kolomiiets <yarikos@gmail.com>
Wed, 8 Apr 2020 16:03:29 +0000 (17:03 +0100)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 15 Apr 2020 07:24:44 +0000 (17:24 +1000)
client.go
peer_protocol/pex.go
peerconn.go
peerconn_test.go
pex.go
pex_test.go
pexconn.go [new file with mode: 0644]
pexconn_test.go [new file with mode: 0644]
torrent.go

index bb2bf4aebc634947e5b4ee645ac4c9a5c522e170..8a94e021756b5ff5c8a75a6af77043b169071cc8 100644 (file)
--- a/client.go
+++ b/client.go
@@ -934,22 +934,6 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
        }
 }
 
-func (cl *Client) sendInitialPEX(conn *PeerConn, t *Torrent) {
-       xid, ok := conn.PeerExtensionIDs[pp.ExtensionNamePex]
-       if !ok {
-               return
-       }
-       m, seq := t.pex.Genmsg(0)
-       conn.pexSeq = seq
-       if m.Len() == 0 {
-               cl.logger.Printf("no initial PEX this time")
-               // FIXME see how can we schedule another initial for later
-               return
-       }
-       conn.logger.Printf("sending initial PEX message: %v", m)
-       conn.post(m.Message(xid))
-}
-
 func (cl *Client) dhtPort() (ret uint16) {
        cl.eachDhtServer(func(s DhtServer) {
                ret = uint16(missinggo.AddrPort(s.Addr()))
index 52801865acbc66780e79fcc29ca3256fd2aa4aa7..91a4fdb8a3685374554bb6b8d862f33a98d0702d 100644 (file)
@@ -99,8 +99,8 @@ func (m *PexMsg) DeltaLen() int {
        return lenDropped
 }
 
-func (pexMsg *PexMsg) Message(pexExtendedId ExtensionNumber) Message {
-       payload := bencode.MustMarshal(pexMsg)
+func (m *PexMsg) Message(pexExtendedId ExtensionNumber) Message {
+       payload := bencode.MustMarshal(m)
        return Message{
                Type:            Extended,
                ExtendedID:      pexExtendedId,
@@ -108,6 +108,15 @@ func (pexMsg *PexMsg) Message(pexExtendedId ExtensionNumber) Message {
        }
 }
 
+func LoadPexMsg(b []byte) (*PexMsg, error) {
+       m := new(PexMsg) 
+       if err := bencode.Unmarshal(b, m); err != nil {
+               return nil, err
+       }
+       return m, nil
+}
+
+
 type PexPeerFlags byte
 
 func (me PexPeerFlags) Get(f PexPeerFlags) bool {
index be8018d6e6ff7e52a2993ea438e8813e37a5df27..04ce7785c2ecfc314b0713df65d850b96ac29001 100644 (file)
@@ -85,7 +85,7 @@ type PeerConn struct {
        // response.
        metadataRequests []bool
        sentHaves        bitmap.Bitmap
-       pexSeq           int
+       pex              pexConnState
 
        // Stuff controlled by the remote peer.
        PeerID                PeerID
@@ -321,6 +321,9 @@ func (cn *PeerConn) close() {
        if !cn.closed.Set() {
                return
        }
+       if cn.pex.IsEnabled() {
+               cn.pex.Close()
+       }
        cn.tickleWriter()
        cn.discardPieceInclination()
        cn._pieceRequestOrder.Clear()
@@ -559,7 +562,9 @@ func (cn *PeerConn) fillWriteBuffer(msg func(pp.Message) bool) {
                }
                cn.requestsLowWater = len(cn.requests) / 2
        }
-
+       if cn.pex.IsEnabled() {
+               cn.pex.Share(msg) // gated internally
+       }
        cn.upload(msg)
 }
 
@@ -1132,9 +1137,9 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err
                        }
                }
                c.requestPendingMetadata()
-               if !cl.config.DisablePEX {
-                       cl.sendInitialPEX(c, t)
-                       // BUG no sending PEX updates yet
+               if !t.cl.config.DisablePEX {
+                       t.pex.Add(c) // we learnt enough now
+                       c.pex.Init(c)
                }
                return nil
        case metadataExtendedId:
@@ -1144,25 +1149,10 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err
                }
                return nil
        case pexExtendedId:
-               if cl.config.DisablePEX {
-                       // TODO: Maybe close the connection. Check that we're not
-                       // advertising that we support PEX if it's disabled.
-                       return nil
-               }
-               c.logger.Printf("incoming PEX message")
-               var pexMsg pp.PexMsg
-               err := bencode.Unmarshal(payload, &pexMsg)
-               if err != nil {
-                       return fmt.Errorf("error unmarshalling PEX message: %s", err)
+               if !c.pex.IsEnabled() {
+                       return nil // or hang-up maybe?
                }
-               npeers := len(pexMsg.Added6) + len(pexMsg.Added)
-               c.logger.Printf("adding %d peers from PEX", npeers)
-               torrent.Add("pex added6 peers received", int64(len(pexMsg.Added6)))
-               var peers Peers
-               peers.AppendFromPex(pexMsg.Added6, pexMsg.Added6Flags)
-               peers.AppendFromPex(pexMsg.Added, pexMsg.AddedFlags)
-               t.addPeers(peers)
-               return nil
+               return c.pex.Recv(payload)
        default:
                return fmt.Errorf("unexpected extended message ID: %v", id)
        }
@@ -1488,15 +1478,32 @@ func (c *PeerConn) pexPeerFlags() pp.PexPeerFlags {
        if c.outgoing {
                f |= pp.PexOutgoingConn
        }
-       if c.utp() {
+       if c.remoteAddr != nil && strings.Contains(c.remoteAddr.Network(), "udp") {
                f |= pp.PexSupportsUtp
        }
        return f
 }
 
+func (c *PeerConn) dialAddr() net.Addr {
+       if !c.outgoing && c.PeerListenPort != 0 {
+               switch addr := c.remoteAddr.(type) {
+               case *net.TCPAddr:
+                       dialAddr := *addr
+                       dialAddr.Port = c.PeerListenPort
+                       return &dialAddr
+               case *net.UDPAddr:
+                       dialAddr := *addr
+                       dialAddr.Port = c.PeerListenPort
+                       return &dialAddr
+               }
+       }
+       return c.remoteAddr
+}
+
 func (c *PeerConn) pexEvent(t pexEventType) pexEvent {
        f := c.pexPeerFlags()
-       return pexEvent{t, c.remoteAddr, f}
+       addr := c.dialAddr()
+       return pexEvent{t, addr, f}
 }
 
 func (c *PeerConn) String() string {
index 6196bd80327630f4a214c84294ed036a66f7af8a..21abbc7c180a78fb699cba813b5de01dcd78df38 100644 (file)
@@ -144,7 +144,11 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
        require.EqualValues(b, b.N, cn._stats.ChunksReadUseful.Int64())
 }
 
-func TestPexPeerFlags(t *testing.T) {
+func TestConnPexPeerFlags(t *testing.T) {
+       var (
+               tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
+               udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
+       )
        var testcases = []struct {
                conn *PeerConn
                f    pp.PexPeerFlags
@@ -153,13 +157,52 @@ func TestPexPeerFlags(t *testing.T) {
                {&PeerConn{outgoing: false, PeerPrefersEncryption: true}, pp.PexPrefersEncryption},
                {&PeerConn{outgoing: true, PeerPrefersEncryption: false}, pp.PexOutgoingConn},
                {&PeerConn{outgoing: true, PeerPrefersEncryption: true}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
-               {&PeerConn{network: "udp4"}, pp.PexSupportsUtp},
-               {&PeerConn{outgoing: true, network: "udp6"}, pp.PexOutgoingConn | pp.PexSupportsUtp},
-               {&PeerConn{outgoing: true, network: "tcp4"}, pp.PexOutgoingConn},
-               {&PeerConn{network: "tcp6"}, 0},
+               {&PeerConn{remoteAddr: udpAddr}, pp.PexSupportsUtp},
+               {&PeerConn{remoteAddr: udpAddr, outgoing: true}, pp.PexOutgoingConn | pp.PexSupportsUtp},
+               {&PeerConn{remoteAddr: tcpAddr, outgoing: true}, pp.PexOutgoingConn},
+               {&PeerConn{remoteAddr: tcpAddr}, 0},
        }
        for i, tc := range testcases {
                f := tc.conn.pexPeerFlags()
                require.EqualValues(t, tc.f, f, i)
        }
 }
+
+func TestConnPexEvent(t *testing.T) {
+       var (
+               udpAddr     = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
+               tcpAddr     = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
+               dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
+               dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747}
+       )
+       var testcases = []struct {
+               t pexEventType
+               c *PeerConn
+               e pexEvent
+       }{
+               {
+                       pexAdd,
+                       &PeerConn{remoteAddr: udpAddr},
+                       pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp},
+               },
+               {
+                       pexDrop,
+                       &PeerConn{remoteAddr: tcpAddr, outgoing: true, PeerListenPort: dialTcpAddr.Port},
+                       pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn},
+               },
+               {
+                       pexAdd,
+                       &PeerConn{remoteAddr: tcpAddr, PeerListenPort: dialTcpAddr.Port},
+                       pexEvent{pexAdd, dialTcpAddr, 0},
+               },
+               {
+                       pexDrop,
+                       &PeerConn{remoteAddr: udpAddr, PeerListenPort: dialUdpAddr.Port},
+                       pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp},
+               },
+       }
+       for i, tc := range testcases {
+               e := tc.c.pexEvent(tc.t)
+               require.EqualValues(t, tc.e, e, i)
+       }
+}
diff --git a/pex.go b/pex.go
index 5f22e82b30e2bdbb97a5090b282a2a7fe864d997..acedd71d05bcdb85fc886de09cd8acf3c5d069f6 100644 (file)
--- a/pex.go
+++ b/pex.go
@@ -72,9 +72,14 @@ func (s *pexState) Add(c *PeerConn) {
        }
        e := c.pexEvent(pexAdd)
        s.ev = append(s.ev, e)
+       c.pex.Listed = true
 }
 
 func (s *pexState) Drop(c *PeerConn) {
+       if !c.pex.Listed {
+               // skip connections which were not previously Added
+               return
+       }
        e := c.pexEvent(pexDrop)
        s.nc--
        if s.nc < pexTargAdded && len(s.hold) < pexMaxHold {
index c170f0d71ef43227384fb8f2f563695a4aa03243..4967b5d92dd17810008edbe0ff9f0e02112d6f0c 100644 (file)
@@ -75,7 +75,7 @@ func TestPexAdded(t *testing.T) {
 func TestPexDropped(t *testing.T) {
        t.Run("belowTarg", func(t *testing.T) {
                s := &pexState{nc: 1}
-               s.Drop(&PeerConn{remoteAddr: addrs[0]})
+               s.Drop(&PeerConn{remoteAddr: addrs[0], pex: pexConnState{Listed: true}})
                targ := &pexState{
                        hold: []pexEvent{pexEvent{pexDrop, addrs[0], 0}},
                        nc:   0,
@@ -84,13 +84,19 @@ func TestPexDropped(t *testing.T) {
        })
        t.Run("aboveTarg", func(t *testing.T) {
                s := &pexState{nc: pexTargAdded + 1}
-               s.Drop(&PeerConn{remoteAddr: addrs[0]})
+               s.Drop(&PeerConn{remoteAddr: addrs[0], pex: pexConnState{Listed: true}})
                targ := &pexState{
                        ev: []pexEvent{pexEvent{pexDrop, addrs[0], 0}},
                        nc: pexTargAdded,
                }
                require.EqualValues(t, targ, s)
        })
+       t.Run("aboveTargNotListed", func(t *testing.T) {
+               s := &pexState{nc: pexTargAdded + 1}
+               s.Drop(&PeerConn{remoteAddr: addrs[0], pex: pexConnState{Listed: false}})
+               targ := &pexState{nc: pexTargAdded + 1}
+               require.EqualValues(t, targ, s)
+       })
 }
 
 func TestPexReset(t *testing.T) {
diff --git a/pexconn.go b/pexconn.go
new file mode 100644 (file)
index 0000000..51aac7e
--- /dev/null
@@ -0,0 +1,116 @@
+package torrent
+
+import (
+       "fmt"
+       "time"
+
+       "github.com/anacrolix/log"
+
+       pp "github.com/anacrolix/torrent/peer_protocol"
+)
+
+const (
+       pexRetryDelay = 10 * time.Second
+       pexInterval   = 1 * time.Minute
+)
+
+// per-connection PEX state
+type pexConnState struct {
+       enabled bool
+       xid     pp.ExtensionNumber
+       seq     int
+       timer   *time.Timer
+       gate    chan struct{}
+       readyfn func()
+       torrent *Torrent
+       Listed  bool
+       info    log.Logger
+       dbg     log.Logger
+}
+
+func (s *pexConnState) IsEnabled() bool {
+       return s.enabled
+}
+
+// Init is called from the reader goroutine upon the extended handshake completion
+func (s *pexConnState) Init(c *PeerConn) {
+       xid, ok := c.PeerExtensionIDs[pp.ExtensionNamePex]
+       if !ok || xid == 0 || c.t.cl.config.DisablePEX {
+               return
+       }
+       s.xid = xid
+       s.seq = 0
+       s.torrent = c.t
+       s.info = c.t.cl.logger
+       s.dbg = c.logger
+       s.readyfn = c.tickleWriter
+       s.gate = make(chan struct{}, 1)
+       s.timer = time.AfterFunc(0, func() {
+               s.gate <- struct{}{}
+               s.readyfn() // wake up the writer
+       })
+       s.enabled = true
+}
+
+// schedule next PEX message
+func (s *pexConnState) sched(delay time.Duration) {
+       s.timer.Reset(delay)
+}
+
+// generate next PEX message for the peer; returns nil if nothing yet to send
+func (s *pexConnState) genmsg() *pp.PexMsg {
+       tx, seq := s.torrent.pex.Genmsg(s.seq)
+       if tx.Len() == 0 {
+               return nil
+       }
+       s.seq = seq
+       return tx
+}
+
+// Share is called from the writer goroutine if when it is woken up with the write buffers empty
+func (s *pexConnState) Share(postfn messageWriter) {
+       select {
+       case <-s.gate:
+               if tx := s.genmsg(); tx != nil {
+                       s.dbg.Print("sending PEX message: ", tx)
+                       postfn(tx.Message(s.xid))
+                       s.sched(pexInterval)
+               } else {
+                       // no PEX to send this time - try again shortly
+                       s.sched(pexRetryDelay)
+               }
+       default:
+               return
+       }
+}
+
+// Recv is called from the reader goroutine
+func (s *pexConnState) Recv(payload []byte) error {
+       rx, err := pp.LoadPexMsg(payload)
+       if err != nil {
+               return fmt.Errorf("error unmarshalling PEX message: %s", err)
+       }
+       s.dbg.Print("incoming PEX message: ", rx)
+       torrent.Add("pex added peers received", int64(len(rx.Added)))
+       torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
+
+       var peers Peers
+       peers.AppendFromPex(rx.Added6, rx.Added6Flags)
+       peers.AppendFromPex(rx.Added, rx.AddedFlags)
+       s.dbg.Printf("adding %d peers from PEX", len(peers))
+       s.torrent.addPeers(peers)
+       // s.dbg.Print("known swarm now:", s.torrent.KnownSwarm())
+
+       // one day we may also want to:
+       // - check if the peer is not flooding us with PEX updates
+       // - handle drops somehow
+       // - detect malicious peers
+
+       return nil
+}
+
+func (s *pexConnState) Close() {
+       if s.timer != nil {
+               s.timer.Stop()
+       }
+}
diff --git a/pexconn_test.go b/pexconn_test.go
new file mode 100644 (file)
index 0000000..ffe0d85
--- /dev/null
@@ -0,0 +1,56 @@
+package torrent
+
+import (
+       "net"
+       "testing"
+
+       "github.com/anacrolix/dht/v2/krpc"
+       "github.com/stretchr/testify/require"
+
+       "github.com/anacrolix/torrent/metainfo"
+       pp "github.com/anacrolix/torrent/peer_protocol"
+)
+
+func TestPexConnState(t *testing.T) {
+       cl := Client{
+               config: TestingConfig(),
+       }
+       cl.initLogger()
+       torrent := cl.newTorrent(metainfo.Hash{}, nil)
+       addr := &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
+       c := cl.newConnection(nil, false, addr, "")
+       c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber)
+       c.PeerExtensionIDs[pp.ExtensionNamePex] = pexExtendedId
+       c.writerCond.L.Lock()
+       c.setTorrent(torrent)
+       torrent.addConnection(c)
+
+       c.pex.Init(c)
+       require.True(t, c.pex.IsEnabled(), "should get enabled")
+       defer c.pex.Close()
+
+       var out pp.Message
+       writerCalled := false
+       testWriter := func(m pp.Message) bool {
+               writerCalled = true
+               out = m
+               return true
+       }
+       c.writerCond.Wait()
+       c.pex.Share(testWriter)
+       require.True(t, writerCalled)
+       require.EqualValues(t, pp.Extended, out.Type)
+       require.EqualValues(t, pexExtendedId, out.ExtendedID)
+
+       x, err := pp.LoadPexMsg(out.ExtendedPayload)
+       require.NoError(t, err)
+       targx := &pp.PexMsg{
+               Added:      krpc.CompactIPv4NodeAddrs(nil),
+               AddedFlags: []pp.PexPeerFlags{},
+               Added6: krpc.CompactIPv6NodeAddrs{
+                       nodeAddr(addr),
+               },
+               Added6Flags: []pp.PexPeerFlags{0},
+       }
+       require.EqualValues(t, targx, x)
+}
index 3b75e25542c66056ea8ce1fa77b28ea4b3cddfb6..db0126bfac1ccd8b3f03d23b463924dca41dca20 100644 (file)
@@ -259,7 +259,7 @@ func (t *Torrent) addPeer(p Peer) {
        if ipAddr, ok := tryIpPortFromNetAddr(p.Addr); ok {
                if cl.badPeerIPPort(ipAddr.IP, ipAddr.Port) {
                        torrent.Add("peers not added because of bad addr", 1)
-                       cl.logger.Printf("peers not added because of bad addr: %v", p)
+                       // cl.logger.Printf("peers not added because of bad addr: %v", p)
                        return
                }
        }
@@ -1204,6 +1204,9 @@ func (t *Torrent) deleteConnection(c *PeerConn) (ret bool) {
        }
        _, ret = t.conns[c]
        delete(t.conns, c)
+       if !t.cl.config.DisablePEX {
+               t.pex.Drop(c)
+       }
        torrent.Add("deleted connections", 1)
        c.deleteAllRequests()
        if len(t.conns) == 0 {
@@ -1223,9 +1226,6 @@ func (t *Torrent) assertNoPendingRequests() {
 
 func (t *Torrent) dropConnection(c *PeerConn) {
        t.cl.event.Broadcast()
-       if !t.cl.config.DisablePEX {
-               t.pex.Drop(c)
-       }
        c.close()
        if t.deleteConnection(c) {
                t.openNewConns()
@@ -1498,8 +1498,8 @@ func (t *Torrent) addConnection(c *PeerConn) (err error) {
                panic(len(t.conns))
        }
        t.conns[c] = struct{}{}
-       if !t.cl.config.DisablePEX {
-               t.pex.Add(c)
+       if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
+               t.pex.Add(c) // as no further extended handshake expected
        }
        return nil
 }