From: Matt Joiner <anacrolix@gmail.com>
Date: Fri, 12 May 2023 03:47:24 +0000 (+1000)
Subject: Add holepunching stats and tests
X-Git-Tag: v1.51.0~16
X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=0b6209062d730cbbda0bb66391089f5c2b0e8084;p=btrtrc.git

Add holepunching stats and tests
---

diff --git a/client-stats.go b/client-stats.go
index 3513f8f1..1a800970 100644
--- a/client-stats.go
+++ b/client-stats.go
@@ -1,5 +1,17 @@
 package torrent
 
+import (
+	"net/netip"
+)
+
+type clientHolepunchAddrSets struct {
+	undialableWithoutHolepunch                            map[netip.AddrPort]struct{}
+	undialableWithoutHolepunchDialedAfterHolepunchConnect map[netip.AddrPort]struct{}
+	dialableOnlyAfterHolepunch                            map[netip.AddrPort]struct{}
+	dialedSuccessfullyAfterHolepunchConnect               map[netip.AddrPort]struct{}
+	probablyOnlyConnectedDueToHolepunch                   map[netip.AddrPort]struct{}
+}
+
 type ClientStats struct {
 	ConnStats
 
@@ -8,10 +20,26 @@ type ClientStats struct {
 	// if a Torrent is dropped while there are outstanding dials.
 	ActiveHalfOpenAttempts int
 
+	NumPeersUndialableWithoutHolepunch int
 	// Number of unique peer addresses that were dialed after receiving a holepunch connect message,
 	// that have previously been undialable without any hole-punching attempts.
 	NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect int
 	// Number of unique peer addresses that were successfully dialed and connected after a holepunch
 	// connect message and previously failing to connect without holepunching.
-	NumPeersDialableOnlyAfterHolepunch int
+	NumPeersDialableOnlyAfterHolepunch              int
+	NumPeersDialedSuccessfullyAfterHolepunchConnect int
+	NumPeersProbablyOnlyConnectedDueToHolepunch     int
+}
+
+func (cl *Client) statsLocked() (stats ClientStats) {
+	stats.ConnStats = cl.connStats.Copy()
+	stats.ActiveHalfOpenAttempts = cl.numHalfOpen
+
+	stats.NumPeersUndialableWithoutHolepunch = len(cl.undialableWithoutHolepunch)
+	stats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect = len(cl.undialableWithoutHolepunchDialedAfterHolepunchConnect)
+	stats.NumPeersDialableOnlyAfterHolepunch = len(cl.dialableOnlyAfterHolepunch)
+	stats.NumPeersDialedSuccessfullyAfterHolepunchConnect = len(cl.dialedSuccessfullyAfterHolepunchConnect)
+	stats.NumPeersProbablyOnlyConnectedDueToHolepunch = len(cl.probablyOnlyConnectedDueToHolepunch)
+
+	return
 }
diff --git a/client.go b/client.go
index 95c21aab..6a82774e 100644
--- a/client.go
+++ b/client.go
@@ -91,9 +91,7 @@ type Client struct {
 	activeAnnounceLimiter limiter.Instance
 	httpClient            *http.Client
 
-	undialableWithoutHolepunch                                   map[netip.AddrPort]struct{}
-	undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect map[netip.AddrPort]struct{}
-	dialableOnlyAfterHolepunch                                   map[netip.AddrPort]struct{}
+	clientHolepunchAddrSets
 }
 
 type ipStr string
@@ -520,6 +518,20 @@ func (cl *Client) acceptConnections(l Listener) {
 			log.Fmsg("error accepting connection: %s", err).LogLevel(log.Debug, cl.logger)
 			continue
 		}
+		{
+			holepunchAddr, holepunchErr := addrPortFromPeerRemoteAddr(conn.RemoteAddr())
+			if holepunchErr == nil {
+				cl.lock()
+				if g.MapContains(
+					cl.undialableWithoutHolepunchDialedAfterHolepunchConnect,
+					holepunchAddr,
+				) {
+					g.MakeMapIfNil(&cl.probablyOnlyConnectedDueToHolepunch)
+					g.MapInsert(cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr, struct{}{})
+				}
+				cl.unlock()
+			}
+		}
 		go func() {
 			if reject != nil {
 				torrent.Add("rejected accepted connections", 1)
@@ -738,12 +750,16 @@ func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn,
 		}
 	}
 	holepunchAddr, holepunchAddrErr := addrPortFromPeerRemoteAddr(addr)
-	if holepunchAddrErr == nil && g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) && opts.receivedHolepunchConnect {
-		g.MakeMapIfNilAndSet(
-			&cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect,
-			holepunchAddr,
-			struct{}{},
-		)
+	if holepunchAddrErr == nil && opts.receivedHolepunchConnect {
+		cl.lock()
+		if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
+			g.MakeMapIfNilAndSet(
+				&cl.undialableWithoutHolepunchDialedAfterHolepunchConnect,
+				holepunchAddr,
+				struct{}{},
+			)
+		}
+		cl.unlock()
 	}
 	headerObfuscationPolicy := opts.HeaderObfuscationPolicy
 	obfuscatedHeaderFirst := headerObfuscationPolicy.Preferred
@@ -751,18 +767,24 @@ func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn,
 	if firstDialResult.Conn == nil {
 		// No dialers worked. Try to initiate a holepunching rendezvous.
 		if holepunchAddrErr == nil {
+			cl.lock()
 			if !opts.receivedHolepunchConnect {
-				cl.lock()
 				g.MakeMapIfNilAndSet(&cl.undialableWithoutHolepunch, holepunchAddr, struct{}{})
-				cl.unlock()
 			}
 			opts.t.startHolepunchRendezvous(holepunchAddr)
+			cl.unlock()
 		}
 		err = fmt.Errorf("all initial dials failed")
 		return
 	}
-	if opts.receivedHolepunchConnect && holepunchAddrErr == nil && g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
-		g.MakeMapIfNilAndSet(&cl.dialableOnlyAfterHolepunch, holepunchAddr, struct{}{})
+	if opts.receivedHolepunchConnect && holepunchAddrErr == nil {
+		cl.lock()
+		if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
+			g.MakeMapIfNilAndSet(&cl.dialableOnlyAfterHolepunch, holepunchAddr, struct{}{})
+		}
+		g.MakeMapIfNil(&cl.dialedSuccessfullyAfterHolepunchConnect)
+		g.MapInsert(cl.dialedSuccessfullyAfterHolepunchConnect, holepunchAddr, struct{}{})
+		cl.unlock()
 	}
 	c, err = doProtocolHandshakeOnDialResult(
 		opts.t,
@@ -1746,13 +1768,3 @@ func (cl *Client) Stats() ClientStats {
 	defer cl.rUnlock()
 	return cl.statsLocked()
 }
-
-func (cl *Client) statsLocked() (stats ClientStats) {
-	stats.ConnStats = cl.connStats.Copy()
-	stats.ActiveHalfOpenAttempts = cl.numHalfOpen
-	stats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect =
-		len(cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect)
-	stats.NumPeersDialableOnlyAfterHolepunch =
-		len(cl.dialableOnlyAfterHolepunch)
-	return
-}
diff --git a/torrent.go b/torrent.go
index eeac172d..ad89b453 100644
--- a/torrent.go
+++ b/torrent.go
@@ -2859,11 +2859,3 @@ func (t *Torrent) getDialTimeoutUnlocked() time.Duration {
 	defer cl.rUnlock()
 	return t.dialTimeout()
 }
-
-func (t *Torrent) startHolepunchRendezvousForPeerRemoteAddr(addr PeerRemoteAddr) error {
-	addrPort, err := addrPortFromPeerRemoteAddr(addr)
-	if err != nil {
-		return err
-	}
-	return t.startHolepunchRendezvous(addrPort)
-}
diff --git a/ut-holepunching_test.go b/ut-holepunching_test.go
index 93ff3a4e..29be6d1e 100644
--- a/ut-holepunching_test.go
+++ b/ut-holepunching_test.go
@@ -1,22 +1,31 @@
 package torrent
 
 import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"math/rand"
+	"net"
 	"os"
 	"sync"
 	"testing"
 	"testing/iotest"
+	"time"
 
 	"github.com/anacrolix/log"
-
-	"github.com/anacrolix/torrent/internal/testutil"
+	"github.com/anacrolix/missinggo/v2/iter"
 	qt "github.com/frankban/quicktest"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+
+	"github.com/anacrolix/torrent/internal/testutil"
 )
 
 // Check that after completing leeching, a leecher transitions to a seeding
 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
 func TestHolepunchConnect(t *testing.T) {
+	c := qt.New(t)
 	greetingTempDir, mi := testutil.GreetingTestTorrent()
 	defer os.RemoveAll(greetingTempDir)
 
@@ -27,7 +36,9 @@ func TestHolepunchConnect(t *testing.T) {
 	cfg.DisablePEX = true
 	cfg.Debug = true
 	cfg.AcceptPeerConnections = false
-	//cfg.DisableUTP = true
+	// Listening, even without accepting, still means the leecher-leecher completes the dial to the seeder, and so it
+	// won't attempt to holepunch.
+	cfg.DisableTCP = true
 	seeder, err := NewClient(cfg)
 	require.NoError(t, err)
 	defer seeder.Close()
@@ -105,13 +116,13 @@ func TestHolepunchConnect(t *testing.T) {
 	llg.cl.unlock()
 	wg.Wait()
 
+	c.Check(seeder.dialedSuccessfullyAfterHolepunchConnect, qt.Not(qt.HasLen), 0)
+	c.Check(leecherLeecher.probablyOnlyConnectedDueToHolepunch, qt.Not(qt.HasLen), 0)
+
 	llClientStats := leecherLeecher.Stats()
-	c.Check(llClientStats.NumPeersDialableOnlyAfterHolepunch, qt.Not(qt.Equals), 0)
-	c.Check(
-		llClientStats.NumPeersDialableOnlyAfterHolepunch,
-		qt.Equals,
-		llClientStats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect,
-	)
+	c.Check(llClientStats.NumPeersUndialableWithoutHolepunch, qt.Not(qt.Equals), 0)
+	c.Check(llClientStats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect, qt.Not(qt.Equals), 0)
+	c.Check(llClientStats.NumPeersProbablyOnlyConnectedDueToHolepunch, qt.Not(qt.Equals), 0)
 }
 
 func waitForConns(t *Torrent) {
@@ -125,11 +136,218 @@ func waitForConns(t *Torrent) {
 	}
 }
 
+// Show that dialling TCP will complete before the other side accepts.
 func TestDialTcpNotAccepting(t *testing.T) {
 	l, err := net.Listen("tcp", "localhost:0")
 	c := qt.New(t)
 	c.Check(err, qt.IsNil)
 	defer l.Close()
-	_, err = net.Dial("tcp", l.Addr().String())
-	c.Assert(err, qt.IsNotNil)
+	dialedConn, err := net.Dial("tcp", l.Addr().String())
+	c.Assert(err, qt.IsNil)
+	dialedConn.Close()
+}
+
+func TestTcpSimultaneousOpen(t *testing.T) {
+	const network = "tcp"
+	ctx := context.Background()
+	makeDialer := func(localPort int, remoteAddr string) func() (net.Conn, error) {
+		dialer := net.Dialer{
+			LocalAddr: &net.TCPAddr{
+				Port: localPort,
+			},
+		}
+		return func() (net.Conn, error) {
+			return dialer.DialContext(ctx, network, remoteAddr)
+		}
+	}
+	c := qt.New(t)
+	first, second := randPortPair()
+	t.Logf("ports are %v and %v", first, second)
+	err := testSimultaneousOpen(
+		c.Cleanup,
+		makeDialer(first, fmt.Sprintf("localhost:%d", second)),
+		makeDialer(second, fmt.Sprintf("localhost:%d", first)),
+	)
+	c.Assert(err, qt.IsNil)
+}
+
+func randIntInRange(low, high int) int {
+	return rand.Intn(high-low+1) + low
+}
+
+func randDynamicPort() int {
+	return randIntInRange(49152, 65535)
+}
+
+func randPortPair() (first int, second int) {
+	first = randDynamicPort()
+	for {
+		second = randDynamicPort()
+		if second != first {
+			return
+		}
+	}
+}
+
+func writeMsg(conn net.Conn) {
+	conn.Write([]byte(defaultMsg))
+	// Writing must be closed so the reader will get EOF and stop reading.
+	conn.Close()
+}
+
+func readMsg(conn net.Conn) error {
+	msgBytes, err := io.ReadAll(conn)
+	if err != nil {
+		return err
+	}
+	msgStr := string(msgBytes)
+	if msgStr != defaultMsg {
+		return fmt.Errorf("read %q", msgStr)
+	}
+	return nil
+}
+
+var errMsgNotReceived = errors.New("msg not received in time")
+
+// Runs two dialers simultaneously, then sends a message on one connection and check it reads from
+// the other, thereby showing that both dials obtained endpoints to the same connection.
+func testSimultaneousOpen(
+	cleanup func(func()),
+	firstDialer, secondDialer func() (net.Conn, error),
+) error {
+	errs := make(chan error)
+	var dialsDone sync.WaitGroup
+	const numDials = 2
+	dialsDone.Add(numDials)
+	signal := make(chan struct{})
+	var dialersDone sync.WaitGroup
+	dialersDone.Add(numDials)
+	doDial := func(
+		dialer func() (net.Conn, error),
+		onSignal func(net.Conn),
+	) {
+		defer dialersDone.Done()
+		conn, err := dialer()
+		dialsDone.Done()
+		errs <- err
+		if err != nil {
+			return
+		}
+		cleanup(func() {
+			conn.Close()
+		})
+		<-signal
+		onSignal(conn)
+		//if err == nil {
+		//	conn.Close()
+		//}
+	}
+	go doDial(
+		firstDialer,
+		func(conn net.Conn) {
+			writeMsg(conn)
+			errs <- nil
+		},
+	)
+	go doDial(
+		secondDialer,
+		func(conn net.Conn) {
+			gotMsg := make(chan error, 1)
+			go func() {
+				gotMsg <- readMsg(conn)
+			}()
+			select {
+			case err := <-gotMsg:
+				errs <- err
+			case <-time.After(time.Second):
+				errs <- errMsgNotReceived
+			}
+		},
+	)
+	dialsDone.Wait()
+	for range iter.N(numDials) {
+		err := <-errs
+		if err != nil {
+			return err
+		}
+	}
+	close(signal)
+	for range iter.N(numDials) {
+		err := <-errs
+		if err != nil {
+			return err
+		}
+	}
+	dialersDone.Wait()
+	return nil
+}
+
+const defaultMsg = "hello"
+
+// Show that uTP doesn't implement simultaneous open. When two sockets dial each other, they both
+// get separate connections. This means that holepunch connect may result in an accept (and dial)
+// for one or both peers involved.
+func TestUtpSimultaneousOpen(t *testing.T) {
+	c := qt.New(t)
+	const network = "udp"
+	ctx := context.Background()
+	newUtpSocket := func(addr string) utpSocket {
+		socket, err := NewUtpSocket(
+			network,
+			addr,
+			func(net.Addr) bool {
+				return false
+			},
+			log.Default,
+		)
+		c.Assert(err, qt.IsNil)
+		return socket
+	}
+	first := newUtpSocket("localhost:3000")
+	defer first.Close()
+	second := newUtpSocket("localhost:3001")
+	defer second.Close()
+	getDial := func(sock utpSocket, addr string) func() (net.Conn, error) {
+		return func() (net.Conn, error) {
+			return sock.DialContext(ctx, network, addr)
+		}
+	}
+	err := testSimultaneousOpen(
+		c.Cleanup,
+		getDial(first, "localhost:3001"),
+		getDial(second, "localhost:3000"),
+	)
+	c.Assert(err, qt.ErrorIs, errMsgNotReceived)
+}
+
+func testDirectDialMsg(c *qt.C, r, w net.Conn) {
+	go writeMsg(w)
+	err := readMsg(r)
+	c.Assert(err, qt.IsNil)
+}
+
+// Show that dialling one socket and accepting from the other results in them having ends of the
+// same connection.
+func TestUtpDirectDialMsg(t *testing.T) {
+	c := qt.New(t)
+	const network = "udp"
+	ctx := context.Background()
+	newUtpSocket := func(addr string) utpSocket {
+		socket, err := NewUtpSocket(network, addr, func(net.Addr) bool {
+			return false
+		}, log.Default)
+		c.Assert(err, qt.IsNil)
+		return socket
+	}
+	first := newUtpSocket("localhost:0")
+	defer first.Close()
+	second := newUtpSocket("localhost:0")
+	defer second.Close()
+	writer, err := first.DialContext(ctx, network, second.Addr().String())
+	c.Assert(err, qt.IsNil)
+	defer writer.Close()
+	reader, err := second.Accept()
+	defer reader.Close()
+	c.Assert(err, qt.IsNil)
+	testDirectDialMsg(c, reader, writer)
 }