package torrent
import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "math/rand"
+ "net"
"os"
"sync"
"testing"
"testing/iotest"
+ "time"
"github.com/anacrolix/log"
-
- "github.com/anacrolix/torrent/internal/testutil"
+ "github.com/anacrolix/missinggo/v2/iter"
qt "github.com/frankban/quicktest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "golang.org/x/time/rate"
+
+ "github.com/anacrolix/torrent/internal/testutil"
)
// Check that after completing leeching, a leecher transitions to a seeding
// correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
func TestHolepunchConnect(t *testing.T) {
+ c := qt.New(t)
greetingTempDir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(greetingTempDir)
cfg.DisablePEX = true
cfg.Debug = true
cfg.AcceptPeerConnections = false
- //cfg.DisableUTP = true
+ // Listening, even without accepting, still means the leecher-leecher completes the dial to the
+ // seeder, and so it won't attempt to holepunch.
+ cfg.DisableTCP = true
+ // Ensure that responding to holepunch connects don't wait around for the dial limit. We also
+ // have to allow the initial connection to the leecher though, so it can rendezvous for us.
+ cfg.DialRateLimiter = rate.NewLimiter(0, 1)
+ cfg.Logger = cfg.Logger.WithContextText("seeder")
seeder, err := NewClient(cfg)
require.NoError(t, err)
defer seeder.Close()
cfg.Seed = true
cfg.DataDir = t.TempDir()
cfg.AlwaysWantConns = true
+ cfg.Logger = cfg.Logger.WithContextText("leecher")
// This way the leecher leecher will still try to use this peer as a relay, but won't be told
// about the seeder via PEX.
//cfg.DisablePEX = true
- //cfg.Debug = true
+ cfg.Debug = true
leecher, err := NewClient(cfg)
require.NoError(t, err)
defer leecher.Close()
cfg.Seed = false
cfg.DataDir = t.TempDir()
cfg.MaxAllocPeerRequestDataPerConn = 4
- //cfg.Debug = true
+ cfg.Debug = true
+ cfg.NominalDialTimeout = time.Second
+ cfg.Logger = cfg.Logger.WithContextText("leecher-leecher")
//cfg.DisableUTP = true
leecherLeecher, _ := NewClient(cfg)
require.NoError(t, err)
waitForConns(seederTorrent)
go llg.AddClientPeer(leecher)
waitForConns(llg)
- //time.Sleep(time.Second)
+ time.Sleep(time.Second)
llg.cl.lock()
- targetAddr := seeder.ListenAddrs()[1]
+ targetAddr := seeder.ListenAddrs()[0]
log.Printf("trying to initiate to %v", targetAddr)
- llg.initiateConn(PeerInfo{
- Addr: targetAddr,
- }, true, false, false)
+ initiateConn(outgoingConnOpts{
+ peerInfo: PeerInfo{
+ Addr: targetAddr,
+ },
+ t: llg,
+ requireRendezvous: true,
+ skipHolepunchRendezvous: false,
+ HeaderObfuscationPolicy: llg.cl.config.HeaderObfuscationPolicy,
+ }, true)
llg.cl.unlock()
wg.Wait()
+
+ c.Check(seeder.dialedSuccessfullyAfterHolepunchConnect, qt.Not(qt.HasLen), 0)
+ c.Check(leecherLeecher.probablyOnlyConnectedDueToHolepunch, qt.Not(qt.HasLen), 0)
+
+ llClientStats := leecherLeecher.Stats()
+ c.Check(llClientStats.NumPeersUndialableWithoutHolepunch, qt.Not(qt.Equals), 0)
+ c.Check(llClientStats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect, qt.Not(qt.Equals), 0)
+ c.Check(llClientStats.NumPeersProbablyOnlyConnectedDueToHolepunch, qt.Not(qt.Equals), 0)
}
func waitForConns(t *Torrent) {
t.cl.event.Wait()
}
}
+
+// Show that dialling TCP will complete before the other side accepts.
+func TestDialTcpNotAccepting(t *testing.T) {
+ l, err := net.Listen("tcp", "localhost:0")
+ c := qt.New(t)
+ c.Check(err, qt.IsNil)
+ defer l.Close()
+ dialedConn, err := net.Dial("tcp", l.Addr().String())
+ c.Assert(err, qt.IsNil)
+ dialedConn.Close()
+}
+
+func TestTcpSimultaneousOpen(t *testing.T) {
+ const network = "tcp"
+ ctx := context.Background()
+ makeDialer := func(localPort int, remoteAddr string) func() (net.Conn, error) {
+ dialer := net.Dialer{
+ LocalAddr: &net.TCPAddr{
+ //IP: net.IPv6loopback,
+ Port: localPort,
+ },
+ }
+ return func() (net.Conn, error) {
+ return dialer.DialContext(ctx, network, remoteAddr)
+ }
+ }
+ c := qt.New(t)
+ // I really hate doing this in unit tests, but we would need to pick apart Dialer to get
+ // perfectly synchronized simultaneous dials.
+ for range iter.N(10) {
+ first, second := randPortPair()
+ t.Logf("ports are %v and %v", first, second)
+ err := testSimultaneousOpen(
+ c.Cleanup,
+ makeDialer(first, fmt.Sprintf("localhost:%d", second)),
+ makeDialer(second, fmt.Sprintf("localhost:%d", first)),
+ )
+ if err == nil {
+ return
+ }
+ // This proves that the connections are not the same.
+ if errors.Is(err, errMsgNotReceived) {
+ t.Fatal(err)
+ }
+ // Could be a timing issue, so try again.
+ t.Log(err)
+ }
+ // If we weren't able to get a simultaneous dial to occur, then we can't call it a failure.
+ t.Skip("couldn't synchronize dials")
+}
+
+func randIntInRange(low, high int) int {
+ return rand.Intn(high-low+1) + low
+}
+
+func randDynamicPort() int {
+ return randIntInRange(49152, 65535)
+}
+
+func randPortPair() (first int, second int) {
+ first = randDynamicPort()
+ for {
+ second = randDynamicPort()
+ if second != first {
+ return
+ }
+ }
+}
+
+func writeMsg(conn net.Conn) {
+ conn.Write([]byte(defaultMsg))
+ // Writing must be closed so the reader will get EOF and stop reading.
+ conn.Close()
+}
+
+func readMsg(conn net.Conn) error {
+ msgBytes, err := io.ReadAll(conn)
+ if err != nil {
+ return err
+ }
+ msgStr := string(msgBytes)
+ if msgStr != defaultMsg {
+ return fmt.Errorf("read %q", msgStr)
+ }
+ return nil
+}
+
+var errMsgNotReceived = errors.New("msg not received in time")
+
+// Runs two dialers simultaneously, then sends a message on one connection and check it reads from
+// the other, thereby showing that both dials obtained endpoints to the same connection.
+func testSimultaneousOpen(
+ cleanup func(func()),
+ firstDialer, secondDialer func() (net.Conn, error),
+) error {
+ errs := make(chan error)
+ var dialsDone sync.WaitGroup
+ const numDials = 2
+ dialsDone.Add(numDials)
+ signal := make(chan struct{})
+ var dialersDone sync.WaitGroup
+ dialersDone.Add(numDials)
+ doDial := func(
+ dialer func() (net.Conn, error),
+ onSignal func(net.Conn),
+ ) {
+ defer dialersDone.Done()
+ conn, err := dialer()
+ dialsDone.Done()
+ errs <- err
+ if err != nil {
+ return
+ }
+ cleanup(func() {
+ conn.Close()
+ })
+ <-signal
+ onSignal(conn)
+ //if err == nil {
+ // conn.Close()
+ //}
+ }
+ go doDial(
+ firstDialer,
+ func(conn net.Conn) {
+ writeMsg(conn)
+ errs <- nil
+ },
+ )
+ go doDial(
+ secondDialer,
+ func(conn net.Conn) {
+ gotMsg := make(chan error, 1)
+ go func() {
+ gotMsg <- readMsg(conn)
+ }()
+ select {
+ case err := <-gotMsg:
+ errs <- err
+ case <-time.After(time.Second):
+ errs <- errMsgNotReceived
+ }
+ },
+ )
+ dialsDone.Wait()
+ for range iter.N(numDials) {
+ err := <-errs
+ if err != nil {
+ return err
+ }
+ }
+ close(signal)
+ for range iter.N(numDials) {
+ err := <-errs
+ if err != nil {
+ return err
+ }
+ }
+ dialersDone.Wait()
+ return nil
+}
+
+const defaultMsg = "hello"
+
+// Show that uTP doesn't implement simultaneous open. When two sockets dial each other, they both
+// get separate connections. This means that holepunch connect may result in an accept (and dial)
+// for one or both peers involved.
+func TestUtpSimultaneousOpen(t *testing.T) {
+ t.Parallel()
+ c := qt.New(t)
+ const network = "udp"
+ ctx := context.Background()
+ newUtpSocket := func(addr string) utpSocket {
+ socket, err := NewUtpSocket(
+ network,
+ addr,
+ func(net.Addr) bool {
+ return false
+ },
+ log.Default,
+ )
+ c.Assert(err, qt.IsNil)
+ return socket
+ }
+ first := newUtpSocket("localhost:0")
+ defer first.Close()
+ second := newUtpSocket("localhost:0")
+ defer second.Close()
+ getDial := func(sock utpSocket, addr string) func() (net.Conn, error) {
+ return func() (net.Conn, error) {
+ return sock.DialContext(ctx, network, addr)
+ }
+ }
+ t.Logf("first addr is %v. second addr is %v", first.Addr().String(), second.Addr().String())
+ for range iter.N(10) {
+ err := testSimultaneousOpen(
+ c.Cleanup,
+ getDial(first, second.Addr().String()),
+ getDial(second, first.Addr().String()),
+ )
+ if err == nil {
+ t.Fatal("expected utp to fail simultaneous open")
+ }
+ if errors.Is(err, errMsgNotReceived) {
+ return
+ }
+ skipGoUtpDialIssue(t, err)
+ t.Log(err)
+ time.Sleep(time.Second)
+ }
+ t.FailNow()
+}
+
+func writeAndReadMsg(r, w net.Conn) error {
+ go writeMsg(w)
+ return readMsg(r)
+}
+
+func skipGoUtpDialIssue(t *testing.T, err error) {
+ if err.Error() == "timed out waiting for ack" {
+ t.Skip("anacrolix go utp implementation has issues. Use anacrolix/go-libutp by enabling CGO.")
+ }
+}
+
+// Show that dialling one socket and accepting from the other results in them having ends of the
+// same connection.
+func TestUtpDirectDialMsg(t *testing.T) {
+ t.Parallel()
+ c := qt.New(t)
+ const network = "udp4"
+ ctx := context.Background()
+ newUtpSocket := func(addr string) utpSocket {
+ socket, err := NewUtpSocket(network, addr, func(net.Addr) bool {
+ return false
+ }, log.Default)
+ c.Assert(err, qt.IsNil)
+ return socket
+ }
+ for range iter.N(10) {
+ err := func() error {
+ first := newUtpSocket("localhost:0")
+ defer first.Close()
+ second := newUtpSocket("localhost:0")
+ defer second.Close()
+ writer, err := first.DialContext(ctx, network, second.Addr().String())
+ if err != nil {
+ return err
+ }
+ defer writer.Close()
+ reader, err := second.Accept()
+ defer reader.Close()
+ c.Assert(err, qt.IsNil)
+ return writeAndReadMsg(reader, writer)
+ }()
+ if err == nil {
+ return
+ }
+ skipGoUtpDialIssue(t, err)
+ t.Log(err)
+ time.Sleep(time.Second)
+ }
+ t.FailNow()
+}