From 2b2c480706e5d930a9fba246e755e026b4966c77 Mon Sep 17 00:00:00 2001 From: Yaroslav Kolomiiets Date: Wed, 8 Apr 2020 17:03:29 +0100 Subject: [PATCH] PEX: add periodic deltas --- client.go | 16 ------ peer_protocol/pex.go | 13 ++++- peerconn.go | 57 +++++++++++---------- peerconn_test.go | 53 ++++++++++++++++++-- pex.go | 5 ++ pex_test.go | 10 +++- pexconn.go | 116 +++++++++++++++++++++++++++++++++++++++++++ pexconn_test.go | 56 +++++++++++++++++++++ torrent.go | 12 ++--- 9 files changed, 282 insertions(+), 56 deletions(-) create mode 100644 pexconn.go create mode 100644 pexconn_test.go diff --git a/client.go b/client.go index bb2bf4ae..8a94e021 100644 --- a/client.go +++ b/client.go @@ -934,22 +934,6 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) { } } -func (cl *Client) sendInitialPEX(conn *PeerConn, t *Torrent) { - xid, ok := conn.PeerExtensionIDs[pp.ExtensionNamePex] - if !ok { - return - } - m, seq := t.pex.Genmsg(0) - conn.pexSeq = seq - if m.Len() == 0 { - cl.logger.Printf("no initial PEX this time") - // FIXME see how can we schedule another initial for later - return - } - conn.logger.Printf("sending initial PEX message: %v", m) - conn.post(m.Message(xid)) -} - func (cl *Client) dhtPort() (ret uint16) { cl.eachDhtServer(func(s DhtServer) { ret = uint16(missinggo.AddrPort(s.Addr())) diff --git a/peer_protocol/pex.go b/peer_protocol/pex.go index 52801865..91a4fdb8 100644 --- a/peer_protocol/pex.go +++ b/peer_protocol/pex.go @@ -99,8 +99,8 @@ func (m *PexMsg) DeltaLen() int { return lenDropped } -func (pexMsg *PexMsg) Message(pexExtendedId ExtensionNumber) Message { - payload := bencode.MustMarshal(pexMsg) +func (m *PexMsg) Message(pexExtendedId ExtensionNumber) Message { + payload := bencode.MustMarshal(m) return Message{ Type: Extended, ExtendedID: pexExtendedId, @@ -108,6 +108,15 @@ func (pexMsg *PexMsg) Message(pexExtendedId ExtensionNumber) Message { } } +func LoadPexMsg(b []byte) (*PexMsg, error) { + m := new(PexMsg) + if err := bencode.Unmarshal(b, m); err != nil { + return nil, err + } + return m, nil +} + + type PexPeerFlags byte func (me PexPeerFlags) Get(f PexPeerFlags) bool { diff --git a/peerconn.go b/peerconn.go index be8018d6..04ce7785 100644 --- a/peerconn.go +++ b/peerconn.go @@ -85,7 +85,7 @@ type PeerConn struct { // response. metadataRequests []bool sentHaves bitmap.Bitmap - pexSeq int + pex pexConnState // Stuff controlled by the remote peer. PeerID PeerID @@ -321,6 +321,9 @@ func (cn *PeerConn) close() { if !cn.closed.Set() { return } + if cn.pex.IsEnabled() { + cn.pex.Close() + } cn.tickleWriter() cn.discardPieceInclination() cn._pieceRequestOrder.Clear() @@ -559,7 +562,9 @@ func (cn *PeerConn) fillWriteBuffer(msg func(pp.Message) bool) { } cn.requestsLowWater = len(cn.requests) / 2 } - + if cn.pex.IsEnabled() { + cn.pex.Share(msg) // gated internally + } cn.upload(msg) } @@ -1132,9 +1137,9 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err } } c.requestPendingMetadata() - if !cl.config.DisablePEX { - cl.sendInitialPEX(c, t) - // BUG no sending PEX updates yet + if !t.cl.config.DisablePEX { + t.pex.Add(c) // we learnt enough now + c.pex.Init(c) } return nil case metadataExtendedId: @@ -1144,25 +1149,10 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err } return nil case pexExtendedId: - if cl.config.DisablePEX { - // TODO: Maybe close the connection. Check that we're not - // advertising that we support PEX if it's disabled. - return nil - } - c.logger.Printf("incoming PEX message") - var pexMsg pp.PexMsg - err := bencode.Unmarshal(payload, &pexMsg) - if err != nil { - return fmt.Errorf("error unmarshalling PEX message: %s", err) + if !c.pex.IsEnabled() { + return nil // or hang-up maybe? } - npeers := len(pexMsg.Added6) + len(pexMsg.Added) - c.logger.Printf("adding %d peers from PEX", npeers) - torrent.Add("pex added6 peers received", int64(len(pexMsg.Added6))) - var peers Peers - peers.AppendFromPex(pexMsg.Added6, pexMsg.Added6Flags) - peers.AppendFromPex(pexMsg.Added, pexMsg.AddedFlags) - t.addPeers(peers) - return nil + return c.pex.Recv(payload) default: return fmt.Errorf("unexpected extended message ID: %v", id) } @@ -1488,15 +1478,32 @@ func (c *PeerConn) pexPeerFlags() pp.PexPeerFlags { if c.outgoing { f |= pp.PexOutgoingConn } - if c.utp() { + if c.remoteAddr != nil && strings.Contains(c.remoteAddr.Network(), "udp") { f |= pp.PexSupportsUtp } return f } +func (c *PeerConn) dialAddr() net.Addr { + if !c.outgoing && c.PeerListenPort != 0 { + switch addr := c.remoteAddr.(type) { + case *net.TCPAddr: + dialAddr := *addr + dialAddr.Port = c.PeerListenPort + return &dialAddr + case *net.UDPAddr: + dialAddr := *addr + dialAddr.Port = c.PeerListenPort + return &dialAddr + } + } + return c.remoteAddr +} + func (c *PeerConn) pexEvent(t pexEventType) pexEvent { f := c.pexPeerFlags() - return pexEvent{t, c.remoteAddr, f} + addr := c.dialAddr() + return pexEvent{t, addr, f} } func (c *PeerConn) String() string { diff --git a/peerconn_test.go b/peerconn_test.go index 6196bd80..21abbc7c 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -144,7 +144,11 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) { require.EqualValues(b, b.N, cn._stats.ChunksReadUseful.Int64()) } -func TestPexPeerFlags(t *testing.T) { +func TestConnPexPeerFlags(t *testing.T) { + var ( + tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848} + udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848} + ) var testcases = []struct { conn *PeerConn f pp.PexPeerFlags @@ -153,13 +157,52 @@ func TestPexPeerFlags(t *testing.T) { {&PeerConn{outgoing: false, PeerPrefersEncryption: true}, pp.PexPrefersEncryption}, {&PeerConn{outgoing: true, PeerPrefersEncryption: false}, pp.PexOutgoingConn}, {&PeerConn{outgoing: true, PeerPrefersEncryption: true}, pp.PexOutgoingConn | pp.PexPrefersEncryption}, - {&PeerConn{network: "udp4"}, pp.PexSupportsUtp}, - {&PeerConn{outgoing: true, network: "udp6"}, pp.PexOutgoingConn | pp.PexSupportsUtp}, - {&PeerConn{outgoing: true, network: "tcp4"}, pp.PexOutgoingConn}, - {&PeerConn{network: "tcp6"}, 0}, + {&PeerConn{remoteAddr: udpAddr}, pp.PexSupportsUtp}, + {&PeerConn{remoteAddr: udpAddr, outgoing: true}, pp.PexOutgoingConn | pp.PexSupportsUtp}, + {&PeerConn{remoteAddr: tcpAddr, outgoing: true}, pp.PexOutgoingConn}, + {&PeerConn{remoteAddr: tcpAddr}, 0}, } for i, tc := range testcases { f := tc.conn.pexPeerFlags() require.EqualValues(t, tc.f, f, i) } } + +func TestConnPexEvent(t *testing.T) { + var ( + udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848} + tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848} + dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747} + dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747} + ) + var testcases = []struct { + t pexEventType + c *PeerConn + e pexEvent + }{ + { + pexAdd, + &PeerConn{remoteAddr: udpAddr}, + pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp}, + }, + { + pexDrop, + &PeerConn{remoteAddr: tcpAddr, outgoing: true, PeerListenPort: dialTcpAddr.Port}, + pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn}, + }, + { + pexAdd, + &PeerConn{remoteAddr: tcpAddr, PeerListenPort: dialTcpAddr.Port}, + pexEvent{pexAdd, dialTcpAddr, 0}, + }, + { + pexDrop, + &PeerConn{remoteAddr: udpAddr, PeerListenPort: dialUdpAddr.Port}, + pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp}, + }, + } + for i, tc := range testcases { + e := tc.c.pexEvent(tc.t) + require.EqualValues(t, tc.e, e, i) + } +} diff --git a/pex.go b/pex.go index 5f22e82b..acedd71d 100644 --- a/pex.go +++ b/pex.go @@ -72,9 +72,14 @@ func (s *pexState) Add(c *PeerConn) { } e := c.pexEvent(pexAdd) s.ev = append(s.ev, e) + c.pex.Listed = true } func (s *pexState) Drop(c *PeerConn) { + if !c.pex.Listed { + // skip connections which were not previously Added + return + } e := c.pexEvent(pexDrop) s.nc-- if s.nc < pexTargAdded && len(s.hold) < pexMaxHold { diff --git a/pex_test.go b/pex_test.go index c170f0d7..4967b5d9 100644 --- a/pex_test.go +++ b/pex_test.go @@ -75,7 +75,7 @@ func TestPexAdded(t *testing.T) { func TestPexDropped(t *testing.T) { t.Run("belowTarg", func(t *testing.T) { s := &pexState{nc: 1} - s.Drop(&PeerConn{remoteAddr: addrs[0]}) + s.Drop(&PeerConn{remoteAddr: addrs[0], pex: pexConnState{Listed: true}}) targ := &pexState{ hold: []pexEvent{pexEvent{pexDrop, addrs[0], 0}}, nc: 0, @@ -84,13 +84,19 @@ func TestPexDropped(t *testing.T) { }) t.Run("aboveTarg", func(t *testing.T) { s := &pexState{nc: pexTargAdded + 1} - s.Drop(&PeerConn{remoteAddr: addrs[0]}) + s.Drop(&PeerConn{remoteAddr: addrs[0], pex: pexConnState{Listed: true}}) targ := &pexState{ ev: []pexEvent{pexEvent{pexDrop, addrs[0], 0}}, nc: pexTargAdded, } require.EqualValues(t, targ, s) }) + t.Run("aboveTargNotListed", func(t *testing.T) { + s := &pexState{nc: pexTargAdded + 1} + s.Drop(&PeerConn{remoteAddr: addrs[0], pex: pexConnState{Listed: false}}) + targ := &pexState{nc: pexTargAdded + 1} + require.EqualValues(t, targ, s) + }) } func TestPexReset(t *testing.T) { diff --git a/pexconn.go b/pexconn.go new file mode 100644 index 00000000..51aac7e9 --- /dev/null +++ b/pexconn.go @@ -0,0 +1,116 @@ +package torrent + +import ( + "fmt" + "time" + + "github.com/anacrolix/log" + + pp "github.com/anacrolix/torrent/peer_protocol" +) + +const ( + pexRetryDelay = 10 * time.Second + pexInterval = 1 * time.Minute +) + +// per-connection PEX state +type pexConnState struct { + enabled bool + xid pp.ExtensionNumber + seq int + timer *time.Timer + gate chan struct{} + readyfn func() + torrent *Torrent + Listed bool + info log.Logger + dbg log.Logger +} + +func (s *pexConnState) IsEnabled() bool { + return s.enabled +} + +// Init is called from the reader goroutine upon the extended handshake completion +func (s *pexConnState) Init(c *PeerConn) { + xid, ok := c.PeerExtensionIDs[pp.ExtensionNamePex] + if !ok || xid == 0 || c.t.cl.config.DisablePEX { + return + } + s.xid = xid + s.seq = 0 + s.torrent = c.t + s.info = c.t.cl.logger + s.dbg = c.logger + s.readyfn = c.tickleWriter + s.gate = make(chan struct{}, 1) + s.timer = time.AfterFunc(0, func() { + s.gate <- struct{}{} + s.readyfn() // wake up the writer + }) + s.enabled = true +} + +// schedule next PEX message +func (s *pexConnState) sched(delay time.Duration) { + s.timer.Reset(delay) +} + +// generate next PEX message for the peer; returns nil if nothing yet to send +func (s *pexConnState) genmsg() *pp.PexMsg { + tx, seq := s.torrent.pex.Genmsg(s.seq) + if tx.Len() == 0 { + return nil + } + s.seq = seq + return tx +} + +// Share is called from the writer goroutine if when it is woken up with the write buffers empty +func (s *pexConnState) Share(postfn messageWriter) { + select { + case <-s.gate: + if tx := s.genmsg(); tx != nil { + s.dbg.Print("sending PEX message: ", tx) + postfn(tx.Message(s.xid)) + s.sched(pexInterval) + } else { + // no PEX to send this time - try again shortly + s.sched(pexRetryDelay) + } + default: + return + } +} + +// Recv is called from the reader goroutine +func (s *pexConnState) Recv(payload []byte) error { + rx, err := pp.LoadPexMsg(payload) + if err != nil { + return fmt.Errorf("error unmarshalling PEX message: %s", err) + } + s.dbg.Print("incoming PEX message: ", rx) + torrent.Add("pex added peers received", int64(len(rx.Added))) + torrent.Add("pex added6 peers received", int64(len(rx.Added6))) + + var peers Peers + peers.AppendFromPex(rx.Added6, rx.Added6Flags) + peers.AppendFromPex(rx.Added, rx.AddedFlags) + s.dbg.Printf("adding %d peers from PEX", len(peers)) + s.torrent.addPeers(peers) + // s.dbg.Print("known swarm now:", s.torrent.KnownSwarm()) + + // one day we may also want to: + // - check if the peer is not flooding us with PEX updates + // - handle drops somehow + // - detect malicious peers + + return nil +} + +func (s *pexConnState) Close() { + if s.timer != nil { + s.timer.Stop() + } +} diff --git a/pexconn_test.go b/pexconn_test.go new file mode 100644 index 00000000..ffe0d85e --- /dev/null +++ b/pexconn_test.go @@ -0,0 +1,56 @@ +package torrent + +import ( + "net" + "testing" + + "github.com/anacrolix/dht/v2/krpc" + "github.com/stretchr/testify/require" + + "github.com/anacrolix/torrent/metainfo" + pp "github.com/anacrolix/torrent/peer_protocol" +) + +func TestPexConnState(t *testing.T) { + cl := Client{ + config: TestingConfig(), + } + cl.initLogger() + torrent := cl.newTorrent(metainfo.Hash{}, nil) + addr := &net.TCPAddr{IP: net.IPv6loopback, Port: 4747} + c := cl.newConnection(nil, false, addr, "") + c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber) + c.PeerExtensionIDs[pp.ExtensionNamePex] = pexExtendedId + c.writerCond.L.Lock() + c.setTorrent(torrent) + torrent.addConnection(c) + + c.pex.Init(c) + require.True(t, c.pex.IsEnabled(), "should get enabled") + defer c.pex.Close() + + var out pp.Message + writerCalled := false + testWriter := func(m pp.Message) bool { + writerCalled = true + out = m + return true + } + c.writerCond.Wait() + c.pex.Share(testWriter) + require.True(t, writerCalled) + require.EqualValues(t, pp.Extended, out.Type) + require.EqualValues(t, pexExtendedId, out.ExtendedID) + + x, err := pp.LoadPexMsg(out.ExtendedPayload) + require.NoError(t, err) + targx := &pp.PexMsg{ + Added: krpc.CompactIPv4NodeAddrs(nil), + AddedFlags: []pp.PexPeerFlags{}, + Added6: krpc.CompactIPv6NodeAddrs{ + nodeAddr(addr), + }, + Added6Flags: []pp.PexPeerFlags{0}, + } + require.EqualValues(t, targx, x) +} diff --git a/torrent.go b/torrent.go index 3b75e255..db0126bf 100644 --- a/torrent.go +++ b/torrent.go @@ -259,7 +259,7 @@ func (t *Torrent) addPeer(p Peer) { if ipAddr, ok := tryIpPortFromNetAddr(p.Addr); ok { if cl.badPeerIPPort(ipAddr.IP, ipAddr.Port) { torrent.Add("peers not added because of bad addr", 1) - cl.logger.Printf("peers not added because of bad addr: %v", p) + // cl.logger.Printf("peers not added because of bad addr: %v", p) return } } @@ -1204,6 +1204,9 @@ func (t *Torrent) deleteConnection(c *PeerConn) (ret bool) { } _, ret = t.conns[c] delete(t.conns, c) + if !t.cl.config.DisablePEX { + t.pex.Drop(c) + } torrent.Add("deleted connections", 1) c.deleteAllRequests() if len(t.conns) == 0 { @@ -1223,9 +1226,6 @@ func (t *Torrent) assertNoPendingRequests() { func (t *Torrent) dropConnection(c *PeerConn) { t.cl.event.Broadcast() - if !t.cl.config.DisablePEX { - t.pex.Drop(c) - } c.close() if t.deleteConnection(c) { t.openNewConns() @@ -1498,8 +1498,8 @@ func (t *Torrent) addConnection(c *PeerConn) (err error) { panic(len(t.conns)) } t.conns[c] = struct{}{} - if !t.cl.config.DisablePEX { - t.pex.Add(c) + if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() { + t.pex.Add(c) // as no further extended handshake expected } return nil } -- 2.44.0