From: Matt Joiner Date: Fri, 12 May 2023 03:47:24 +0000 (+1000) Subject: Add holepunching stats and tests X-Git-Url: http://www.git.stargrave.org/?p=btrtrc.git;a=commitdiff_plain;h=0b6209062d730cbbda0bb66391089f5c2b0e8084 Add holepunching stats and tests --- diff --git a/client-stats.go b/client-stats.go index 3513f8f1..1a800970 100644 --- a/client-stats.go +++ b/client-stats.go @@ -1,5 +1,17 @@ package torrent +import ( + "net/netip" +) + +type clientHolepunchAddrSets struct { + undialableWithoutHolepunch map[netip.AddrPort]struct{} + undialableWithoutHolepunchDialedAfterHolepunchConnect map[netip.AddrPort]struct{} + dialableOnlyAfterHolepunch map[netip.AddrPort]struct{} + dialedSuccessfullyAfterHolepunchConnect map[netip.AddrPort]struct{} + probablyOnlyConnectedDueToHolepunch map[netip.AddrPort]struct{} +} + type ClientStats struct { ConnStats @@ -8,10 +20,26 @@ type ClientStats struct { // if a Torrent is dropped while there are outstanding dials. ActiveHalfOpenAttempts int + NumPeersUndialableWithoutHolepunch int // Number of unique peer addresses that were dialed after receiving a holepunch connect message, // that have previously been undialable without any hole-punching attempts. NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect int // Number of unique peer addresses that were successfully dialed and connected after a holepunch // connect message and previously failing to connect without holepunching. - NumPeersDialableOnlyAfterHolepunch int + NumPeersDialableOnlyAfterHolepunch int + NumPeersDialedSuccessfullyAfterHolepunchConnect int + NumPeersProbablyOnlyConnectedDueToHolepunch int +} + +func (cl *Client) statsLocked() (stats ClientStats) { + stats.ConnStats = cl.connStats.Copy() + stats.ActiveHalfOpenAttempts = cl.numHalfOpen + + stats.NumPeersUndialableWithoutHolepunch = len(cl.undialableWithoutHolepunch) + stats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect = len(cl.undialableWithoutHolepunchDialedAfterHolepunchConnect) + stats.NumPeersDialableOnlyAfterHolepunch = len(cl.dialableOnlyAfterHolepunch) + stats.NumPeersDialedSuccessfullyAfterHolepunchConnect = len(cl.dialedSuccessfullyAfterHolepunchConnect) + stats.NumPeersProbablyOnlyConnectedDueToHolepunch = len(cl.probablyOnlyConnectedDueToHolepunch) + + return } diff --git a/client.go b/client.go index 95c21aab..6a82774e 100644 --- a/client.go +++ b/client.go @@ -91,9 +91,7 @@ type Client struct { activeAnnounceLimiter limiter.Instance httpClient *http.Client - undialableWithoutHolepunch map[netip.AddrPort]struct{} - undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect map[netip.AddrPort]struct{} - dialableOnlyAfterHolepunch map[netip.AddrPort]struct{} + clientHolepunchAddrSets } type ipStr string @@ -520,6 +518,20 @@ func (cl *Client) acceptConnections(l Listener) { log.Fmsg("error accepting connection: %s", err).LogLevel(log.Debug, cl.logger) continue } + { + holepunchAddr, holepunchErr := addrPortFromPeerRemoteAddr(conn.RemoteAddr()) + if holepunchErr == nil { + cl.lock() + if g.MapContains( + cl.undialableWithoutHolepunchDialedAfterHolepunchConnect, + holepunchAddr, + ) { + g.MakeMapIfNil(&cl.probablyOnlyConnectedDueToHolepunch) + g.MapInsert(cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr, struct{}{}) + } + cl.unlock() + } + } go func() { if reject != nil { torrent.Add("rejected accepted connections", 1) @@ -738,12 +750,16 @@ func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn, } } holepunchAddr, holepunchAddrErr := addrPortFromPeerRemoteAddr(addr) - if holepunchAddrErr == nil && g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) && opts.receivedHolepunchConnect { - g.MakeMapIfNilAndSet( - &cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect, - holepunchAddr, - struct{}{}, - ) + if holepunchAddrErr == nil && opts.receivedHolepunchConnect { + cl.lock() + if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) { + g.MakeMapIfNilAndSet( + &cl.undialableWithoutHolepunchDialedAfterHolepunchConnect, + holepunchAddr, + struct{}{}, + ) + } + cl.unlock() } headerObfuscationPolicy := opts.HeaderObfuscationPolicy obfuscatedHeaderFirst := headerObfuscationPolicy.Preferred @@ -751,18 +767,24 @@ func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn, if firstDialResult.Conn == nil { // No dialers worked. Try to initiate a holepunching rendezvous. if holepunchAddrErr == nil { + cl.lock() if !opts.receivedHolepunchConnect { - cl.lock() g.MakeMapIfNilAndSet(&cl.undialableWithoutHolepunch, holepunchAddr, struct{}{}) - cl.unlock() } opts.t.startHolepunchRendezvous(holepunchAddr) + cl.unlock() } err = fmt.Errorf("all initial dials failed") return } - if opts.receivedHolepunchConnect && holepunchAddrErr == nil && g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) { - g.MakeMapIfNilAndSet(&cl.dialableOnlyAfterHolepunch, holepunchAddr, struct{}{}) + if opts.receivedHolepunchConnect && holepunchAddrErr == nil { + cl.lock() + if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) { + g.MakeMapIfNilAndSet(&cl.dialableOnlyAfterHolepunch, holepunchAddr, struct{}{}) + } + g.MakeMapIfNil(&cl.dialedSuccessfullyAfterHolepunchConnect) + g.MapInsert(cl.dialedSuccessfullyAfterHolepunchConnect, holepunchAddr, struct{}{}) + cl.unlock() } c, err = doProtocolHandshakeOnDialResult( opts.t, @@ -1746,13 +1768,3 @@ func (cl *Client) Stats() ClientStats { defer cl.rUnlock() return cl.statsLocked() } - -func (cl *Client) statsLocked() (stats ClientStats) { - stats.ConnStats = cl.connStats.Copy() - stats.ActiveHalfOpenAttempts = cl.numHalfOpen - stats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect = - len(cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect) - stats.NumPeersDialableOnlyAfterHolepunch = - len(cl.dialableOnlyAfterHolepunch) - return -} diff --git a/torrent.go b/torrent.go index eeac172d..ad89b453 100644 --- a/torrent.go +++ b/torrent.go @@ -2859,11 +2859,3 @@ func (t *Torrent) getDialTimeoutUnlocked() time.Duration { defer cl.rUnlock() return t.dialTimeout() } - -func (t *Torrent) startHolepunchRendezvousForPeerRemoteAddr(addr PeerRemoteAddr) error { - addrPort, err := addrPortFromPeerRemoteAddr(addr) - if err != nil { - return err - } - return t.startHolepunchRendezvous(addrPort) -} diff --git a/ut-holepunching_test.go b/ut-holepunching_test.go index 93ff3a4e..29be6d1e 100644 --- a/ut-holepunching_test.go +++ b/ut-holepunching_test.go @@ -1,22 +1,31 @@ package torrent import ( + "context" + "errors" + "fmt" + "io" + "math/rand" + "net" "os" "sync" "testing" "testing/iotest" + "time" "github.com/anacrolix/log" - - "github.com/anacrolix/torrent/internal/testutil" + "github.com/anacrolix/missinggo/v2/iter" qt "github.com/frankban/quicktest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/anacrolix/torrent/internal/testutil" ) // Check that after completing leeching, a leecher transitions to a seeding // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher. func TestHolepunchConnect(t *testing.T) { + c := qt.New(t) greetingTempDir, mi := testutil.GreetingTestTorrent() defer os.RemoveAll(greetingTempDir) @@ -27,7 +36,9 @@ func TestHolepunchConnect(t *testing.T) { cfg.DisablePEX = true cfg.Debug = true cfg.AcceptPeerConnections = false - //cfg.DisableUTP = true + // Listening, even without accepting, still means the leecher-leecher completes the dial to the seeder, and so it + // won't attempt to holepunch. + cfg.DisableTCP = true seeder, err := NewClient(cfg) require.NoError(t, err) defer seeder.Close() @@ -105,13 +116,13 @@ func TestHolepunchConnect(t *testing.T) { llg.cl.unlock() wg.Wait() + c.Check(seeder.dialedSuccessfullyAfterHolepunchConnect, qt.Not(qt.HasLen), 0) + c.Check(leecherLeecher.probablyOnlyConnectedDueToHolepunch, qt.Not(qt.HasLen), 0) + llClientStats := leecherLeecher.Stats() - c.Check(llClientStats.NumPeersDialableOnlyAfterHolepunch, qt.Not(qt.Equals), 0) - c.Check( - llClientStats.NumPeersDialableOnlyAfterHolepunch, - qt.Equals, - llClientStats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect, - ) + c.Check(llClientStats.NumPeersUndialableWithoutHolepunch, qt.Not(qt.Equals), 0) + c.Check(llClientStats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect, qt.Not(qt.Equals), 0) + c.Check(llClientStats.NumPeersProbablyOnlyConnectedDueToHolepunch, qt.Not(qt.Equals), 0) } func waitForConns(t *Torrent) { @@ -125,11 +136,218 @@ func waitForConns(t *Torrent) { } } +// Show that dialling TCP will complete before the other side accepts. func TestDialTcpNotAccepting(t *testing.T) { l, err := net.Listen("tcp", "localhost:0") c := qt.New(t) c.Check(err, qt.IsNil) defer l.Close() - _, err = net.Dial("tcp", l.Addr().String()) - c.Assert(err, qt.IsNotNil) + dialedConn, err := net.Dial("tcp", l.Addr().String()) + c.Assert(err, qt.IsNil) + dialedConn.Close() +} + +func TestTcpSimultaneousOpen(t *testing.T) { + const network = "tcp" + ctx := context.Background() + makeDialer := func(localPort int, remoteAddr string) func() (net.Conn, error) { + dialer := net.Dialer{ + LocalAddr: &net.TCPAddr{ + Port: localPort, + }, + } + return func() (net.Conn, error) { + return dialer.DialContext(ctx, network, remoteAddr) + } + } + c := qt.New(t) + first, second := randPortPair() + t.Logf("ports are %v and %v", first, second) + err := testSimultaneousOpen( + c.Cleanup, + makeDialer(first, fmt.Sprintf("localhost:%d", second)), + makeDialer(second, fmt.Sprintf("localhost:%d", first)), + ) + c.Assert(err, qt.IsNil) +} + +func randIntInRange(low, high int) int { + return rand.Intn(high-low+1) + low +} + +func randDynamicPort() int { + return randIntInRange(49152, 65535) +} + +func randPortPair() (first int, second int) { + first = randDynamicPort() + for { + second = randDynamicPort() + if second != first { + return + } + } +} + +func writeMsg(conn net.Conn) { + conn.Write([]byte(defaultMsg)) + // Writing must be closed so the reader will get EOF and stop reading. + conn.Close() +} + +func readMsg(conn net.Conn) error { + msgBytes, err := io.ReadAll(conn) + if err != nil { + return err + } + msgStr := string(msgBytes) + if msgStr != defaultMsg { + return fmt.Errorf("read %q", msgStr) + } + return nil +} + +var errMsgNotReceived = errors.New("msg not received in time") + +// Runs two dialers simultaneously, then sends a message on one connection and check it reads from +// the other, thereby showing that both dials obtained endpoints to the same connection. +func testSimultaneousOpen( + cleanup func(func()), + firstDialer, secondDialer func() (net.Conn, error), +) error { + errs := make(chan error) + var dialsDone sync.WaitGroup + const numDials = 2 + dialsDone.Add(numDials) + signal := make(chan struct{}) + var dialersDone sync.WaitGroup + dialersDone.Add(numDials) + doDial := func( + dialer func() (net.Conn, error), + onSignal func(net.Conn), + ) { + defer dialersDone.Done() + conn, err := dialer() + dialsDone.Done() + errs <- err + if err != nil { + return + } + cleanup(func() { + conn.Close() + }) + <-signal + onSignal(conn) + //if err == nil { + // conn.Close() + //} + } + go doDial( + firstDialer, + func(conn net.Conn) { + writeMsg(conn) + errs <- nil + }, + ) + go doDial( + secondDialer, + func(conn net.Conn) { + gotMsg := make(chan error, 1) + go func() { + gotMsg <- readMsg(conn) + }() + select { + case err := <-gotMsg: + errs <- err + case <-time.After(time.Second): + errs <- errMsgNotReceived + } + }, + ) + dialsDone.Wait() + for range iter.N(numDials) { + err := <-errs + if err != nil { + return err + } + } + close(signal) + for range iter.N(numDials) { + err := <-errs + if err != nil { + return err + } + } + dialersDone.Wait() + return nil +} + +const defaultMsg = "hello" + +// Show that uTP doesn't implement simultaneous open. When two sockets dial each other, they both +// get separate connections. This means that holepunch connect may result in an accept (and dial) +// for one or both peers involved. +func TestUtpSimultaneousOpen(t *testing.T) { + c := qt.New(t) + const network = "udp" + ctx := context.Background() + newUtpSocket := func(addr string) utpSocket { + socket, err := NewUtpSocket( + network, + addr, + func(net.Addr) bool { + return false + }, + log.Default, + ) + c.Assert(err, qt.IsNil) + return socket + } + first := newUtpSocket("localhost:3000") + defer first.Close() + second := newUtpSocket("localhost:3001") + defer second.Close() + getDial := func(sock utpSocket, addr string) func() (net.Conn, error) { + return func() (net.Conn, error) { + return sock.DialContext(ctx, network, addr) + } + } + err := testSimultaneousOpen( + c.Cleanup, + getDial(first, "localhost:3001"), + getDial(second, "localhost:3000"), + ) + c.Assert(err, qt.ErrorIs, errMsgNotReceived) +} + +func testDirectDialMsg(c *qt.C, r, w net.Conn) { + go writeMsg(w) + err := readMsg(r) + c.Assert(err, qt.IsNil) +} + +// Show that dialling one socket and accepting from the other results in them having ends of the +// same connection. +func TestUtpDirectDialMsg(t *testing.T) { + c := qt.New(t) + const network = "udp" + ctx := context.Background() + newUtpSocket := func(addr string) utpSocket { + socket, err := NewUtpSocket(network, addr, func(net.Addr) bool { + return false + }, log.Default) + c.Assert(err, qt.IsNil) + return socket + } + first := newUtpSocket("localhost:0") + defer first.Close() + second := newUtpSocket("localhost:0") + defer second.Close() + writer, err := first.DialContext(ctx, network, second.Addr().String()) + c.Assert(err, qt.IsNil) + defer writer.Close() + reader, err := second.Accept() + defer reader.Close() + c.Assert(err, qt.IsNil) + testDirectDialMsg(c, reader, writer) }