16 "github.com/anacrolix/log"
17 "github.com/anacrolix/missinggo/v2/iter"
18 qt "github.com/frankban/quicktest"
19 "github.com/stretchr/testify/assert"
20 "github.com/stretchr/testify/require"
21 "golang.org/x/time/rate"
23 "github.com/anacrolix/torrent/internal/testutil"
// Check that after completing leeching, a leecher transitions to seeding
// correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
28 func TestHolepunchConnect(t *testing.T) {
30 greetingTempDir, mi := testutil.GreetingTestTorrent()
31 defer os.RemoveAll(greetingTempDir)
33 cfg := TestingConfig(t)
35 cfg.MaxAllocPeerRequestDataPerConn = 4
36 cfg.DataDir = greetingTempDir
39 cfg.AcceptPeerConnections = false
40 // Listening, even without accepting, still means the leecher-leecher completes the dial to the
41 // seeder, and so it won't attempt to holepunch.
// Ensure that responding to holepunch connects doesn't wait around for the dial limit. We also
// have to allow the initial connection to the leecher though, so it can rendezvous for us.
45 cfg.DialRateLimiter = rate.NewLimiter(0, 1)
46 seeder, err := NewClient(cfg)
47 require.NoError(t, err)
49 defer testutil.ExportStatusWriter(seeder, "s", t)()
50 seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
51 require.NoError(t, err)
53 seederTorrent.VerifyData()
55 cfg = TestingConfig(t)
57 cfg.DataDir = t.TempDir()
58 cfg.AlwaysWantConns = true
59 // This way the leecher leecher will still try to use this peer as a relay, but won't be told
60 // about the seeder via PEX.
61 //cfg.DisablePEX = true
63 leecher, err := NewClient(cfg)
64 require.NoError(t, err)
66 defer testutil.ExportStatusWriter(leecher, "l", t)()
68 cfg = TestingConfig(t)
70 cfg.DataDir = t.TempDir()
71 cfg.MaxAllocPeerRequestDataPerConn = 4
73 cfg.NominalDialTimeout = time.Second
74 //cfg.DisableUTP = true
75 leecherLeecher, _ := NewClient(cfg)
76 require.NoError(t, err)
77 defer leecherLeecher.Close()
78 defer testutil.ExportStatusWriter(leecherLeecher, "ll", t)()
79 leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
80 ret = TorrentSpecFromMetaInfo(mi)
85 require.NoError(t, err)
87 llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
88 ret = TorrentSpecFromMetaInfo(mi)
92 require.NoError(t, err)
101 qt.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), qt.IsNil)
103 go seederTorrent.AddClientPeer(leecher)
104 waitForConns(seederTorrent)
105 go llg.AddClientPeer(leecher)
107 time.Sleep(time.Second)
109 targetAddr := seeder.ListenAddrs()[0]
110 log.Printf("trying to initiate to %v", targetAddr)
111 initiateConn(outgoingConnOpts{
116 requireRendezvous: true,
117 skipHolepunchRendezvous: false,
118 HeaderObfuscationPolicy: llg.cl.config.HeaderObfuscationPolicy,
123 c.Check(seeder.dialedSuccessfullyAfterHolepunchConnect, qt.Not(qt.HasLen), 0)
124 c.Check(leecherLeecher.probablyOnlyConnectedDueToHolepunch, qt.Not(qt.HasLen), 0)
126 llClientStats := leecherLeecher.Stats()
127 c.Check(llClientStats.NumPeersUndialableWithoutHolepunch, qt.Not(qt.Equals), 0)
128 c.Check(llClientStats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect, qt.Not(qt.Equals), 0)
129 c.Check(llClientStats.NumPeersProbablyOnlyConnectedDueToHolepunch, qt.Not(qt.Equals), 0)
132 func waitForConns(t *Torrent) {
143 // Show that dialling TCP will complete before the other side accepts.
144 func TestDialTcpNotAccepting(t *testing.T) {
145 l, err := net.Listen("tcp", "localhost:0")
147 c.Check(err, qt.IsNil)
149 dialedConn, err := net.Dial("tcp", l.Addr().String())
150 c.Assert(err, qt.IsNil)
154 func TestTcpSimultaneousOpen(t *testing.T) {
155 const network = "tcp"
156 ctx := context.Background()
157 makeDialer := func(localPort int, remoteAddr string) func() (net.Conn, error) {
158 dialer := net.Dialer{
159 LocalAddr: &net.TCPAddr{
160 //IP: net.IPv6loopback,
164 return func() (net.Conn, error) {
165 return dialer.DialContext(ctx, network, remoteAddr)
169 // I really hate doing this in unit tests, but we would need to pick apart Dialer to get
170 // perfectly synchronized simultaneous dials.
171 for range iter.N(10) {
172 first, second := randPortPair()
173 t.Logf("ports are %v and %v", first, second)
174 err := testSimultaneousOpen(
176 makeDialer(first, fmt.Sprintf("localhost:%d", second)),
177 makeDialer(second, fmt.Sprintf("localhost:%d", first)),
182 // This proves that the connections are not the same.
183 if errors.Is(err, errMsgNotReceived) {
186 // Could be a timing issue, so try again.
189 // If we weren't able to get a simultaneous dial to occur, then we can't call it a failure.
190 t.Skip("couldn't synchronize dials")
// randIntInRange returns a pseudo-random integer from the closed interval
// [low, high]. Both bounds are possible results. rand.Intn panics if the
// interval is empty (high < low).
func randIntInRange(low, high int) int {
	width := high - low + 1
	offset := rand.Intn(width)
	return low + offset
}
197 func randDynamicPort() int {
198 return randIntInRange(49152, 65535)
201 func randPortPair() (first int, second int) {
202 first = randDynamicPort()
204 second = randDynamicPort()
211 func writeMsg(conn net.Conn) {
212 conn.Write([]byte(defaultMsg))
213 // Writing must be closed so the reader will get EOF and stop reading.
217 func readMsg(conn net.Conn) error {
218 msgBytes, err := io.ReadAll(conn)
222 msgStr := string(msgBytes)
223 if msgStr != defaultMsg {
224 return fmt.Errorf("read %q", msgStr)
229 var errMsgNotReceived = errors.New("msg not received in time")
// Runs two dialers simultaneously, then sends a message on one connection and checks that it can
// be read from the other, thereby showing that both dials obtained endpoints to the same connection.
233 func testSimultaneousOpen(
234 cleanup func(func()),
235 firstDialer, secondDialer func() (net.Conn, error),
237 errs := make(chan error)
238 var dialsDone sync.WaitGroup
240 dialsDone.Add(numDials)
241 signal := make(chan struct{})
242 var dialersDone sync.WaitGroup
243 dialersDone.Add(numDials)
245 dialer func() (net.Conn, error),
246 onSignal func(net.Conn),
248 defer dialersDone.Done()
249 conn, err := dialer()
266 func(conn net.Conn) {
273 func(conn net.Conn) {
274 gotMsg := make(chan error, 1)
276 gotMsg <- readMsg(conn)
279 case err := <-gotMsg:
281 case <-time.After(time.Second):
282 errs <- errMsgNotReceived
287 for range iter.N(numDials) {
294 for range iter.N(numDials) {
304 const defaultMsg = "hello"
306 // Show that uTP doesn't implement simultaneous open. When two sockets dial each other, they both
307 // get separate connections. This means that holepunch connect may result in an accept (and dial)
308 // for one or both peers involved.
309 func TestUtpSimultaneousOpen(t *testing.T) {
311 const network = "udp"
312 ctx := context.Background()
313 newUtpSocket := func(addr string) utpSocket {
314 socket, err := NewUtpSocket(
317 func(net.Addr) bool {
322 c.Assert(err, qt.IsNil)
325 first := newUtpSocket("localhost:3000")
327 second := newUtpSocket("localhost:3001")
329 getDial := func(sock utpSocket, addr string) func() (net.Conn, error) {
330 return func() (net.Conn, error) {
331 return sock.DialContext(ctx, network, addr)
334 err := testSimultaneousOpen(
336 getDial(first, "localhost:3001"),
337 getDial(second, "localhost:3000"),
339 c.Assert(err, qt.ErrorIs, errMsgNotReceived)
342 func testDirectDialMsg(c *qt.C, r, w net.Conn) {
345 c.Assert(err, qt.IsNil)
// Show that dialling one socket and accepting from the other results in them having ends of the
// same connection.
350 func TestUtpDirectDialMsg(t *testing.T) {
352 const network = "udp"
353 ctx := context.Background()
354 newUtpSocket := func(addr string) utpSocket {
355 socket, err := NewUtpSocket(network, addr, func(net.Addr) bool {
358 c.Assert(err, qt.IsNil)
361 first := newUtpSocket("localhost:0")
363 second := newUtpSocket("localhost:0")
365 writer, err := first.DialContext(ctx, network, second.Addr().String())
366 c.Assert(err, qt.IsNil)
368 reader, err := second.Accept()
370 c.Assert(err, qt.IsNil)
371 testDirectDialMsg(c, reader, writer)