]> Sergey Matveev's repositories - btrtrc.git/blobdiff - ut-holepunching_test.go
Drop support for go 1.20
[btrtrc.git] / ut-holepunching_test.go
index 93ff3a4ea58f53aa95b5e37d762f7cbe0b7ab381..ef7cda6ba7770455c22576466e0083cc7cf81c6d 100644 (file)
@@ -1,22 +1,32 @@
 package torrent
 
 import (
+       "context"
+       "errors"
+       "fmt"
+       "io"
+       "math/rand"
+       "net"
        "os"
        "sync"
        "testing"
        "testing/iotest"
+       "time"
 
        "github.com/anacrolix/log"
-
-       "github.com/anacrolix/torrent/internal/testutil"
+       "github.com/anacrolix/missinggo/v2/iter"
        qt "github.com/frankban/quicktest"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
+       "golang.org/x/time/rate"
+
+       "github.com/anacrolix/torrent/internal/testutil"
 )
 
 // Check that after completing leeching, a leecher transitions to a seeding
 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
 func TestHolepunchConnect(t *testing.T) {
+       c := qt.New(t)
        greetingTempDir, mi := testutil.GreetingTestTorrent()
        defer os.RemoveAll(greetingTempDir)
 
@@ -27,7 +37,13 @@ func TestHolepunchConnect(t *testing.T) {
        cfg.DisablePEX = true
        cfg.Debug = true
        cfg.AcceptPeerConnections = false
-       //cfg.DisableUTP = true
+       // Listening, even without accepting, still means the leecher-leecher completes the dial to the
+       // seeder, and so it won't attempt to holepunch.
+       cfg.DisableTCP = true
+       // Ensure that responding to holepunch connects don't wait around for the dial limit. We also
+       // have to allow the initial connection to the leecher though, so it can rendezvous for us.
+       cfg.DialRateLimiter = rate.NewLimiter(0, 1)
+       cfg.Logger = cfg.Logger.WithContextText("seeder")
        seeder, err := NewClient(cfg)
        require.NoError(t, err)
        defer seeder.Close()
@@ -41,10 +57,11 @@ func TestHolepunchConnect(t *testing.T) {
        cfg.Seed = true
        cfg.DataDir = t.TempDir()
        cfg.AlwaysWantConns = true
+       cfg.Logger = cfg.Logger.WithContextText("leecher")
        // This way the leecher leecher will still try to use this peer as a relay, but won't be told
        // about the seeder via PEX.
        //cfg.DisablePEX = true
-       //cfg.Debug = true
+       cfg.Debug = true
        leecher, err := NewClient(cfg)
        require.NoError(t, err)
        defer leecher.Close()
@@ -56,6 +73,7 @@ func TestHolepunchConnect(t *testing.T) {
        cfg.MaxAllocPeerRequestDataPerConn = 4
        cfg.Debug = true
        cfg.NominalDialTimeout = time.Second
+       cfg.Logger = cfg.Logger.WithContextText("leecher-leecher")
        //cfg.DisableUTP = true
        leecherLeecher, _ := NewClient(cfg)
        require.NoError(t, err)
@@ -105,13 +123,13 @@ func TestHolepunchConnect(t *testing.T) {
        llg.cl.unlock()
        wg.Wait()
 
+       c.Check(seeder.dialedSuccessfullyAfterHolepunchConnect, qt.Not(qt.HasLen), 0)
+       c.Check(leecherLeecher.probablyOnlyConnectedDueToHolepunch, qt.Not(qt.HasLen), 0)
+
        llClientStats := leecherLeecher.Stats()
-       c.Check(llClientStats.NumPeersDialableOnlyAfterHolepunch, qt.Not(qt.Equals), 0)
-       c.Check(
-               llClientStats.NumPeersDialableOnlyAfterHolepunch,
-               qt.Equals,
-               llClientStats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect,
-       )
+       c.Check(llClientStats.NumPeersUndialableWithoutHolepunch, qt.Not(qt.Equals), 0)
+       c.Check(llClientStats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect, qt.Not(qt.Equals), 0)
+       c.Check(llClientStats.NumPeersProbablyOnlyConnectedDueToHolepunch, qt.Not(qt.Equals), 0)
 }
 
 func waitForConns(t *Torrent) {
@@ -125,11 +143,265 @@ func waitForConns(t *Torrent) {
        }
 }
 
+// Show that dialling TCP will complete before the other side accepts.
 func TestDialTcpNotAccepting(t *testing.T) {
        l, err := net.Listen("tcp", "localhost:0")
        c := qt.New(t)
        c.Check(err, qt.IsNil)
        defer l.Close()
-       _, err = net.Dial("tcp", l.Addr().String())
-       c.Assert(err, qt.IsNotNil)
+       dialedConn, err := net.Dial("tcp", l.Addr().String())
+       c.Assert(err, qt.IsNil)
+       dialedConn.Close()
+}
+
+func TestTcpSimultaneousOpen(t *testing.T) {
+       const network = "tcp"
+       ctx := context.Background()
+       makeDialer := func(localPort int, remoteAddr string) func() (net.Conn, error) {
+               dialer := net.Dialer{
+                       LocalAddr: &net.TCPAddr{
+                               //IP:   net.IPv6loopback,
+                               Port: localPort,
+                       },
+               }
+               return func() (net.Conn, error) {
+                       return dialer.DialContext(ctx, network, remoteAddr)
+               }
+       }
+       c := qt.New(t)
+       // I really hate doing this in unit tests, but we would need to pick apart Dialer to get
+       // perfectly synchronized simultaneous dials.
+       for range iter.N(10) {
+               first, second := randPortPair()
+               t.Logf("ports are %v and %v", first, second)
+               err := testSimultaneousOpen(
+                       c.Cleanup,
+                       makeDialer(first, fmt.Sprintf("localhost:%d", second)),
+                       makeDialer(second, fmt.Sprintf("localhost:%d", first)),
+               )
+               if err == nil {
+                       return
+               }
+               // This proves that the connections are not the same.
+               if errors.Is(err, errMsgNotReceived) {
+                       t.Fatal(err)
+               }
+               // Could be a timing issue, so try again.
+               t.Log(err)
+       }
+       // If we weren't able to get a simultaneous dial to occur, then we can't call it a failure.
+       t.Skip("couldn't synchronize dials")
+}
+
+func randIntInRange(low, high int) int {
+       return rand.Intn(high-low+1) + low
+}
+
+func randDynamicPort() int {
+       return randIntInRange(49152, 65535)
+}
+
+func randPortPair() (first int, second int) {
+       first = randDynamicPort()
+       for {
+               second = randDynamicPort()
+               if second != first {
+                       return
+               }
+       }
+}
+
+func writeMsg(conn net.Conn) {
+       conn.Write([]byte(defaultMsg))
+       // Writing must be closed so the reader will get EOF and stop reading.
+       conn.Close()
+}
+
+func readMsg(conn net.Conn) error {
+       msgBytes, err := io.ReadAll(conn)
+       if err != nil {
+               return err
+       }
+       msgStr := string(msgBytes)
+       if msgStr != defaultMsg {
+               return fmt.Errorf("read %q", msgStr)
+       }
+       return nil
+}
+
+var errMsgNotReceived = errors.New("msg not received in time")
+
+// Runs two dialers simultaneously, then sends a message on one connection and check it reads from
+// the other, thereby showing that both dials obtained endpoints to the same connection.
+func testSimultaneousOpen(
+       cleanup func(func()),
+       firstDialer, secondDialer func() (net.Conn, error),
+) error {
+       errs := make(chan error)
+       var dialsDone sync.WaitGroup
+       const numDials = 2
+       dialsDone.Add(numDials)
+       signal := make(chan struct{})
+       var dialersDone sync.WaitGroup
+       dialersDone.Add(numDials)
+       doDial := func(
+               dialer func() (net.Conn, error),
+               onSignal func(net.Conn),
+       ) {
+               defer dialersDone.Done()
+               conn, err := dialer()
+               dialsDone.Done()
+               errs <- err
+               if err != nil {
+                       return
+               }
+               cleanup(func() {
+                       conn.Close()
+               })
+               <-signal
+               onSignal(conn)
+               //if err == nil {
+               //      conn.Close()
+               //}
+       }
+       go doDial(
+               firstDialer,
+               func(conn net.Conn) {
+                       writeMsg(conn)
+                       errs <- nil
+               },
+       )
+       go doDial(
+               secondDialer,
+               func(conn net.Conn) {
+                       gotMsg := make(chan error, 1)
+                       go func() {
+                               gotMsg <- readMsg(conn)
+                       }()
+                       select {
+                       case err := <-gotMsg:
+                               errs <- err
+                       case <-time.After(time.Second):
+                               errs <- errMsgNotReceived
+                       }
+               },
+       )
+       dialsDone.Wait()
+       for range iter.N(numDials) {
+               err := <-errs
+               if err != nil {
+                       return err
+               }
+       }
+       close(signal)
+       for range iter.N(numDials) {
+               err := <-errs
+               if err != nil {
+                       return err
+               }
+       }
+       dialersDone.Wait()
+       return nil
+}
+
+const defaultMsg = "hello"
+
+// Show that uTP doesn't implement simultaneous open. When two sockets dial each other, they both
+// get separate connections. This means that holepunch connect may result in an accept (and dial)
+// for one or both peers involved.
+func TestUtpSimultaneousOpen(t *testing.T) {
+       t.Parallel()
+       c := qt.New(t)
+       const network = "udp"
+       ctx := context.Background()
+       newUtpSocket := func(addr string) utpSocket {
+               socket, err := NewUtpSocket(
+                       network,
+                       addr,
+                       func(net.Addr) bool {
+                               return false
+                       },
+                       log.Default,
+               )
+               c.Assert(err, qt.IsNil)
+               return socket
+       }
+       first := newUtpSocket("localhost:0")
+       defer first.Close()
+       second := newUtpSocket("localhost:0")
+       defer second.Close()
+       getDial := func(sock utpSocket, addr string) func() (net.Conn, error) {
+               return func() (net.Conn, error) {
+                       return sock.DialContext(ctx, network, addr)
+               }
+       }
+       t.Logf("first addr is %v. second addr is %v", first.Addr().String(), second.Addr().String())
+       for range iter.N(10) {
+               err := testSimultaneousOpen(
+                       c.Cleanup,
+                       getDial(first, second.Addr().String()),
+                       getDial(second, first.Addr().String()),
+               )
+               if err == nil {
+                       t.Fatal("expected utp to fail simultaneous open")
+               }
+               if errors.Is(err, errMsgNotReceived) {
+                       return
+               }
+               skipGoUtpDialIssue(t, err)
+               t.Log(err)
+               time.Sleep(time.Second)
+       }
+       t.FailNow()
+}
+
+func writeAndReadMsg(r, w net.Conn) error {
+       go writeMsg(w)
+       return readMsg(r)
+}
+
+func skipGoUtpDialIssue(t *testing.T, err error) {
+       if err.Error() == "timed out waiting for ack" {
+               t.Skip("anacrolix go utp implementation has issues. Use anacrolix/go-libutp by enabling CGO.")
+       }
+}
+
+// Show that dialling one socket and accepting from the other results in them having ends of the
+// same connection.
+func TestUtpDirectDialMsg(t *testing.T) {
+       t.Parallel()
+       c := qt.New(t)
+       const network = "udp4"
+       ctx := context.Background()
+       newUtpSocket := func(addr string) utpSocket {
+               socket, err := NewUtpSocket(network, addr, func(net.Addr) bool {
+                       return false
+               }, log.Default)
+               c.Assert(err, qt.IsNil)
+               return socket
+       }
+       for range iter.N(10) {
+               err := func() error {
+                       first := newUtpSocket("localhost:0")
+                       defer first.Close()
+                       second := newUtpSocket("localhost:0")
+                       defer second.Close()
+                       writer, err := first.DialContext(ctx, network, second.Addr().String())
+                       if err != nil {
+                               return err
+                       }
+                       defer writer.Close()
+                       reader, err := second.Accept()
+                       defer reader.Close()
+                       c.Assert(err, qt.IsNil)
+                       return writeAndReadMsg(reader, writer)
+               }()
+               if err == nil {
+                       return
+               }
+               skipGoUtpDialIssue(t, err)
+               t.Log(err)
+               time.Sleep(time.Second)
+       }
+       t.FailNow()
 }