]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add holepunching stats and tests
authorMatt Joiner <anacrolix@gmail.com>
Fri, 12 May 2023 03:47:24 +0000 (13:47 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Fri, 12 May 2023 03:47:24 +0000 (13:47 +1000)
client-stats.go
client.go
torrent.go
ut-holepunching_test.go

index 3513f8f1c250c98ffff3c0a4b33b0af5be8f737a..1a8009708fb3a3c893d60a4c542e009eab9957ed 100644 (file)
@@ -1,5 +1,17 @@
 package torrent
 
+import (
+       "net/netip"
+)
+
+type clientHolepunchAddrSets struct {
+       undialableWithoutHolepunch                            map[netip.AddrPort]struct{}
+       undialableWithoutHolepunchDialedAfterHolepunchConnect map[netip.AddrPort]struct{}
+       dialableOnlyAfterHolepunch                            map[netip.AddrPort]struct{}
+       dialedSuccessfullyAfterHolepunchConnect               map[netip.AddrPort]struct{}
+       probablyOnlyConnectedDueToHolepunch                   map[netip.AddrPort]struct{}
+}
+
 type ClientStats struct {
        ConnStats
 
@@ -8,10 +20,26 @@ type ClientStats struct {
        // if a Torrent is dropped while there are outstanding dials.
        ActiveHalfOpenAttempts int
 
+       NumPeersUndialableWithoutHolepunch int
        // Number of unique peer addresses that were dialed after receiving a holepunch connect message,
        // that have previously been undialable without any hole-punching attempts.
        NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect int
        // Number of unique peer addresses that were successfully dialed and connected after a holepunch
        // connect message and previously failing to connect without holepunching.
-       NumPeersDialableOnlyAfterHolepunch int
+       NumPeersDialableOnlyAfterHolepunch              int
+       NumPeersDialedSuccessfullyAfterHolepunchConnect int
+       NumPeersProbablyOnlyConnectedDueToHolepunch     int
+}
+
+func (cl *Client) statsLocked() (stats ClientStats) {
+       stats.ConnStats = cl.connStats.Copy()
+       stats.ActiveHalfOpenAttempts = cl.numHalfOpen
+
+       stats.NumPeersUndialableWithoutHolepunch = len(cl.undialableWithoutHolepunch)
+       stats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect = len(cl.undialableWithoutHolepunchDialedAfterHolepunchConnect)
+       stats.NumPeersDialableOnlyAfterHolepunch = len(cl.dialableOnlyAfterHolepunch)
+       stats.NumPeersDialedSuccessfullyAfterHolepunchConnect = len(cl.dialedSuccessfullyAfterHolepunchConnect)
+       stats.NumPeersProbablyOnlyConnectedDueToHolepunch = len(cl.probablyOnlyConnectedDueToHolepunch)
+
+       return
 }
index 95c21aabb7576333b80cdbd9b4d3a2cef71b0460..6a82774e7a0d5cf8f0fa4b265161de1076a40609 100644 (file)
--- a/client.go
+++ b/client.go
@@ -91,9 +91,7 @@ type Client struct {
        activeAnnounceLimiter limiter.Instance
        httpClient            *http.Client
 
-       undialableWithoutHolepunch                                   map[netip.AddrPort]struct{}
-       undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect map[netip.AddrPort]struct{}
-       dialableOnlyAfterHolepunch                                   map[netip.AddrPort]struct{}
+       clientHolepunchAddrSets
 }
 
 type ipStr string
@@ -520,6 +518,20 @@ func (cl *Client) acceptConnections(l Listener) {
                        log.Fmsg("error accepting connection: %s", err).LogLevel(log.Debug, cl.logger)
                        continue
                }
+               {
+                       holepunchAddr, holepunchErr := addrPortFromPeerRemoteAddr(conn.RemoteAddr())
+                       if holepunchErr == nil {
+                               cl.lock()
+                               if g.MapContains(
+                                       cl.undialableWithoutHolepunchDialedAfterHolepunchConnect,
+                                       holepunchAddr,
+                               ) {
+                                       g.MakeMapIfNil(&cl.probablyOnlyConnectedDueToHolepunch)
+                                       g.MapInsert(cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr, struct{}{})
+                               }
+                               cl.unlock()
+                       }
+               }
                go func() {
                        if reject != nil {
                                torrent.Add("rejected accepted connections", 1)
@@ -738,12 +750,16 @@ func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn,
                }
        }
        holepunchAddr, holepunchAddrErr := addrPortFromPeerRemoteAddr(addr)
-       if holepunchAddrErr == nil && g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) && opts.receivedHolepunchConnect {
-               g.MakeMapIfNilAndSet(
-                       &cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect,
-                       holepunchAddr,
-                       struct{}{},
-               )
+       if holepunchAddrErr == nil && opts.receivedHolepunchConnect {
+               cl.lock()
+               if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
+                       g.MakeMapIfNilAndSet(
+                               &cl.undialableWithoutHolepunchDialedAfterHolepunchConnect,
+                               holepunchAddr,
+                               struct{}{},
+                       )
+               }
+               cl.unlock()
        }
        headerObfuscationPolicy := opts.HeaderObfuscationPolicy
        obfuscatedHeaderFirst := headerObfuscationPolicy.Preferred
@@ -751,18 +767,24 @@ func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn,
        if firstDialResult.Conn == nil {
                // No dialers worked. Try to initiate a holepunching rendezvous.
                if holepunchAddrErr == nil {
+                       cl.lock()
                        if !opts.receivedHolepunchConnect {
-                               cl.lock()
                                g.MakeMapIfNilAndSet(&cl.undialableWithoutHolepunch, holepunchAddr, struct{}{})
-                               cl.unlock()
                        }
                        opts.t.startHolepunchRendezvous(holepunchAddr)
+                       cl.unlock()
                }
                err = fmt.Errorf("all initial dials failed")
                return
        }
-       if opts.receivedHolepunchConnect && holepunchAddrErr == nil && g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
-               g.MakeMapIfNilAndSet(&cl.dialableOnlyAfterHolepunch, holepunchAddr, struct{}{})
+       if opts.receivedHolepunchConnect && holepunchAddrErr == nil {
+               cl.lock()
+               if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
+                       g.MakeMapIfNilAndSet(&cl.dialableOnlyAfterHolepunch, holepunchAddr, struct{}{})
+               }
+               g.MakeMapIfNil(&cl.dialedSuccessfullyAfterHolepunchConnect)
+               g.MapInsert(cl.dialedSuccessfullyAfterHolepunchConnect, holepunchAddr, struct{}{})
+               cl.unlock()
        }
        c, err = doProtocolHandshakeOnDialResult(
                opts.t,
@@ -1746,13 +1768,3 @@ func (cl *Client) Stats() ClientStats {
        defer cl.rUnlock()
        return cl.statsLocked()
 }
-
-func (cl *Client) statsLocked() (stats ClientStats) {
-       stats.ConnStats = cl.connStats.Copy()
-       stats.ActiveHalfOpenAttempts = cl.numHalfOpen
-       stats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect =
-               len(cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect)
-       stats.NumPeersDialableOnlyAfterHolepunch =
-               len(cl.dialableOnlyAfterHolepunch)
-       return
-}
index eeac172d10641dc997a566f4980abea44f2d555c..ad89b45391cc9df92616888a369c67b09245b0e2 100644 (file)
@@ -2859,11 +2859,3 @@ func (t *Torrent) getDialTimeoutUnlocked() time.Duration {
        defer cl.rUnlock()
        return t.dialTimeout()
 }
-
-func (t *Torrent) startHolepunchRendezvousForPeerRemoteAddr(addr PeerRemoteAddr) error {
-       addrPort, err := addrPortFromPeerRemoteAddr(addr)
-       if err != nil {
-               return err
-       }
-       return t.startHolepunchRendezvous(addrPort)
-}
index 93ff3a4ea58f53aa95b5e37d762f7cbe0b7ab381..29be6d1e6bffc426eb02ff04210eb3a96ac830e6 100644 (file)
@@ -1,22 +1,31 @@
 package torrent
 
 import (
+       "context"
+       "errors"
+       "fmt"
+       "io"
+       "math/rand"
+       "net"
        "os"
        "sync"
        "testing"
        "testing/iotest"
+       "time"
 
        "github.com/anacrolix/log"
-
-       "github.com/anacrolix/torrent/internal/testutil"
+       "github.com/anacrolix/missinggo/v2/iter"
        qt "github.com/frankban/quicktest"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
+
+       "github.com/anacrolix/torrent/internal/testutil"
 )
 
 // Check that after completing leeching, a leecher transitions to a seeding
 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
 func TestHolepunchConnect(t *testing.T) {
+       c := qt.New(t)
        greetingTempDir, mi := testutil.GreetingTestTorrent()
        defer os.RemoveAll(greetingTempDir)
 
@@ -27,7 +36,9 @@ func TestHolepunchConnect(t *testing.T) {
        cfg.DisablePEX = true
        cfg.Debug = true
        cfg.AcceptPeerConnections = false
-       //cfg.DisableUTP = true
+       // Listening, even without accepting, still means the leecher-leecher completes the dial to the seeder, and so it
+       // won't attempt to holepunch.
+       cfg.DisableTCP = true
        seeder, err := NewClient(cfg)
        require.NoError(t, err)
        defer seeder.Close()
@@ -105,13 +116,13 @@ func TestHolepunchConnect(t *testing.T) {
        llg.cl.unlock()
        wg.Wait()
 
+       c.Check(seeder.dialedSuccessfullyAfterHolepunchConnect, qt.Not(qt.HasLen), 0)
+       c.Check(leecherLeecher.probablyOnlyConnectedDueToHolepunch, qt.Not(qt.HasLen), 0)
+
        llClientStats := leecherLeecher.Stats()
-       c.Check(llClientStats.NumPeersDialableOnlyAfterHolepunch, qt.Not(qt.Equals), 0)
-       c.Check(
-               llClientStats.NumPeersDialableOnlyAfterHolepunch,
-               qt.Equals,
-               llClientStats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect,
-       )
+       c.Check(llClientStats.NumPeersUndialableWithoutHolepunch, qt.Not(qt.Equals), 0)
+       c.Check(llClientStats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect, qt.Not(qt.Equals), 0)
+       c.Check(llClientStats.NumPeersProbablyOnlyConnectedDueToHolepunch, qt.Not(qt.Equals), 0)
 }
 
 func waitForConns(t *Torrent) {
@@ -125,11 +136,218 @@ func waitForConns(t *Torrent) {
        }
 }
 
+// Show that dialling TCP will complete before the other side accepts.
 func TestDialTcpNotAccepting(t *testing.T) {
        l, err := net.Listen("tcp", "localhost:0")
        c := qt.New(t)
        c.Check(err, qt.IsNil)
        defer l.Close()
-       _, err = net.Dial("tcp", l.Addr().String())
-       c.Assert(err, qt.IsNotNil)
+       dialedConn, err := net.Dial("tcp", l.Addr().String())
+       c.Assert(err, qt.IsNil)
+       dialedConn.Close()
+}
+
+func TestTcpSimultaneousOpen(t *testing.T) {
+       const network = "tcp"
+       ctx := context.Background()
+       makeDialer := func(localPort int, remoteAddr string) func() (net.Conn, error) {
+               dialer := net.Dialer{
+                       LocalAddr: &net.TCPAddr{
+                               Port: localPort,
+                       },
+               }
+               return func() (net.Conn, error) {
+                       return dialer.DialContext(ctx, network, remoteAddr)
+               }
+       }
+       c := qt.New(t)
+       first, second := randPortPair()
+       t.Logf("ports are %v and %v", first, second)
+       err := testSimultaneousOpen(
+               c.Cleanup,
+               makeDialer(first, fmt.Sprintf("localhost:%d", second)),
+               makeDialer(second, fmt.Sprintf("localhost:%d", first)),
+       )
+       c.Assert(err, qt.IsNil)
+}
+
+func randIntInRange(low, high int) int {
+       return rand.Intn(high-low+1) + low
+}
+
+func randDynamicPort() int {
+       return randIntInRange(49152, 65535)
+}
+
+func randPortPair() (first int, second int) {
+       first = randDynamicPort()
+       for {
+               second = randDynamicPort()
+               if second != first {
+                       return
+               }
+       }
+}
+
+func writeMsg(conn net.Conn) {
+       conn.Write([]byte(defaultMsg))
+       // Writing must be closed so the reader will get EOF and stop reading.
+       conn.Close()
+}
+
+func readMsg(conn net.Conn) error {
+       msgBytes, err := io.ReadAll(conn)
+       if err != nil {
+               return err
+       }
+       msgStr := string(msgBytes)
+       if msgStr != defaultMsg {
+               return fmt.Errorf("read %q", msgStr)
+       }
+       return nil
+}
+
+var errMsgNotReceived = errors.New("msg not received in time")
+
+// Runs two dialers simultaneously, then sends a message on one connection and check it reads from
+// the other, thereby showing that both dials obtained endpoints to the same connection.
+func testSimultaneousOpen(
+       cleanup func(func()),
+       firstDialer, secondDialer func() (net.Conn, error),
+) error {
+       errs := make(chan error)
+       var dialsDone sync.WaitGroup
+       const numDials = 2
+       dialsDone.Add(numDials)
+       signal := make(chan struct{})
+       var dialersDone sync.WaitGroup
+       dialersDone.Add(numDials)
+       doDial := func(
+               dialer func() (net.Conn, error),
+               onSignal func(net.Conn),
+       ) {
+               defer dialersDone.Done()
+               conn, err := dialer()
+               dialsDone.Done()
+               errs <- err
+               if err != nil {
+                       return
+               }
+               cleanup(func() {
+                       conn.Close()
+               })
+               <-signal
+               onSignal(conn)
+               //if err == nil {
+               //      conn.Close()
+               //}
+       }
+       go doDial(
+               firstDialer,
+               func(conn net.Conn) {
+                       writeMsg(conn)
+                       errs <- nil
+               },
+       )
+       go doDial(
+               secondDialer,
+               func(conn net.Conn) {
+                       gotMsg := make(chan error, 1)
+                       go func() {
+                               gotMsg <- readMsg(conn)
+                       }()
+                       select {
+                       case err := <-gotMsg:
+                               errs <- err
+                       case <-time.After(time.Second):
+                               errs <- errMsgNotReceived
+                       }
+               },
+       )
+       dialsDone.Wait()
+       for range iter.N(numDials) {
+               err := <-errs
+               if err != nil {
+                       return err
+               }
+       }
+       close(signal)
+       for range iter.N(numDials) {
+               err := <-errs
+               if err != nil {
+                       return err
+               }
+       }
+       dialersDone.Wait()
+       return nil
+}
+
+const defaultMsg = "hello"
+
+// Show that uTP doesn't implement simultaneous open. When two sockets dial each other, they both
+// get separate connections. This means that holepunch connect may result in an accept (and dial)
+// for one or both peers involved.
+func TestUtpSimultaneousOpen(t *testing.T) {
+       c := qt.New(t)
+       const network = "udp"
+       ctx := context.Background()
+       newUtpSocket := func(addr string) utpSocket {
+               socket, err := NewUtpSocket(
+                       network,
+                       addr,
+                       func(net.Addr) bool {
+                               return false
+                       },
+                       log.Default,
+               )
+               c.Assert(err, qt.IsNil)
+               return socket
+       }
+       first := newUtpSocket("localhost:3000")
+       defer first.Close()
+       second := newUtpSocket("localhost:3001")
+       defer second.Close()
+       getDial := func(sock utpSocket, addr string) func() (net.Conn, error) {
+               return func() (net.Conn, error) {
+                       return sock.DialContext(ctx, network, addr)
+               }
+       }
+       err := testSimultaneousOpen(
+               c.Cleanup,
+               getDial(first, "localhost:3001"),
+               getDial(second, "localhost:3000"),
+       )
+       c.Assert(err, qt.ErrorIs, errMsgNotReceived)
+}
+
+func testDirectDialMsg(c *qt.C, r, w net.Conn) {
+       go writeMsg(w)
+       err := readMsg(r)
+       c.Assert(err, qt.IsNil)
+}
+
+// Show that dialling one socket and accepting from the other results in them having ends of the
+// same connection.
+func TestUtpDirectDialMsg(t *testing.T) {
+       c := qt.New(t)
+       const network = "udp"
+       ctx := context.Background()
+       newUtpSocket := func(addr string) utpSocket {
+               socket, err := NewUtpSocket(network, addr, func(net.Addr) bool {
+                       return false
+               }, log.Default)
+               c.Assert(err, qt.IsNil)
+               return socket
+       }
+       first := newUtpSocket("localhost:0")
+       defer first.Close()
+       second := newUtpSocket("localhost:0")
+       defer second.Close()
+       writer, err := first.DialContext(ctx, network, second.Addr().String())
+       c.Assert(err, qt.IsNil)
+       defer writer.Close()
+       reader, err := second.Accept()
+       defer reader.Close()
+       c.Assert(err, qt.IsNil)
+       testDirectDialMsg(c, reader, writer)
 }