]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Dial TCP with the listener's local addr
authorMatt Joiner <anacrolix@gmail.com>
Mon, 1 May 2023 00:15:34 +0000 (10:15 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 9 May 2023 05:45:52 +0000 (15:45 +1000)
client.go
client_test.go
go.mod
socket.go
sockopts.go [new file with mode: 0644]
sockopts_unix.go [new file with mode: 0644]
sockopts_wasm.go [new file with mode: 0644]
sockopts_windows.go [new file with mode: 0644]

index 4fac27220f10c302ec2271e4580448a298522f4f..342025ac554d6220931d1224dcf18ecb2d211812 100644 (file)
--- a/client.go
+++ b/client.go
@@ -16,7 +16,6 @@ import (
        "net/netip"
        "sort"
        "strconv"
-       "strings"
        "time"
 
        "github.com/anacrolix/chansync"
@@ -646,7 +645,7 @@ func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResu
                        res = <-resCh
                }
        }()
-       // There are still incompleted dials.
+       // There are still uncompleted dials.
        go func() {
                for ; left > 0; left-- {
                        conn := (<-resCh).Conn
@@ -663,8 +662,12 @@ func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResu
 
 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
        c, err := s.Dial(ctx, addr)
+       if err != nil {
+               log.Levelf(log.Debug, "error dialing %q: %v", addr, err)
+       }
        // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
-       // it now in case we close the connection forthwith.
+       // it now in case we close the connection forthwith. Note this is also done in the TCP dialer
+       // code to increase the chance it's done.
        if tc, ok := c.(*net.TCPConn); ok {
                tc.SetLinger(0)
        }
@@ -672,10 +675,6 @@ func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
        return c
 }
 
-func forgettableDialError(err error) bool {
-       return strings.Contains(err.Error(), "no suitable address found")
-}
-
 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
        if _, ok := t.halfOpen[addr]; !ok {
                panic("invariant broken")
index 03851ae88d8712c4293545604ab97e7935f273e3..c86243956ab5e8a9b93184c5561490c890b7a52f 100644 (file)
@@ -254,6 +254,50 @@ func TestResponsive(t *testing.T) {
        assert.EqualValues(t, "d\n", string(b))
 }
 
+// TestResponsive was the first test to fail if uTP is disabled and TCP sockets dial from the
+// listening port.
+func TestResponsiveTcpOnly(t *testing.T) {
+       seederDataDir, mi := testutil.GreetingTestTorrent()
+       defer os.RemoveAll(seederDataDir)
+       cfg := TestingConfig(t)
+       cfg.DisableUTP = true
+       cfg.Seed = true
+       cfg.DataDir = seederDataDir
+       seeder, err := NewClient(cfg)
+       require.Nil(t, err)
+       defer seeder.Close()
+       seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
+       seederTorrent.VerifyData()
+       leecherDataDir := t.TempDir()
+       cfg = TestingConfig(t)
+       cfg.DataDir = leecherDataDir
+       leecher, err := NewClient(cfg)
+       require.Nil(t, err)
+       defer leecher.Close()
+       leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
+               ret = TorrentSpecFromMetaInfo(mi)
+               ret.ChunkSize = 2
+               return
+       }())
+       leecherTorrent.AddClientPeer(seeder)
+       reader := leecherTorrent.NewReader()
+       defer reader.Close()
+       reader.SetReadahead(0)
+       reader.SetResponsive()
+       b := make([]byte, 2)
+       _, err = reader.Seek(3, io.SeekStart)
+       require.NoError(t, err)
+       _, err = io.ReadFull(reader, b)
+       assert.Nil(t, err)
+       assert.EqualValues(t, "lo", string(b))
+       _, err = reader.Seek(11, io.SeekStart)
+       require.NoError(t, err)
+       n, err := io.ReadFull(reader, b)
+       assert.Nil(t, err)
+       assert.EqualValues(t, 2, n)
+       assert.EqualValues(t, "d\n", string(b))
+}
+
 func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
        seederDataDir, mi := testutil.GreetingTestTorrent()
        defer os.RemoveAll(seederDataDir)
@@ -749,6 +793,7 @@ func TestClientAddressInUse(t *testing.T) {
                defer s.Close()
        }
        cfg := TestingConfig(t).SetListenAddr(":50007")
+       cfg.DisableUTP = false
        cl, err := NewClient(cfg)
        require.Error(t, err)
        require.Nil(t, cl)
diff --git a/go.mod b/go.mod
index 9ea01b969420d297be873fe12aee712285d1078f..d7bcf96fcf2e6bbd0021e8bc6df5474bd97d370a 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -49,7 +49,7 @@ require (
        go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.8.0
        go.opentelemetry.io/otel/sdk v1.8.0
        go.opentelemetry.io/otel/trace v1.8.0
-       golang.org/x/exp v0.0.0-20220613132600-b0d781184e0d
+       golang.org/x/sys v0.5.0
        golang.org/x/time v0.0.0-20220609170525-579cf78fd858
 )
 
@@ -101,7 +101,6 @@ require (
        golang.org/x/crypto v0.5.0 // indirect
        golang.org/x/net v0.7.0 // indirect
        golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect
-       golang.org/x/sys v0.5.0 // indirect
        golang.org/x/text v0.7.0 // indirect
        golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
        google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1 // indirect
index 127cd29afecb1cecd9f9ffe0dd08f579fcb54136..22809e1741fca097508874ffe9b92fadd16c55df 100644 (file)
--- a/socket.go
+++ b/socket.go
@@ -4,13 +4,12 @@ import (
        "context"
        "net"
        "strconv"
+       "syscall"
 
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo/perf"
        "github.com/anacrolix/missinggo/v2"
        "github.com/pkg/errors"
-
-       "github.com/anacrolix/torrent/dialer"
 )
 
 type Listener interface {
@@ -38,13 +37,54 @@ func listen(n network, addr string, f firewallCallback, logger log.Logger) (sock
        }
 }
 
+var tcpListenConfig = net.ListenConfig{
+       Control: func(network, address string, c syscall.RawConn) (err error) {
+               controlErr := c.Control(func(fd uintptr) {
+                       err = setReusePortSockOpts(fd)
+               })
+               if err != nil {
+                       return
+               }
+               err = controlErr
+               return
+       },
+       // BitTorrent connections manage their own keep-alives.
+       KeepAlive: -1,
+}
+
 func listenTcp(network, address string) (s socket, err error) {
-       l, err := net.Listen(network, address)
+       l, err := tcpListenConfig.Listen(context.Background(), network, address)
        return tcpSocket{
                Listener: l,
                NetworkDialer: NetworkDialer{
                        Network: network,
-                       Dialer:  dialer.Default,
+                       Dialer: &net.Dialer{
+                               // My hope is that dialling out from a consistent port will improve the
+                               // hole-punching behaviour. It might also prevent duplicate connections to the same
+                               // peer if the peer does the same thing. There should probably be a fallback dialer
+                               // if we fail to configure the socket to reuse ports for whatever reason.
+                               LocalAddr: l.Addr(),
+                               // We don't want fallback, as we explicitly manage the IPv4/IPv6 distinction
+                               // ourselves, although it's probably not triggered as I think the network is already
+                               // constrained to tcp4 or tcp6 at this point.
+                               FallbackDelay: -1,
+                               // BitTorrent connections manage their own keep-alives.
+                               KeepAlive: tcpListenConfig.KeepAlive,
+                               Control: func(network, address string, c syscall.RawConn) (err error) {
+                                       controlErr := c.Control(func(fd uintptr) {
+                                               err = setSockNoLinger(fd)
+                                               if err != nil {
+                                                       // Failing to disable linger is undesirable, but not fatal.
+                                                       log.Printf("error setting linger socket option on tcp socket: %v", err)
+                                               }
+                                               err = setReusePortSockOpts(fd)
+                                       })
+                                       if err == nil {
+                                               err = controlErr
+                                       }
+                                       return
+                               },
+                       },
                },
        }, err
 }
diff --git a/sockopts.go b/sockopts.go
new file mode 100644 (file)
index 0000000..54f307d
--- /dev/null
@@ -0,0 +1,10 @@
+//go:build !wasm
+
+package torrent
+
+import "syscall"
+
+var lingerOffVal = syscall.Linger{
+       Onoff:  0,
+       Linger: 0,
+}
diff --git a/sockopts_unix.go b/sockopts_unix.go
new file mode 100644 (file)
index 0000000..52ec9e8
--- /dev/null
@@ -0,0 +1,29 @@
+//go:build !windows && !wasm
+
+package torrent
+
+import (
+       "syscall"
+
+       "golang.org/x/sys/unix"
+)
+
+func setReusePortSockOpts(fd uintptr) (err error) {
+       // I would use libp2p/go-reuseport to do this here, but no surprise it's
+       // implemented incorrectly.
+
+       // Looks like we can get away with just REUSEPORT at least on Darwin, and probably by
+       // extension BSDs and Linux.
+       if false {
+               err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
+               if err != nil {
+                       return
+               }
+       }
+       err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, unix.SO_REUSEPORT, 1)
+       return
+}
+
+func setSockNoLinger(fd uintptr) (err error) {
+       return syscall.SetsockoptLinger(int(fd), syscall.SOL_SOCKET, syscall.SO_LINGER, &lingerOffVal)
+}
diff --git a/sockopts_wasm.go b/sockopts_wasm.go
new file mode 100644 (file)
index 0000000..9705b91
--- /dev/null
@@ -0,0 +1,12 @@
+package torrent
+
+// It's possible that we either need to use JS-specific way to allow port reuse, or to fall back to
+// dialling TCP without forcing the local address to match the listener. If the fallback is
+// implemented, then this should probably return an error to trigger it.
+func setReusePortSockOpts(fd uintptr) error {
+       return nil
+}
+
+func setSockNoLinger(fd uintptr) error {
+       return nil
+}
diff --git a/sockopts_windows.go b/sockopts_windows.go
new file mode 100644 (file)
index 0000000..c3c0ab0
--- /dev/null
@@ -0,0 +1,15 @@
+package torrent
+
+import (
+       "syscall"
+
+       "golang.org/x/sys/windows"
+)
+
+func setReusePortSockOpts(fd uintptr) (err error) {
+       return windows.SetsockoptInt(windows.Handle(fd), windows.SOL_SOCKET, windows.SO_REUSEADDR, 1)
+}
+
+func setSockNoLinger(fd uintptr) (err error) {
+       return syscall.SetsockoptLinger(syscall.Handle(fd), syscall.SOL_SOCKET, syscall.SO_LINGER, &lingerOffVal)
+}