]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Locate target peer using dial addr when receiving a holepunch rendezvous
authorMatt Joiner <anacrolix@gmail.com>
Mon, 1 May 2023 02:30:06 +0000 (12:30 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 9 May 2023 05:46:51 +0000 (15:46 +1000)
peer.go
peerconn.go
peerconn_test.go
torrent.go
ut-holepunching_test.go

diff --git a/peer.go b/peer.go
index 4e2a24f60b5de6e2feb0ee981beecfbfc9806fd2..1d8ead1002b49758b0564ac47516d9f6fc420c80 100644 (file)
--- a/peer.go
+++ b/peer.go
@@ -86,7 +86,6 @@ type (
                peerChoking           bool
                peerRequests          map[Request]*peerRequestState
                PeerPrefersEncryption bool // as indicated by 'e' field in extension handshake
-               PeerListenPort        int
                // The highest possible number of pieces the torrent could have based on
                // communication with the peer. Generally only useful until we have the
                // torrent info.
index 71a38925fd02e9ff55048f8ce07b83663a0572b9..d85d0d511c5417b8aa4bbfd420e91e00b655bd00 100644 (file)
@@ -15,10 +15,12 @@ import (
        "time"
 
        "github.com/RoaringBitmap/roaring"
+       "github.com/anacrolix/generics"
        . "github.com/anacrolix/generics"
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/anacrolix/multiless"
+       "golang.org/x/exp/maps"
        "golang.org/x/time/rate"
 
        "github.com/anacrolix/torrent/bencode"
@@ -40,6 +42,7 @@ type PeerConn struct {
        // See BEP 3 etc.
        PeerID             PeerID
        PeerExtensionBytes pp.PeerExtensionBits
+       PeerListenPort     int
 
        // The actual Conn, used for closing, and setting socket options. Do not use methods on this
        // while holding any mutexes.
@@ -75,11 +78,25 @@ func (cn *PeerConn) pexStatus() string {
        if !cn.supportsExtension(pp.ExtensionNamePex) {
                return "unsupported"
        }
-       return fmt.Sprintf(
-               "%v conns, %v unsent events",
-               len(cn.pex.remoteLiveConns),
-               cn.pex.numPending(),
-       )
+       if true {
+               return fmt.Sprintf(
+                       "%v conns, %v unsent events",
+                       len(cn.pex.remoteLiveConns),
+                       cn.pex.numPending(),
+               )
+       } else {
+               // This alternative branch prints out the remote live conn addresses.
+               return fmt.Sprintf(
+                       "%v conns, %v unsent events",
+                       strings.Join(generics.SliceMap(
+                               maps.Keys(cn.pex.remoteLiveConns),
+                               func(from netip.AddrPort) string {
+                                       return from.String()
+                               }), ","),
+                       cn.pex.numPending(),
+               )
+
+       }
 }
 
 func (cn *PeerConn) peerImplStatusLines() []string {
@@ -1049,6 +1066,8 @@ func (c *PeerConn) dialAddr() PeerRemoteAddr {
                        dialAddr := *addr
                        dialAddr.Port = c.PeerListenPort
                        return &dialAddr
+               default:
+                       panic(addr)
                }
        }
        return c.RemoteAddr
@@ -1082,6 +1101,11 @@ func (pc *PeerConn) remoteAddrPort() Option[netip.AddrPort] {
        }).AddrPort())
 }
 
+func (pc *PeerConn) remoteDialAddrPort() (netip.AddrPort, error) {
+       dialAddr := pc.dialAddr()
+       return addrPortFromPeerRemoteAddr(dialAddr)
+}
+
 func (pc *PeerConn) bitExtensionEnabled(bit pp.ExtensionBit) bool {
        return pc.t.cl.config.Extensions.GetBit(bit) && pc.PeerExtensionBytes.GetBit(bit)
 }
index c68ca6f8cfb77ca41f0aa69cfb5e6e9a1fda9a69..f0a5a6fd9a1489d5f375fc8264fc0baaaaaa627a 100644 (file)
@@ -200,17 +200,17 @@ func TestConnPexEvent(t *testing.T) {
                },
                {
                        pexDrop,
-                       &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true, PeerListenPort: dialTcpAddr.Port}},
+                       &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}, PeerListenPort: dialTcpAddr.Port},
                        pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn, nil},
                },
                {
                        pexAdd,
-                       &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), PeerListenPort: dialTcpAddr.Port}},
+                       &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}, PeerListenPort: dialTcpAddr.Port},
                        pexEvent{pexAdd, dialTcpAddr, 0, nil},
                },
                {
                        pexDrop,
-                       &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), PeerListenPort: dialUdpAddr.Port}},
+                       &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}, PeerListenPort: dialUdpAddr.Port},
                        pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp, nil},
                },
        }
index eef9c0cb8da16ca8a5924e5918633a57e69953c0..c7ca2549230cfd5e2ae6bc9cd0b0bec9f0997213 100644 (file)
@@ -2002,8 +2002,9 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
        }
        t.conns[c] = struct{}{}
        t.cl.event.Broadcast()
+       // We'll never receive the "p" extended handshake parameter.
        if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
-               t.pex.Add(c) // as no further extended handshake expected
+               t.pex.Add(c)
        }
        return nil
 }
@@ -2678,10 +2679,13 @@ func (t *Torrent) checkValidReceiveChunk(r Request) error {
        return nil
 }
 
-func (t *Torrent) peerConnsWithRemoteAddrPort(addrPort netip.AddrPort) (ret []*PeerConn) {
+func (t *Torrent) peerConnsWithDialAddrPort(target netip.AddrPort) (ret []*PeerConn) {
        for pc := range t.conns {
-               addr := pc.remoteAddrPort()
-               if !(addr.Ok && addr.Value == addrPort) {
+               dialAddr, err := pc.remoteDialAddrPort()
+               if err != nil {
+                       continue
+               }
+               if dialAddr != target {
                        continue
                }
                ret = append(ret, pc)
@@ -2725,7 +2729,18 @@ func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *Peer
        case utHolepunch.Rendezvous:
                t.logger.Printf("got holepunch rendezvous request for %v from %p", msg.AddrPort, sender)
                sendMsg := sendUtHolepunchMsg
-               targets := t.peerConnsWithRemoteAddrPort(msg.AddrPort)
+               senderAddrPort, err := sender.remoteDialAddrPort()
+               if err != nil {
+                       sender.logger.Levelf(
+                               log.Warning,
+                               "error getting ut_holepunch rendezvous sender's dial address: %v",
+                               err,
+                       )
+                       // There's no better error code. The sender's address itself is invalid. I don't see
+                       // this error message being appropriate anywhere else anyway.
+                       sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NoSuchPeer)
+               }
+               targets := t.peerConnsWithDialAddrPort(msg.AddrPort)
                if len(targets) == 0 {
                        sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NotConnected)
                        return nil
@@ -2736,7 +2751,7 @@ func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *Peer
                                continue
                        }
                        sendMsg(sender, utHolepunch.Connect, msg.AddrPort, 0)
-                       sendMsg(pc, utHolepunch.Connect, sender.remoteAddrPort().Unwrap(), 0)
+                       sendMsg(pc, utHolepunch.Connect, senderAddrPort, 0)
                }
                return nil
        case utHolepunch.Connect:
index 1b4b7d08c29aff12af0a0c5aff318b0dfe60be55..fe73e8d11b050420a03a73c47a9e0655bb27961d 100644 (file)
@@ -6,6 +6,8 @@ import (
        "testing"
        "testing/iotest"
 
+       "github.com/anacrolix/log"
+
        "github.com/anacrolix/torrent/internal/testutil"
        qt "github.com/frankban/quicktest"
        "github.com/stretchr/testify/assert"
@@ -23,8 +25,9 @@ func TestHolepunchConnect(t *testing.T) {
        cfg.MaxAllocPeerRequestDataPerConn = 4
        cfg.DataDir = greetingTempDir
        cfg.DisablePEX = true
-       //cfg.Debug = true
+       cfg.Debug = true
        cfg.AcceptPeerConnections = false
+       //cfg.DisableUTP = true
        seeder, err := NewClient(cfg)
        require.NoError(t, err)
        defer seeder.Close()
@@ -51,7 +54,7 @@ func TestHolepunchConnect(t *testing.T) {
        cfg.Seed = false
        cfg.DataDir = t.TempDir()
        cfg.MaxAllocPeerRequestDataPerConn = 4
-       cfg.Debug = true
+       //cfg.Debug = true
        //cfg.DisableUTP = true
        leecherLeecher, _ := NewClient(cfg)
        require.NoError(t, err)
@@ -85,9 +88,12 @@ func TestHolepunchConnect(t *testing.T) {
        waitForConns(seederTorrent)
        go llg.AddClientPeer(leecher)
        waitForConns(llg)
+       //time.Sleep(time.Second)
        llg.cl.lock()
+       targetAddr := seeder.ListenAddrs()[1]
+       log.Printf("trying to initiate to %v", targetAddr)
        llg.initiateConn(PeerInfo{
-               Addr: seeder.ListenAddrs()[0],
+               Addr: targetAddr,
        }, true, false)
        llg.cl.unlock()
        wg.Wait()