"net/netip"
"sort"
"strconv"
- "strings"
"time"
"github.com/anacrolix/chansync"
res = <-resCh
}
}()
- // There are still incompleted dials.
+ // There are still uncompleted dials.
go func() {
for ; left > 0; left-- {
conn := (<-resCh).Conn
func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
c, err := s.Dial(ctx, addr)
+ if err != nil {
+ log.Levelf(log.Debug, "error dialing %q: %v", addr, err)
+ }
// This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
- // it now in case we close the connection forthwith.
+ // it now in case we close the connection forthwith. Note this is also done in the TCP dialer
+ // code to increase the chance it's done.
if tc, ok := c.(*net.TCPConn); ok {
tc.SetLinger(0)
}
return c
}
-func forgettableDialError(err error) bool {
- return strings.Contains(err.Error(), "no suitable address found")
-}
-
func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
if _, ok := t.halfOpen[addr]; !ok {
panic("invariant broken")
assert.EqualValues(t, "d\n", string(b))
}
+// TestResponsive was the first test to fail if uTP is disabled and TCP sockets dial from the
+// listening port.
+func TestResponsiveTcpOnly(t *testing.T) {
+ seederDataDir, mi := testutil.GreetingTestTorrent()
+ defer os.RemoveAll(seederDataDir)
+ cfg := TestingConfig(t)
+ cfg.DisableUTP = true
+ cfg.Seed = true
+ cfg.DataDir = seederDataDir
+ seeder, err := NewClient(cfg)
+ require.Nil(t, err)
+ defer seeder.Close()
+ seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
+ seederTorrent.VerifyData()
+ leecherDataDir := t.TempDir()
+ cfg = TestingConfig(t)
+ cfg.DataDir = leecherDataDir
+ leecher, err := NewClient(cfg)
+ require.Nil(t, err)
+ defer leecher.Close()
+ leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
+ ret = TorrentSpecFromMetaInfo(mi)
+ ret.ChunkSize = 2
+ return
+ }())
+ leecherTorrent.AddClientPeer(seeder)
+ reader := leecherTorrent.NewReader()
+ defer reader.Close()
+ reader.SetReadahead(0)
+ reader.SetResponsive()
+ b := make([]byte, 2)
+ _, err = reader.Seek(3, io.SeekStart)
+ require.NoError(t, err)
+ _, err = io.ReadFull(reader, b)
+ assert.Nil(t, err)
+ assert.EqualValues(t, "lo", string(b))
+ _, err = reader.Seek(11, io.SeekStart)
+ require.NoError(t, err)
+ n, err := io.ReadFull(reader, b)
+ assert.Nil(t, err)
+ assert.EqualValues(t, 2, n)
+ assert.EqualValues(t, "d\n", string(b))
+}
+
func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
seederDataDir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(seederDataDir)
defer s.Close()
}
cfg := TestingConfig(t).SetListenAddr(":50007")
+ cfg.DisableUTP = false
cl, err := NewClient(cfg)
require.Error(t, err)
require.Nil(t, cl)
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.8.0
go.opentelemetry.io/otel/sdk v1.8.0
go.opentelemetry.io/otel/trace v1.8.0
- golang.org/x/exp v0.0.0-20220613132600-b0d781184e0d
+ golang.org/x/sys v0.5.0
golang.org/x/time v0.0.0-20220609170525-579cf78fd858
)
golang.org/x/crypto v0.5.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect
- golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1 // indirect
"context"
"net"
"strconv"
+ "syscall"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/perf"
"github.com/anacrolix/missinggo/v2"
"github.com/pkg/errors"
-
- "github.com/anacrolix/torrent/dialer"
)
type Listener interface {
}
}
+var tcpListenConfig = net.ListenConfig{
+ Control: func(network, address string, c syscall.RawConn) (err error) {
+ controlErr := c.Control(func(fd uintptr) {
+ err = setReusePortSockOpts(fd)
+ })
+ if err != nil {
+ return
+ }
+ err = controlErr
+ return
+ },
+ // BitTorrent connections manage their own keep-alives.
+ KeepAlive: -1,
+}
+
func listenTcp(network, address string) (s socket, err error) {
- l, err := net.Listen(network, address)
+ l, err := tcpListenConfig.Listen(context.Background(), network, address)
return tcpSocket{
Listener: l,
NetworkDialer: NetworkDialer{
Network: network,
- Dialer: dialer.Default,
+ Dialer: &net.Dialer{
+ // My hope is that dialling out from a consistent port will improve the
+ // hole-punching behaviour. It might also prevent duplicate connections to the same
+ // peer if the peer does the same thing. There should probably be a fallback dialer
+ // if we fail to configure the socket to reuse ports for whatever reason.
+ LocalAddr: l.Addr(),
+ // We don't want fallback, as we explicitly manage the IPv4/IPv6 distinction
+ // ourselves, although it's probably not triggered as I think the network is already
+ // constrained to tcp4 or tcp6 at this point.
+ FallbackDelay: -1,
+ // BitTorrent connections manage their own keep-alives.
+ KeepAlive: tcpListenConfig.KeepAlive,
+ Control: func(network, address string, c syscall.RawConn) (err error) {
+ controlErr := c.Control(func(fd uintptr) {
+ err = setSockNoLinger(fd)
+ if err != nil {
+ // Failing to disable linger is undesirable, but not fatal.
+ log.Printf("error setting linger socket option on tcp socket: %v", err)
+ }
+ err = setReusePortSockOpts(fd)
+ })
+ if err == nil {
+ err = controlErr
+ }
+ return
+ },
+ },
},
}, err
}
--- /dev/null
+//go:build !wasm
+
+package torrent
+
+import "syscall"
+
+var lingerOffVal = syscall.Linger{
+ Onoff: 0,
+ Linger: 0,
+}
--- /dev/null
+//go:build !windows && !wasm
+
+package torrent
+
+import (
+ "syscall"
+
+ "golang.org/x/sys/unix"
+)
+
+func setReusePortSockOpts(fd uintptr) (err error) {
+ // I would use libp2p/go-reuseport to do this here, but no surprise it's
+ // implemented incorrectly.
+
+ // Looks like we can get away with just REUSEPORT at least on Darwin, and probably by
+ // extension BSDs and Linux.
+ if false {
+ err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
+ if err != nil {
+ return
+ }
+ }
+ err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, unix.SO_REUSEPORT, 1)
+ return
+}
+
+func setSockNoLinger(fd uintptr) (err error) {
+ return syscall.SetsockoptLinger(int(fd), syscall.SOL_SOCKET, syscall.SO_LINGER, &lingerOffVal)
+}
--- /dev/null
+package torrent
+
+// It's possible that we either need to use JS-specific way to allow port reuse, or to fall back to
+// dialling TCP without forcing the local address to match the listener. If the fallback is
+// implemented, then this should probably return an error to trigger it.
+func setReusePortSockOpts(fd uintptr) error {
+ return nil
+}
+
+func setSockNoLinger(fd uintptr) error {
+ return nil
+}
--- /dev/null
+package torrent
+
+import (
+ "syscall"
+
+ "golang.org/x/sys/windows"
+)
+
+func setReusePortSockOpts(fd uintptr) (err error) {
+ return windows.SetsockoptInt(windows.Handle(fd), windows.SOL_SOCKET, windows.SO_REUSEADDR, 1)
+}
+
+func setSockNoLinger(fd uintptr) (err error) {
+ return syscall.SetsockoptLinger(syscall.Handle(fd), syscall.SOL_SOCKET, syscall.SO_LINGER, &lingerOffVal)
+}