From 214fe6b93c62c805f2ddfe056238e84c9cd5ede7 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 11 Jul 2022 18:22:23 +1000 Subject: [PATCH] Use webrtc local addr for webrtc conn peer priority WebRTC conns are providing the correct IP for peer priority calculations, so use that instead of trying to guess (which doesn't work if there are no regular conn listeners attached to the Client. (cherry picked from commit e86bb5fee3958dc90a3d012469b1352005d6c9ad) --- bep40_test.go | 4 ++++ client.go | 60 +++++++++++++++++++++++++++++++++--------------- peer.go | 2 ++ peerconn.go | 13 +++++++---- peerconn_test.go | 11 ++++++--- pexconn_test.go | 5 +++- torrent.go | 13 +++++++---- 7 files changed, 77 insertions(+), 31 deletions(-) diff --git a/bep40_test.go b/bep40_test.go index 78b3ac21..48d5fdda 100644 --- a/bep40_test.go +++ b/bep40_test.go @@ -20,6 +20,10 @@ func TestBep40Priority(t *testing.T) { IpPort{IP: net.ParseIP("123.213.32.10"), Port: 0}, IpPort{IP: net.ParseIP("123.213.32.234"), Port: 0}, )) + assert.Equal(t, peerPriority(0x2b41d456), bep40PriorityIgnoreError( + IpPort{IP: net.ParseIP("206.248.98.111"), Port: 0}, + IpPort{IP: net.ParseIP("142.147.89.224"), Port: 0}, + )) assert.EqualValues(t, "\x00\x00\x00\x00", func() []byte { b, _ := bep40PriorityBytes( IpPort{IP: net.ParseIP("123.213.32.234"), Port: 0}, diff --git a/client.go b/client.go index 70d3d103..dd03f338 100644 --- a/client.go +++ b/client.go @@ -551,8 +551,16 @@ func (cl *Client) incomingConnection(nc net.Conn) { if tc, ok := nc.(*net.TCPConn); ok { tc.SetLinger(0) } - c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(), - regularNetConnPeerConnConnString(nc)) + remoteAddr, _ := tryIpPortFromNetAddr(nc.RemoteAddr()) + c := cl.newConnection( + nc, + newConnectionOpts{ + outgoing: false, + remoteAddr: nc.RemoteAddr(), + localPublicAddr: cl.publicAddr(remoteAddr.IP), + network: nc.RemoteAddr().Network(), + connString: regularNetConnPeerConnConnString(nc), + }) defer func() { cl.lock() defer cl.unlock() @@ -687,13 +695,12 @@ func (cl *Client) initiateProtocolHandshakes( ctx context.Context, nc net.Conn, t *Torrent, - outgoing, encryptHeader bool, - remoteAddr PeerRemoteAddr, - network, connString string, + encryptHeader bool, + newConnOpts newConnectionOpts, ) ( c *PeerConn, err error, ) { - c = cl.newConnection(nc, outgoing, remoteAddr, network, connString) + c = cl.newConnection(nc, newConnOpts) c.headerEncrypted = encryptHeader ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout) defer cancel() @@ -725,7 +732,15 @@ func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfus } return nil, errors.New("dial failed") } - c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc)) + addrIpPort, _ := tryIpPortFromNetAddr(addr) + c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, obfuscatedHeader, newConnectionOpts{ + outgoing: true, + remoteAddr: addr, + // It would be possible to retrieve a public IP from the dialer used here? + localPublicAddr: cl.publicAddr(addrIpPort.IP), + network: dr.Dialer.DialerNetwork(), + connString: regularNetConnPeerConnConnString(nc), + }) if err != nil { nc.Close() } @@ -1468,28 +1483,37 @@ func (cl *Client) banPeerIP(ip net.IP) { } } -func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) { - if network == "" { - panic(remoteAddr) +type newConnectionOpts struct { + outgoing bool + remoteAddr PeerRemoteAddr + localPublicAddr peerLocalPublicAddr + network string + connString string +} + +func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerConn) { + if opts.network == "" { + panic(opts.remoteAddr) } c = &PeerConn{ Peer: Peer{ - outgoing: outgoing, + outgoing: opts.outgoing, choking: true, peerChoking: true, PeerMaxRequests: 250, - RemoteAddr: remoteAddr, - Network: network, - callbacks: &cl.config.Callbacks, + RemoteAddr: opts.remoteAddr, + localPublicAddr: opts.localPublicAddr, + Network: opts.network, + callbacks: &cl.config.Callbacks, }, - connString: connString, + connString: opts.connString, conn: nc, } c.initRequestState() // TODO: Need to be much more explicit about this, including allowing non-IP bannable addresses. - if remoteAddr != nil { - netipAddrPort, err := netip.ParseAddrPort(remoteAddr.String()) + if opts.remoteAddr != nil { + netipAddrPort, err := netip.ParseAddrPort(opts.remoteAddr.String()) if err == nil { c.bannableAddr = Some(netipAddrPort.Addr()) } @@ -1501,7 +1525,7 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemot l: cl.config.DownloadRateLimiter, r: c.r, } - c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing) + c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", opts.remoteAddr, opts.network, opts.outgoing) for _, f := range cl.config.Callbacks.NewPeer { f(&c.Peer) } diff --git a/peer.go b/peer.go index e1bab184..3b8d4586 100644 --- a/peer.go +++ b/peer.go @@ -1,5 +1,7 @@ package torrent +type peerLocalPublicAddr = IpPort + func (p *Peer) isLowOnRequests() bool { return p.requestState.Requests.IsEmpty() && p.requestState.Cancelled.IsEmpty() } diff --git a/peerconn.go b/peerconn.go index bd6b376b..77857d84 100644 --- a/peerconn.go +++ b/peerconn.go @@ -64,10 +64,13 @@ type Peer struct { peerImpl callbacks *Callbacks - outgoing bool - Network string - RemoteAddr PeerRemoteAddr - bannableAddr Option[bannableAddr] + outgoing bool + Network string + RemoteAddr PeerRemoteAddr + // The local address as observed by the remote peer. WebRTC seems to get this right without needing hints from the + // config. + localPublicAddr peerLocalPublicAddr + bannableAddr Option[bannableAddr] // True if the connection is operating over MSE obfuscation. headerEncrypted bool cryptoMethod mse.CryptoMethod @@ -1729,7 +1732,7 @@ func (c *PeerConn) setTorrent(t *Torrent) { } func (c *Peer) peerPriority() (peerPriority, error) { - return bep40Priority(c.remoteIpPort(), c.t.cl.publicAddr(c.remoteIp())) + return bep40Priority(c.remoteIpPort(), c.localPublicAddr) } func (c *Peer) remoteIp() net.IP { diff --git a/peerconn_test.go b/peerconn_test.go index 2f1fc56a..23d32286 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -25,7 +25,7 @@ func TestSendBitfieldThenHave(t *testing.T) { var cl Client cl.init(TestingConfig(t)) cl.initLogger() - c := cl.newConnection(nil, false, nil, "io.Pipe", "") + c := cl.newConnection(nil, newConnectionOpts{network: "io.Pipe"}) c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil)) if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil { t.Log(err) @@ -107,7 +107,12 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) { t.onSetInfo() t._pendingPieces.Add(0) r, w := net.Pipe() - cn := cl.newConnection(r, true, r.RemoteAddr(), r.RemoteAddr().Network(), regularNetConnPeerConnConnString(r)) + cn := cl.newConnection(r, newConnectionOpts{ + outgoing: true, + remoteAddr: r.RemoteAddr(), + network: r.RemoteAddr().Network(), + connString: regularNetConnPeerConnConnString(r), + }) cn.setTorrent(t) mrlErrChan := make(chan error) msg := pp.Message{ @@ -287,7 +292,7 @@ func TestPreferredNetworkDirection(t *testing.T) { func TestReceiveLargeRequest(t *testing.T) { c := qt.New(t) cl := newTestingClient(t) - pc := cl.newConnection(nil, false, nil, "test", "") + pc := cl.newConnection(nil, newConnectionOpts{network: "test"}) tor := cl.newTorrentForTesting() tor.info = &metainfo.Info{PieceLength: 3 << 20} pc.setTorrent(tor) diff --git a/pexconn_test.go b/pexconn_test.go index 42c62cff..603398c5 100644 --- a/pexconn_test.go +++ b/pexconn_test.go @@ -17,7 +17,10 @@ func TestPexConnState(t *testing.T) { cl.initLogger() torrent := cl.newTorrent(metainfo.Hash{}, nil) addr := &net.TCPAddr{IP: net.IPv6loopback, Port: 4747} - c := cl.newConnection(nil, false, addr, addr.Network(), "") + c := cl.newConnection(nil, newConnectionOpts{ + remoteAddr: addr, + network: addr.Network(), + }) c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber) c.PeerExtensionIDs[pp.ExtensionNamePex] = pexExtendedId c.messageWriter.mu.Lock() diff --git a/torrent.go b/torrent.go index f18fc252..157b44d4 100644 --- a/torrent.go +++ b/torrent.go @@ -1575,18 +1575,23 @@ func (t *Torrent) onWebRtcConn( DataChannelContext: dcc, } peerRemoteAddr := netConn.RemoteAddr() + //t.logger.Levelf(log.Critical, "onWebRtcConn remote addr: %v", peerRemoteAddr) if t.cl.badPeerAddr(peerRemoteAddr) { return } + localAddrIpPort := missinggo.IpPortFromNetAddr(netConn.LocalAddr()) pc, err := t.cl.initiateProtocolHandshakes( context.Background(), netConn, t, - dcc.LocalOffered, false, - netConn.RemoteAddr(), - webrtcNetwork, - fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)), + newConnectionOpts{ + outgoing: dcc.LocalOffered, + remoteAddr: peerRemoteAddr, + localPublicAddr: localAddrIpPort, + network: webrtcNetwork, + connString: fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)), + }, ) if err != nil { t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err) -- 2.44.0