package torrent
import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "math/rand"
+ "net"
"os"
"sync"
"testing"
"testing/iotest"
+ "time"
- "github.com/anacrolix/torrent/internal/testutil"
+ "github.com/anacrolix/log"
+ "github.com/anacrolix/missinggo/v2/iter"
qt "github.com/frankban/quicktest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+
+ "github.com/anacrolix/torrent/internal/testutil"
)
// Check that after completing leeching, a leecher transitions to a seeding
// correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
func TestHolepunchConnect(t *testing.T) {
+ c := qt.New(t)
greetingTempDir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(greetingTempDir)
cfg.MaxAllocPeerRequestDataPerConn = 4
cfg.DataDir = greetingTempDir
cfg.DisablePEX = true
- //cfg.Debug = true
+ cfg.Debug = true
cfg.AcceptPeerConnections = false
+ // Listening, even without accepting, still means the leecher-leecher completes the dial to the seeder, and so it
+ // won't attempt to holepunch.
+ cfg.DisableTCP = true
seeder, err := NewClient(cfg)
require.NoError(t, err)
defer seeder.Close()
cfg.DataDir = t.TempDir()
cfg.MaxAllocPeerRequestDataPerConn = 4
cfg.Debug = true
+ cfg.NominalDialTimeout = time.Second
//cfg.DisableUTP = true
leecherLeecher, _ := NewClient(cfg)
require.NoError(t, err)
waitForConns(seederTorrent)
go llg.AddClientPeer(leecher)
waitForConns(llg)
+ time.Sleep(time.Second)
llg.cl.lock()
- llg.initiateConn(PeerInfo{
- Addr: seeder.ListenAddrs()[0],
- }, true, false)
+ targetAddr := seeder.ListenAddrs()[0]
+ log.Printf("trying to initiate to %v", targetAddr)
+ initiateConn(outgoingConnOpts{
+ peerInfo: PeerInfo{
+ Addr: targetAddr,
+ },
+ t: llg,
+ requireRendezvous: true,
+ skipHolepunchRendezvous: false,
+ HeaderObfuscationPolicy: llg.cl.config.HeaderObfuscationPolicy,
+ }, true)
llg.cl.unlock()
wg.Wait()
+
+ c.Check(seeder.dialedSuccessfullyAfterHolepunchConnect, qt.Not(qt.HasLen), 0)
+ c.Check(leecherLeecher.probablyOnlyConnectedDueToHolepunch, qt.Not(qt.HasLen), 0)
+
+ llClientStats := leecherLeecher.Stats()
+ c.Check(llClientStats.NumPeersUndialableWithoutHolepunch, qt.Not(qt.Equals), 0)
+ c.Check(llClientStats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect, qt.Not(qt.Equals), 0)
+ c.Check(llClientStats.NumPeersProbablyOnlyConnectedDueToHolepunch, qt.Not(qt.Equals), 0)
}
func waitForConns(t *Torrent) {
t.cl.event.Wait()
}
}
+
+// Show that dialling TCP will complete before the other side accepts.
+func TestDialTcpNotAccepting(t *testing.T) {
+ l, err := net.Listen("tcp", "localhost:0")
+ c := qt.New(t)
+ c.Check(err, qt.IsNil)
+ defer l.Close()
+ dialedConn, err := net.Dial("tcp", l.Addr().String())
+ c.Assert(err, qt.IsNil)
+ dialedConn.Close()
+}
+
+func TestTcpSimultaneousOpen(t *testing.T) {
+ const network = "tcp"
+ ctx := context.Background()
+ makeDialer := func(localPort int, remoteAddr string) func() (net.Conn, error) {
+ dialer := net.Dialer{
+ LocalAddr: &net.TCPAddr{
+ Port: localPort,
+ },
+ }
+ return func() (net.Conn, error) {
+ return dialer.DialContext(ctx, network, remoteAddr)
+ }
+ }
+ c := qt.New(t)
+ first, second := randPortPair()
+ t.Logf("ports are %v and %v", first, second)
+ err := testSimultaneousOpen(
+ c.Cleanup,
+ makeDialer(first, fmt.Sprintf("localhost:%d", second)),
+ makeDialer(second, fmt.Sprintf("localhost:%d", first)),
+ )
+ c.Assert(err, qt.IsNil)
+}
+
+func randIntInRange(low, high int) int {
+ return rand.Intn(high-low+1) + low
+}
+
+func randDynamicPort() int {
+ return randIntInRange(49152, 65535)
+}
+
+func randPortPair() (first int, second int) {
+ first = randDynamicPort()
+ for {
+ second = randDynamicPort()
+ if second != first {
+ return
+ }
+ }
+}
+
+func writeMsg(conn net.Conn) {
+ conn.Write([]byte(defaultMsg))
+ // Writing must be closed so the reader will get EOF and stop reading.
+ conn.Close()
+}
+
+func readMsg(conn net.Conn) error {
+ msgBytes, err := io.ReadAll(conn)
+ if err != nil {
+ return err
+ }
+ msgStr := string(msgBytes)
+ if msgStr != defaultMsg {
+ return fmt.Errorf("read %q", msgStr)
+ }
+ return nil
+}
+
+var errMsgNotReceived = errors.New("msg not received in time")
+
+// Runs two dialers simultaneously, then sends a message on one connection and check it reads from
+// the other, thereby showing that both dials obtained endpoints to the same connection.
+func testSimultaneousOpen(
+ cleanup func(func()),
+ firstDialer, secondDialer func() (net.Conn, error),
+) error {
+ errs := make(chan error)
+ var dialsDone sync.WaitGroup
+ const numDials = 2
+ dialsDone.Add(numDials)
+ signal := make(chan struct{})
+ var dialersDone sync.WaitGroup
+ dialersDone.Add(numDials)
+ doDial := func(
+ dialer func() (net.Conn, error),
+ onSignal func(net.Conn),
+ ) {
+ defer dialersDone.Done()
+ conn, err := dialer()
+ dialsDone.Done()
+ errs <- err
+ if err != nil {
+ return
+ }
+ cleanup(func() {
+ conn.Close()
+ })
+ <-signal
+ onSignal(conn)
+ //if err == nil {
+ // conn.Close()
+ //}
+ }
+ go doDial(
+ firstDialer,
+ func(conn net.Conn) {
+ writeMsg(conn)
+ errs <- nil
+ },
+ )
+ go doDial(
+ secondDialer,
+ func(conn net.Conn) {
+ gotMsg := make(chan error, 1)
+ go func() {
+ gotMsg <- readMsg(conn)
+ }()
+ select {
+ case err := <-gotMsg:
+ errs <- err
+ case <-time.After(time.Second):
+ errs <- errMsgNotReceived
+ }
+ },
+ )
+ dialsDone.Wait()
+ for range iter.N(numDials) {
+ err := <-errs
+ if err != nil {
+ return err
+ }
+ }
+ close(signal)
+ for range iter.N(numDials) {
+ err := <-errs
+ if err != nil {
+ return err
+ }
+ }
+ dialersDone.Wait()
+ return nil
+}
+
+const defaultMsg = "hello"
+
+// Show that uTP doesn't implement simultaneous open. When two sockets dial each other, they both
+// get separate connections. This means that holepunch connect may result in an accept (and dial)
+// for one or both peers involved.
+func TestUtpSimultaneousOpen(t *testing.T) {
+ c := qt.New(t)
+ const network = "udp"
+ ctx := context.Background()
+ newUtpSocket := func(addr string) utpSocket {
+ socket, err := NewUtpSocket(
+ network,
+ addr,
+ func(net.Addr) bool {
+ return false
+ },
+ log.Default,
+ )
+ c.Assert(err, qt.IsNil)
+ return socket
+ }
+ first := newUtpSocket("localhost:3000")
+ defer first.Close()
+ second := newUtpSocket("localhost:3001")
+ defer second.Close()
+ getDial := func(sock utpSocket, addr string) func() (net.Conn, error) {
+ return func() (net.Conn, error) {
+ return sock.DialContext(ctx, network, addr)
+ }
+ }
+ err := testSimultaneousOpen(
+ c.Cleanup,
+ getDial(first, "localhost:3001"),
+ getDial(second, "localhost:3000"),
+ )
+ c.Assert(err, qt.ErrorIs, errMsgNotReceived)
+}
+
+func testDirectDialMsg(c *qt.C, r, w net.Conn) {
+ go writeMsg(w)
+ err := readMsg(r)
+ c.Assert(err, qt.IsNil)
+}
+
+// Show that dialling one socket and accepting from the other results in them having ends of the
+// same connection.
+func TestUtpDirectDialMsg(t *testing.T) {
+ c := qt.New(t)
+ const network = "udp"
+ ctx := context.Background()
+ newUtpSocket := func(addr string) utpSocket {
+ socket, err := NewUtpSocket(network, addr, func(net.Addr) bool {
+ return false
+ }, log.Default)
+ c.Assert(err, qt.IsNil)
+ return socket
+ }
+ first := newUtpSocket("localhost:0")
+ defer first.Close()
+ second := newUtpSocket("localhost:0")
+ defer second.Close()
+ writer, err := first.DialContext(ctx, network, second.Addr().String())
+ c.Assert(err, qt.IsNil)
+ defer writer.Close()
+ reader, err := second.Accept()
+ defer reader.Close()
+ c.Assert(err, qt.IsNil)
+ testDirectDialMsg(c, reader, writer)
+}