package torrent
+import (
+ "net/netip"
+)
+
+type clientHolepunchAddrSets struct {
+ undialableWithoutHolepunch map[netip.AddrPort]struct{}
+ undialableWithoutHolepunchDialedAfterHolepunchConnect map[netip.AddrPort]struct{}
+ dialableOnlyAfterHolepunch map[netip.AddrPort]struct{}
+ dialedSuccessfullyAfterHolepunchConnect map[netip.AddrPort]struct{}
+ probablyOnlyConnectedDueToHolepunch map[netip.AddrPort]struct{}
+}
+
type ClientStats struct {
ConnStats
// if a Torrent is dropped while there are outstanding dials.
ActiveHalfOpenAttempts int
+ NumPeersUndialableWithoutHolepunch int
// Number of unique peer addresses that were dialed after receiving a holepunch connect message,
// that have previously been undialable without any hole-punching attempts.
NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect int
// Number of unique peer addresses that were successfully dialed and connected after a holepunch
// connect message and previously failing to connect without holepunching.
- NumPeersDialableOnlyAfterHolepunch int
+ NumPeersDialableOnlyAfterHolepunch int
+ NumPeersDialedSuccessfullyAfterHolepunchConnect int
+ NumPeersProbablyOnlyConnectedDueToHolepunch int
+}
+
+func (cl *Client) statsLocked() (stats ClientStats) {
+ stats.ConnStats = cl.connStats.Copy()
+ stats.ActiveHalfOpenAttempts = cl.numHalfOpen
+
+ stats.NumPeersUndialableWithoutHolepunch = len(cl.undialableWithoutHolepunch)
+ stats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect = len(cl.undialableWithoutHolepunchDialedAfterHolepunchConnect)
+ stats.NumPeersDialableOnlyAfterHolepunch = len(cl.dialableOnlyAfterHolepunch)
+ stats.NumPeersDialedSuccessfullyAfterHolepunchConnect = len(cl.dialedSuccessfullyAfterHolepunchConnect)
+ stats.NumPeersProbablyOnlyConnectedDueToHolepunch = len(cl.probablyOnlyConnectedDueToHolepunch)
+
+ return
}
activeAnnounceLimiter limiter.Instance
httpClient *http.Client
- undialableWithoutHolepunch map[netip.AddrPort]struct{}
- undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect map[netip.AddrPort]struct{}
- dialableOnlyAfterHolepunch map[netip.AddrPort]struct{}
+ clientHolepunchAddrSets
}
type ipStr string
log.Fmsg("error accepting connection: %s", err).LogLevel(log.Debug, cl.logger)
continue
}
+ {
+ holepunchAddr, holepunchErr := addrPortFromPeerRemoteAddr(conn.RemoteAddr())
+ if holepunchErr == nil {
+ cl.lock()
+ if g.MapContains(
+ cl.undialableWithoutHolepunchDialedAfterHolepunchConnect,
+ holepunchAddr,
+ ) {
+ g.MakeMapIfNil(&cl.probablyOnlyConnectedDueToHolepunch)
+ g.MapInsert(cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr, struct{}{})
+ }
+ cl.unlock()
+ }
+ }
go func() {
if reject != nil {
torrent.Add("rejected accepted connections", 1)
}
}
holepunchAddr, holepunchAddrErr := addrPortFromPeerRemoteAddr(addr)
- if holepunchAddrErr == nil && g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) && opts.receivedHolepunchConnect {
- g.MakeMapIfNilAndSet(
- &cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect,
- holepunchAddr,
- struct{}{},
- )
+ if holepunchAddrErr == nil && opts.receivedHolepunchConnect {
+ cl.lock()
+ if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
+ g.MakeMapIfNilAndSet(
+ &cl.undialableWithoutHolepunchDialedAfterHolepunchConnect,
+ holepunchAddr,
+ struct{}{},
+ )
+ }
+ cl.unlock()
}
headerObfuscationPolicy := opts.HeaderObfuscationPolicy
obfuscatedHeaderFirst := headerObfuscationPolicy.Preferred
if firstDialResult.Conn == nil {
// No dialers worked. Try to initiate a holepunching rendezvous.
if holepunchAddrErr == nil {
+ cl.lock()
if !opts.receivedHolepunchConnect {
- cl.lock()
g.MakeMapIfNilAndSet(&cl.undialableWithoutHolepunch, holepunchAddr, struct{}{})
- cl.unlock()
}
opts.t.startHolepunchRendezvous(holepunchAddr)
+ cl.unlock()
}
err = fmt.Errorf("all initial dials failed")
return
}
- if opts.receivedHolepunchConnect && holepunchAddrErr == nil && g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
- g.MakeMapIfNilAndSet(&cl.dialableOnlyAfterHolepunch, holepunchAddr, struct{}{})
+ if opts.receivedHolepunchConnect && holepunchAddrErr == nil {
+ cl.lock()
+ if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
+ g.MakeMapIfNilAndSet(&cl.dialableOnlyAfterHolepunch, holepunchAddr, struct{}{})
+ }
+ g.MakeMapIfNil(&cl.dialedSuccessfullyAfterHolepunchConnect)
+ g.MapInsert(cl.dialedSuccessfullyAfterHolepunchConnect, holepunchAddr, struct{}{})
+ cl.unlock()
}
c, err = doProtocolHandshakeOnDialResult(
opts.t,
defer cl.rUnlock()
return cl.statsLocked()
}
-
-func (cl *Client) statsLocked() (stats ClientStats) {
- stats.ConnStats = cl.connStats.Copy()
- stats.ActiveHalfOpenAttempts = cl.numHalfOpen
- stats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect =
- len(cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect)
- stats.NumPeersDialableOnlyAfterHolepunch =
- len(cl.dialableOnlyAfterHolepunch)
- return
-}
package torrent
import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "math/rand"
+ "net"
"os"
"sync"
"testing"
"testing/iotest"
+ "time"
"github.com/anacrolix/log"
-
- "github.com/anacrolix/torrent/internal/testutil"
+ "github.com/anacrolix/missinggo/v2/iter"
qt "github.com/frankban/quicktest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+
+ "github.com/anacrolix/torrent/internal/testutil"
)
// Check that after completing leeching, a leecher transitions to a seeding
// correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
func TestHolepunchConnect(t *testing.T) {
+ c := qt.New(t)
greetingTempDir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(greetingTempDir)
cfg.DisablePEX = true
cfg.Debug = true
cfg.AcceptPeerConnections = false
- //cfg.DisableUTP = true
+ // Listening, even without accepting, still means the leecher-leecher completes the dial to the seeder, and so it
+ // won't attempt to holepunch.
+ cfg.DisableTCP = true
seeder, err := NewClient(cfg)
require.NoError(t, err)
defer seeder.Close()
llg.cl.unlock()
wg.Wait()
+ c.Check(seeder.dialedSuccessfullyAfterHolepunchConnect, qt.Not(qt.HasLen), 0)
+ c.Check(leecherLeecher.probablyOnlyConnectedDueToHolepunch, qt.Not(qt.HasLen), 0)
+
llClientStats := leecherLeecher.Stats()
- c.Check(llClientStats.NumPeersDialableOnlyAfterHolepunch, qt.Not(qt.Equals), 0)
- c.Check(
- llClientStats.NumPeersDialableOnlyAfterHolepunch,
- qt.Equals,
- llClientStats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect,
- )
+ c.Check(llClientStats.NumPeersUndialableWithoutHolepunch, qt.Not(qt.Equals), 0)
+ c.Check(llClientStats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect, qt.Not(qt.Equals), 0)
+ c.Check(llClientStats.NumPeersProbablyOnlyConnectedDueToHolepunch, qt.Not(qt.Equals), 0)
}
func waitForConns(t *Torrent) {
}
}
+// Show that dialling TCP will complete before the other side accepts.
func TestDialTcpNotAccepting(t *testing.T) {
l, err := net.Listen("tcp", "localhost:0")
c := qt.New(t)
c.Check(err, qt.IsNil)
defer l.Close()
- _, err = net.Dial("tcp", l.Addr().String())
- c.Assert(err, qt.IsNotNil)
+ dialedConn, err := net.Dial("tcp", l.Addr().String())
+ c.Assert(err, qt.IsNil)
+ dialedConn.Close()
+}
+
+func TestTcpSimultaneousOpen(t *testing.T) {
+ const network = "tcp"
+ ctx := context.Background()
+ makeDialer := func(localPort int, remoteAddr string) func() (net.Conn, error) {
+ dialer := net.Dialer{
+ LocalAddr: &net.TCPAddr{
+ Port: localPort,
+ },
+ }
+ return func() (net.Conn, error) {
+ return dialer.DialContext(ctx, network, remoteAddr)
+ }
+ }
+ c := qt.New(t)
+ first, second := randPortPair()
+ t.Logf("ports are %v and %v", first, second)
+ err := testSimultaneousOpen(
+ c.Cleanup,
+ makeDialer(first, fmt.Sprintf("localhost:%d", second)),
+ makeDialer(second, fmt.Sprintf("localhost:%d", first)),
+ )
+ c.Assert(err, qt.IsNil)
+}
+
+func randIntInRange(low, high int) int {
+ return rand.Intn(high-low+1) + low
+}
+
+func randDynamicPort() int {
+ return randIntInRange(49152, 65535)
+}
+
+func randPortPair() (first int, second int) {
+ first = randDynamicPort()
+ for {
+ second = randDynamicPort()
+ if second != first {
+ return
+ }
+ }
+}
+
+func writeMsg(conn net.Conn) {
+ conn.Write([]byte(defaultMsg))
+ // Writing must be closed so the reader will get EOF and stop reading.
+ conn.Close()
+}
+
+func readMsg(conn net.Conn) error {
+ msgBytes, err := io.ReadAll(conn)
+ if err != nil {
+ return err
+ }
+ msgStr := string(msgBytes)
+ if msgStr != defaultMsg {
+ return fmt.Errorf("read %q", msgStr)
+ }
+ return nil
+}
+
+var errMsgNotReceived = errors.New("msg not received in time")
+
+// Runs two dialers simultaneously, then sends a message on one connection and check it reads from
+// the other, thereby showing that both dials obtained endpoints to the same connection.
+func testSimultaneousOpen(
+ cleanup func(func()),
+ firstDialer, secondDialer func() (net.Conn, error),
+) error {
+ errs := make(chan error)
+ var dialsDone sync.WaitGroup
+ const numDials = 2
+ dialsDone.Add(numDials)
+ signal := make(chan struct{})
+ var dialersDone sync.WaitGroup
+ dialersDone.Add(numDials)
+ doDial := func(
+ dialer func() (net.Conn, error),
+ onSignal func(net.Conn),
+ ) {
+ defer dialersDone.Done()
+ conn, err := dialer()
+ dialsDone.Done()
+ errs <- err
+ if err != nil {
+ return
+ }
+ cleanup(func() {
+ conn.Close()
+ })
+ <-signal
+ onSignal(conn)
+ //if err == nil {
+ // conn.Close()
+ //}
+ }
+ go doDial(
+ firstDialer,
+ func(conn net.Conn) {
+ writeMsg(conn)
+ errs <- nil
+ },
+ )
+ go doDial(
+ secondDialer,
+ func(conn net.Conn) {
+ gotMsg := make(chan error, 1)
+ go func() {
+ gotMsg <- readMsg(conn)
+ }()
+ select {
+ case err := <-gotMsg:
+ errs <- err
+ case <-time.After(time.Second):
+ errs <- errMsgNotReceived
+ }
+ },
+ )
+ dialsDone.Wait()
+ for range iter.N(numDials) {
+ err := <-errs
+ if err != nil {
+ return err
+ }
+ }
+ close(signal)
+ for range iter.N(numDials) {
+ err := <-errs
+ if err != nil {
+ return err
+ }
+ }
+ dialersDone.Wait()
+ return nil
+}
+
+const defaultMsg = "hello"
+
+// Show that uTP doesn't implement simultaneous open. When two sockets dial each other, they both
+// get separate connections. This means that holepunch connect may result in an accept (and dial)
+// for one or both peers involved.
+func TestUtpSimultaneousOpen(t *testing.T) {
+ c := qt.New(t)
+ const network = "udp"
+ ctx := context.Background()
+ newUtpSocket := func(addr string) utpSocket {
+ socket, err := NewUtpSocket(
+ network,
+ addr,
+ func(net.Addr) bool {
+ return false
+ },
+ log.Default,
+ )
+ c.Assert(err, qt.IsNil)
+ return socket
+ }
+ first := newUtpSocket("localhost:3000")
+ defer first.Close()
+ second := newUtpSocket("localhost:3001")
+ defer second.Close()
+ getDial := func(sock utpSocket, addr string) func() (net.Conn, error) {
+ return func() (net.Conn, error) {
+ return sock.DialContext(ctx, network, addr)
+ }
+ }
+ err := testSimultaneousOpen(
+ c.Cleanup,
+ getDial(first, "localhost:3001"),
+ getDial(second, "localhost:3000"),
+ )
+ c.Assert(err, qt.ErrorIs, errMsgNotReceived)
+}
+
+func testDirectDialMsg(c *qt.C, r, w net.Conn) {
+ go writeMsg(w)
+ err := readMsg(r)
+ c.Assert(err, qt.IsNil)
+}
+
+// Show that dialling one socket and accepting from the other results in them having ends of the
+// same connection.
+func TestUtpDirectDialMsg(t *testing.T) {
+ c := qt.New(t)
+ const network = "udp"
+ ctx := context.Background()
+ newUtpSocket := func(addr string) utpSocket {
+ socket, err := NewUtpSocket(network, addr, func(net.Addr) bool {
+ return false
+ }, log.Default)
+ c.Assert(err, qt.IsNil)
+ return socket
+ }
+ first := newUtpSocket("localhost:0")
+ defer first.Close()
+ second := newUtpSocket("localhost:0")
+ defer second.Close()
+ writer, err := first.DialContext(ctx, network, second.Addr().String())
+ c.Assert(err, qt.IsNil)
+ defer writer.Close()
+ reader, err := second.Accept()
+ defer reader.Close()
+ c.Assert(err, qt.IsNil)
+ testDirectDialMsg(c, reader, writer)
}