impala/tests/custom_cluster/test_otel_trace.py
jasonmfehr f8d21f34bc IMPALA-14602: Fix Flaky OTel Test
Tests in test_otel_trace.py that rely on queries being queued assume
a first long-running query will be started before a second query is
submitted. These tests are most likely flaky because the first
long-running query is executed asynchronously and is immediately
followed by the second query. On slower builds (such as ASAN), the
first query may not have reached the running state before the second
query is started.

This patch adds a check on the first query to ensure it is running
before starting the second query.

Change-Id: I9e77ec70b4668f0daed2ab9411f8f6c52ddccb2a
Reviewed-on: http://gerrit.cloudera.org:8080/23743
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-12-04 08:05:24 +00:00
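
A minimal sketch of the fix pattern (hedged; client, long_query, and
second_query are placeholder names, not identifiers from the patch):

    # Submit the long-running query asynchronously, then block until it has
    # actually reached the RUNNING state before submitting the second query,
    # so the second query is reliably queued behind it.
    handle1 = client.execute_async(long_query)
    client.wait_for_impala_state(handle1, RUNNING, 60)
    handle2 = client.execute_async(second_query)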

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function

from datetime import datetime
from random import choice
from string import ascii_lowercase
from time import sleep

from impala.error import HiveServer2Error
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.file_utils import count_lines, wait_for_file_line_count
from tests.common.impala_connection import ERROR, FINISHED, PENDING, RUNNING
from tests.common.test_vector import BEESWAX, ImpalaTestDimension
from tests.util.otel_trace import assert_trace
from tests.util.query_profile_util import parse_query_id, parse_retry_status
from tests.util.retry import retry
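
# Startup flags that enable OpenTelemetry tracing with the file exporter: spans are
# flushed every 500ms as JSON lines to TRACE_FILE, and the {out_dir_traces} placeholder
# is filled in with the test's temporary directory (see tmp_dir_placeholders below).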
OUT_DIR = "out_dir_traces"
TRACE_FILE = "export-trace.jsonl"
TRACE_FLAGS = "--otel_trace_enabled=true --otel_trace_exporter=file " \
    "--otel_trace_exhaustive_dchecks --otel_file_flush_interval_ms=500 " \
    "--otel_file_pattern={out_dir_traces}/" + TRACE_FILE


class TestOtelTraceBase(CustomClusterTestSuite):
  def setup_method(self, method):
    super(TestOtelTraceBase, self).setup_method(method)
    self.assert_impalad_log_contains("INFO", "join Impala Service pool")
    self.trace_file_path = "{}/{}".format(self.get_tmp_dir(OUT_DIR), TRACE_FILE)
    self.trace_file_count = count_lines(self.trace_file_path, True)

  def assert_trace(self, query_id, query_profile, cluster_id, trace_cnt=1, err_span="",
      missing_spans=[], async_close=False, exact_trace_cnt=False):
    """Helper method to assert a trace exists in the trace file with the required inputs
    for log file path, trace file path, and trace file line count (that was determined
    before the test ran)."""
    assert_trace(self.build_log_path("impalad", "INFO"), self.trace_file_path,
        self.trace_file_count, query_id, query_profile, cluster_id, trace_cnt, err_span,
        missing_spans, async_close, exact_trace_cnt)


@CustomClusterTestSuite.with_args(
    impalad_args="-v=2 --cluster_id=select_dml --otel_debug {}".format(TRACE_FLAGS),
    cluster_size=1, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True)
class TestOtelTraceSelectsDMLs(TestOtelTraceBase):
  """Tests that exercise OpenTelemetry tracing behavior for select and dml queries."""

  @classmethod
  def setup_class(cls):
    super(TestOtelTraceSelectsDMLs, cls).setup_class()
    cls.test_db = "test_otel_trace_selects_dmls{}_{}".format(
        datetime.now().strftime("%Y%m%d%H%M%S"),
        "".join(choice(ascii_lowercase) for _ in range(7)))
    cls.execute_query_expect_success(cls.client, "CREATE DATABASE {}".format(cls.test_db))

  @classmethod
  def teardown_class(cls):
    cls.execute_query_expect_success(cls.client, "DROP DATABASE {} CASCADE"
        .format(cls.test_db))
    super(TestOtelTraceSelectsDMLs, cls).teardown_class()

  def setup_method(self, method):
    super(TestOtelTraceSelectsDMLs, self).setup_method(method)
    self.client.clear_configuration()

  def test_beeswax_no_trace(self):
    """Since tracing Beeswax queries is not supported, tests that no trace is created
    when executing a query using the Beeswax protocol."""
    with self.create_impala_client(protocol=BEESWAX) as client:
      self.execute_query_expect_success(client,
          "SELECT COUNT(*) FROM functional.alltypes")
    sleep(2)  # Wait for any spans to be flushed to the trace file.
    line_count = count_lines(self.trace_file_path, True)
    assert line_count - self.trace_file_count == 0

  def test_query_success(self):
    """Test that OpenTelemetry tracing is working by running a simple query and
    checking that the trace file is created and contains spans."""
    result = self.execute_query_expect_success(self.client,
        "SELECT COUNT(*) FROM functional.alltypes")
    self.assert_trace(
        query_id=result.query_id,
        query_profile=result.runtime_profile,
        cluster_id="select_dml")

  def test_invalid_sql(self):
    """Asserts that queries with invalid SQL still generate the expected traces."""
    query = "SELECT * FROM functional.alltypes WHERE field_does_not_exist=1"
    self.execute_query_expect_failure(self.client, query)
    # Retrieve the query id and runtime profile from the UI since the query execute call
    # only returns a HiveServer2Error object and not the query id or profile.
    query_id, profile = self.query_id_from_ui(section="completed_queries",
        match_query=query)
    self.assert_trace(
        query_id=query_id,
        query_profile=profile,
        cluster_id="select_dml",
        trace_cnt=1,
        err_span="Planning",
        missing_spans=["AdmissionControl", "QueryExecution"])

  def test_cte_query_success(self):
    """Test that OpenTelemetry tracing is working by running a simple query that uses a
    common table expression and checking that the trace file is created and contains
    expected spans with the expected attributes."""
    result = self.execute_query_expect_success(self.client,
        "WITH alltypes_tiny1 AS (SELECT * FROM functional.alltypes WHERE tinyint_col=1 "
        "LIMIT 10) SELECT id FROM alltypes_tiny1")
    self.assert_trace(
        query_id=result.query_id,
        query_profile=result.runtime_profile,
        cluster_id="select_dml")

  def test_query_exec_fail(self):
    """Test that OpenTelemetry tracing is working by running a query that fails during
    execution and checking that the trace file is created and contains expected spans
    with the expected attributes."""
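    # debug_action "3:PREPARE:FAIL" (set below) injects a failure while the fragment
    # instance prepares plan node 3, so the query fails during execution rather than
    # during planning.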
    query = "SELECT 1 FROM functional.alltypessmall a JOIN functional.alltypessmall b " \
        "ON a.id != b.id"
    self.execute_query_expect_failure(self.client,
        query,
        {
          "abort_on_error": 1,
          "batch_size": 0,
          "debug_action": "3:PREPARE:FAIL",
          "disable_codegen": 0,
          "disable_codegen_rows_threshold": 0,
          "exec_single_node_rows_threshold": 0,
          "mt_dop": 4,
          "num_nodes": 0,
          "test_replan": 1
        })
    query_id, profile = self.query_id_from_ui(section="completed_queries",
        match_query=query)
    self.assert_trace(
        query_id=query_id,
        query_profile=profile,
        cluster_id="select_dml",
        err_span="QueryExecution",
        async_close=True)

  def test_select_timeout(self):
    """Asserts queries that timeout generate the expected traces."""
    query = "SELECT * FROM functional.alltypes WHERE id=sleep(5000)"
    self.execute_query_expect_failure(self.client, query, {"exec_time_limit_s": "1"})
    # Retrieve the query id and runtime profile from the UI since the query execute call
    # only returns a HiveServer2Error object and not the query id or profile.
    query_id, profile = self.query_id_from_ui(section="completed_queries",
        match_query=query)
    self.assert_trace(
        query_id=query_id,
        query_profile=profile,
        cluster_id="select_dml",
        trace_cnt=1,
        err_span="QueryExecution",
        async_close=True)

  def test_select_empty(self):
    """Asserts empty queries do not generate traces."""
    self.execute_query_expect_failure(self.client, "")
    # Run a query that will succeed to ensure all traces have been flushed.
    result = self.execute_query_expect_success(self.client, "select 1")
    # Assert with an exact count so that the successful query accounts for the only new
    # trace, i.e. the empty query generated none.
    self.assert_trace(
        query_id=result.query_id,
        query_profile=result.runtime_profile,
        cluster_id="select_dml",
        exact_trace_cnt=True)

  def test_select_union(self):
    """Asserts queries with a union generate the expected traces."""
    result = self.execute_query_expect_success(self.client, "SELECT * FROM "
        "functional.alltypes LIMIT 5 UNION ALL SELECT * FROM functional.alltypessmall")
    self.assert_trace(
        query_id=result.query_id,
        query_profile=result.runtime_profile,
        cluster_id="select_dml",
        exact_trace_cnt=True)

  def test_dml_timeout(self):
    """Asserts insert DMLs that timeout generate the expected traces."""
    query = "INSERT INTO functional.alltypes (id, string_col, year, month) VALUES " \
        "(99999, 'foo', 2025, 1)"
    self.execute_query_expect_failure(self.client, query,
        {"debug_action": "CRS_AFTER_COORD_STARTS:SLEEP@3000", "exec_time_limit_s": "1"})
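    # The debug action sleeps the coordinator for 3 seconds right after it starts,
    # while exec_time_limit_s=1 caps execution at one second, guaranteeing the timeout.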
    # Retrieve the query id and runtime profile from the UI since the query execute call
    # only returns a HiveServer2Error object and not the query id or profile.
    query_id, profile = self.query_id_from_ui(section="completed_queries",
        match_query=query)
    self.assert_trace(
        query_id=query_id,
        query_profile=profile,
        cluster_id="select_dml",
        trace_cnt=1,
        err_span="QueryExecution",
        async_close=True)

  def test_dml_update(self, unique_name):
    """Asserts update DMLs generate the expected traces."""
    tbl = "{}.{}".format(self.test_db, unique_name)
    self.execute_query_expect_success(self.client, "CREATE TABLE {} (id int, "
        "string_col string) STORED AS ICEBERG TBLPROPERTIES('format-version'='2')"
        .format(tbl))
    self.execute_query_expect_success(self.client, "INSERT INTO {} (id, string_col) "
        "VALUES (1, 'foo'), (2, 'bar')".format(tbl))
    result = self.execute_query_expect_success(self.client,
        "UPDATE {} SET string_col='a' WHERE id=1".format(tbl))
    self.assert_trace(
        query_id=result.query_id,
        query_profile=result.runtime_profile,
        cluster_id="select_dml",
        trace_cnt=3)

  def test_dml_delete(self, unique_name):
    """Asserts delete DMLs generate the expected traces."""
    tbl = "{}.{}".format(self.test_db, unique_name)
    self.execute_query_expect_success(self.client, "CREATE TABLE {} (id int, "
        "string_col string) STORED AS ICEBERG TBLPROPERTIES('format-version'='2')"
        .format(tbl))
    self.execute_query_expect_success(self.client, "INSERT INTO {} (id, string_col) "
        "VALUES (1, 'foo'), (2, 'bar')".format(tbl))
    result = self.execute_query_expect_success(self.client,
        "DELETE FROM {} WHERE id=1".format(tbl))
    self.assert_trace(
        query_id=result.query_id,
        query_profile=result.runtime_profile,
        cluster_id="select_dml",
        trace_cnt=3)

  def test_dml_delete_join(self, unique_name):
    """Asserts delete join DMLs generate the expected traces."""
    tbl1 = "{}.{}_1".format(self.test_db, unique_name)
    self.execute_query_expect_success(self.client, "CREATE TABLE {} STORED AS ICEBERG "
        "TBLPROPERTIES('format-version'='2') AS SELECT id, bool_col, int_col, year, "
        "month FROM functional.alltypes ORDER BY id limit 100".format(tbl1))
    tbl2 = "{}.{}_2".format(self.test_db, unique_name)
    self.execute_query_expect_success(self.client, "CREATE TABLE {} STORED AS ICEBERG "
        "TBLPROPERTIES('format-version'='2') AS SELECT id, bool_col, int_col, year, "
        "month FROM functional.alltypessmall ORDER BY id LIMIT 100".format(tbl2))
    result = self.execute_query_expect_success(self.client, "DELETE t1 FROM {} t1 JOIN "
        "{} t2 ON t1.id = t2.id WHERE t2.id < 5".format(tbl1, tbl2))
    self.assert_trace(
        query_id=result.query_id,
        query_profile=result.runtime_profile,
        cluster_id="select_dml",
        trace_cnt=3)

  def test_ignored_queries(self, unique_name):
    """Asserts queries that should not generate traces do not generate traces."""
    tbl = "{}.{}".format(self.test_db, unique_name)
    res_create = self.execute_query_expect_success(self.client,
        "CREATE TABLE {} (a int)".format(tbl))
    # These queries are not expected to have traces created for them.
    ignore_queries = [
      "COMMENT ON DATABASE {} IS 'test'".format(self.test_db),
      "DESCRIBE {}".format(tbl),
      "EXPLAIN SELECT * FROM {}".format(tbl),
      "REFRESH FUNCTIONS functional",
      "REFRESH functional.alltypes",
      "SET ALL",
      "SHOW TABLES IN {}".format(self.test_db),
      "SHOW DATABASES",
      "TRUNCATE TABLE {}".format(tbl),
      "USE functional",
      "VALUES (1, 2, 3)",
      "/*comment1*/SET EXPLAIN_LEVEL=0",
      "--comment1\nSET EXPLAIN_LEVEL=0"
    ]
    for query in ignore_queries:
      self.execute_query_expect_success(self.client, query)
    # REFRESH AUTHORIZATION will error if authorization is not enabled. Since the test
    # is only asserting that no trace is created, ignore any errors from the query.
    try:
      self.execute_query_expect_success(self.client, "REFRESH AUTHORIZATION")
    except HiveServer2Error:
      pass
    # Run one more query that is expected to have a trace created to ensure that all
    # traces have been flushed to the trace file.
    res_drop = self.execute_query_expect_success(self.client, "DROP TABLE {} PURGE"
        .format(tbl))
    # Ensure the expected number of traces are present in the trace file.
    # The expected line count is 2 because:
    #   1. test runs create table
    #   2. test runs drop table
    wait_for_file_line_count(file_path=self.trace_file_path,
        expected_line_count=2 + self.trace_file_count, max_attempts=10, sleep_time_s=1,
        backoff=1, exact_match=True)
    # Assert the traces for the create/drop table query to ensure both were created.
    self.assert_trace(
        query_id=res_create.query_id,
        query_profile=res_create.runtime_profile,
        cluster_id="select_dml",
        trace_cnt=2,
        missing_spans=["AdmissionControl"])
    self.assert_trace(
        query_id=res_drop.query_id,
        query_profile=res_drop.runtime_profile,
        cluster_id="select_dml",
        trace_cnt=2,
        missing_spans=["AdmissionControl"])

  def test_dml_insert_success(self, unique_name):
    """Asserts successful insert DMLs generate the expected traces."""
    self.execute_query_expect_success(self.client,
        "CREATE TABLE {}.{} (id int, string_col string, int_col int)"
        .format(self.test_db, unique_name))
    result = self.execute_query_expect_success(self.client,
        "INSERT INTO {}.{} (id, string_col, int_col) VALUES (1, 'a', 10), (2, 'b', 20), "
        "(3, 'c', 30)".format(self.test_db, unique_name))
    self.assert_trace(
        query_id=result.query_id,
        query_profile=result.runtime_profile,
        cluster_id="select_dml",
        trace_cnt=2)

  def test_dml_insert_fail(self, unique_name):
    """Asserts failed insert DMLs generate the expected traces."""
    self.execute_query_expect_success(self.client,
        "CREATE TABLE {}.{} (id int, string_col string, int_col int)"
        .format(self.test_db, unique_name))
    fail_query = "INSERT INTO {}.{} (id, string_col, int_col) VALUES (1, 'a', 10), " \
        "(2, 'b', 20), (3, 'c', 30)".format(self.test_db, unique_name)
    self.execute_query_expect_failure(self.client, fail_query,
        {"debug_action": "0:OPEN:FAIL"})
    query_id, profile = self.query_id_from_ui(section="completed_queries",
        match_query=fail_query)
    self.assert_trace(
        query_id=query_id,
        query_profile=profile,
        cluster_id="select_dml",
        trace_cnt=2,
        err_span="QueryExecution")

  def test_dml_insert_cte_success(self, unique_name):
    """Asserts insert DMLs that use a CTE generate the expected traces."""
    self.execute_query_expect_success(self.client,
        "CREATE TABLE {}.{} (id int)".format(self.test_db, unique_name))
    result = self.execute_query_expect_success(self.client,
        "WITH a1 AS (SELECT * FROM functional.alltypes WHERE tinyint_col=1 limit 10) "
        "INSERT INTO {}.{} SELECT id FROM a1".format(self.test_db, unique_name))
    self.assert_trace(
        query_id=result.query_id,
        query_profile=result.runtime_profile,
        cluster_id="select_dml",
        trace_cnt=2)

  def test_dml_insert_overwrite(self, unique_name):
    """Test that OpenTelemetry tracing is working by running an insert overwrite query
    and checking that the trace file is created and contains expected spans with the
    expected attributes."""
    self.execute_query_expect_success(self.client,
        "CREATE TABLE {}.{} AS SELECT * FROM functional.alltypes WHERE id < 500 ".format(
            self.test_db, unique_name))
    result = self.execute_query_expect_success(self.client,
        "INSERT OVERWRITE TABLE {}.{} SELECT * FROM functional.alltypes WHERE id > 500 "
        "AND id < 1000".format(self.test_db, unique_name))
    self.assert_trace(
        query_id=result.query_id,
        query_profile=result.runtime_profile,
        cluster_id="select_dml",
        trace_cnt=2)

  def test_debug_logs(self):
    """Asserts OpenTelemetry SDK debug logs are written to the Impalad INFO log when
    vlog is set to 2."""
    result = self.execute_query_expect_success(self.client,
        "SELECT COUNT(*) FROM functional.alltypes")
    assert result.success
    # Assert the trace exporter debug log lines were emitted.
    self.assert_impalad_log_contains("INFO", r"otel-log-handler.cc:\d+\] \[OTLP TRACE "
        r"FILE Exporter\] Export \d+ trace span\(s\) success file=\".*?"
        r"\/opentelemetry-cpp-\d+.\d+.\d+\/exporters\/otlp\/src\/otlp_file_exporter.cc\" "
        r"line=\"\d+\"", -1)
    self.assert_impalad_log_contains("INFO", r"otel-log-handler.cc:\d+\] \[OTLP FILE "
        r"Client\] Write body\(Json\).*? file=\".*?"
        r"\/opentelemetry-cpp-\d+.\d+.\d+\/exporters\/otlp\/src\/otlp_file_client.cc\" "
        r"line=\"\d+\"", -1)


class TestOtelTraceSelectQueued(TestOtelTraceBase):
  """Tests that require setting additional startup flags to assert admission control
  queueing behavior. The cluster must be restarted after each test to apply the
  new flags."""

  @CustomClusterTestSuite.with_args(
      impalad_args="-v=2 --cluster_id=select_queued --default_pool_max_requests=1 {}"
      .format(TRACE_FLAGS),
      cluster_size=1, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True)
  def test_select_queued(self):
    """Asserts a query that is queued in admission control then completes successfully
    generates the expected trace."""
    # Launch two queries; the second will be queued until the first completes.
    query = "SELECT * FROM functional.alltypes WHERE id = 1"
    handle1 = self.client.execute_async("{} AND int_col = SLEEP(5000)".format(query))
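    # IMPALA-14602: block until the first query is actually RUNNING before submitting
    # the second. The async submission returns immediately, and on slow builds (such as
    # ASAN) the query may not have started executing yet, which made this test flaky.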
    self.client.wait_for_impala_state(handle1, RUNNING, 60)
    query_id_1 = self.client.handle_id(handle1)
    handle2 = self.client.execute_async(query)
    query_id_2 = self.client.handle_id(handle2)
    self.client.wait_for_impala_state(handle1, FINISHED, 60)
    self.client.fetch(None, handle1)
    query_profile_1 = self.client.get_runtime_profile(handle1)
    self.client.close_query(handle1)
    self.client.wait_for_impala_state(handle2, FINISHED, 60)
    self.client.fetch(None, handle2)
    query_profile_2 = self.client.get_runtime_profile(handle2)
    self.client.close_query(handle2)
    self.assert_trace(
        query_id=query_id_1,
        query_profile=query_profile_1,
        cluster_id="select_queued",
        trace_cnt=3)
    self.assert_trace(
        query_id=query_id_2,
        query_profile=query_profile_2,
        cluster_id="select_queued",
        trace_cnt=3)

  @CustomClusterTestSuite.with_args(
      impalad_args="-v=2 --cluster_id=cancel_adm_ctl --default_pool_max_requests=1 {}"
      .format(TRACE_FLAGS), cluster_size=1, tmp_dir_placeholders=[OUT_DIR],
      disable_log_buffering=True)
  def test_cancel_during_admission_control(self):
    """Tests that a query that is queued in admission control can be cancelled and
    the trace is created with the expected spans and events."""
    # Start a long-running query that will take up the only admission control slot.
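    # As above, wait for the first query to reach RUNNING before submitting the second
    # query, so the second query is reliably queued (IMPALA-14602).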
    handle1 = self.client.execute_async("SELECT * FROM functional.alltypes WHERE id = "
        "SLEEP(5000)")
    self.client.wait_for_impala_state(handle1, RUNNING, 60)
    # Start a second query that will be queued and then cancelled.
    handle2 = self.client.execute_async("SELECT * FROM functional.alltypes")
    self.client.wait_for_impala_state(handle2, PENDING, 30)
    query_id = self.client.handle_id(handle2)
    # Cancel the second query while it is queued.
    self.execute_query_expect_success(self.client, "KILL QUERY '{}'".format(query_id))
    self.client.wait_for_impala_state(handle2, ERROR, 30)
    query_profile = self.client.get_runtime_profile(handle2)
    self.assert_trace(
        query_id=query_id,
        query_profile=query_profile,
        cluster_id="cancel_adm_ctl",
        trace_cnt=1,
        err_span="AdmissionControl",
        missing_spans=["QueryExecution"],
        async_close=True)


class TestOtelTraceSelectRetry(TestOtelTraceBase):
  """Tests that require killing an Impala daemon; thus the cluster must restart after
  each test."""

  @CustomClusterTestSuite.with_args(
      impalad_args="-v=2 --cluster_id=retry_select_success {}".format(TRACE_FLAGS),
      cluster_size=3, num_exclusive_coordinators=1, tmp_dir_placeholders=[OUT_DIR],
      disable_log_buffering=True,
      statestored_args="-statestore_heartbeat_frequency_ms=60000")
  def test_retry_select_success(self):
    """Asserts select queries that are successfully retried generate the expected
    traces."""
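    # Kill an executor before submitting the query so the first attempt fails and is
    # transparently retried. The statestore heartbeat is slowed to 60s, presumably so
    # the dead node stays in cluster membership long enough for the first attempt to be
    # scheduled on it.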
    self.cluster.impalads[1].kill()
    result = self.execute_query_expect_success(self.client,
        "SELECT COUNT(*) FROM tpch_parquet.lineitem WHERE l_orderkey < 50",
        {"RETRY_FAILED_QUERIES": True})
    retried_query_id = parse_query_id(result.runtime_profile)
    orig_query_profile = self.query_profile_from_ui(result.query_id)
    # Assert the trace from the original query.
    self.assert_trace(
        query_id=result.query_id,
        query_profile=orig_query_profile,
        cluster_id="retry_select_success",
        trace_cnt=2,
        err_span="QueryExecution",
        async_close=True)
    # Assert the trace from the retried query.
    self.assert_trace(
        query_id=retried_query_id,
        query_profile=result.runtime_profile,
        cluster_id="retry_select_success",
        trace_cnt=2,
        missing_spans=["Submitted", "Planning"],
        async_close=True)

  @CustomClusterTestSuite.with_args(
      impalad_args="-v=2 --cluster_id=retry_select_failed {}".format(TRACE_FLAGS),
      cluster_size=3, num_exclusive_coordinators=1, tmp_dir_placeholders=[OUT_DIR],
      disable_log_buffering=True,
      statestored_args="-statestore_heartbeat_frequency_ms=1000")
  def test_retry_select_failed(self):
    """Asserts select queries that are retried but ultimately fail generate the expected
    traces."""
    with self.create_impala_client() as client:
      client.set_configuration({"retry_failed_queries": "true"})
      # Launch a shuffle-heavy query; it should be retried.
      handle = client.execute_async("SELECT * FROM tpch.lineitem t1, tpch.lineitem "
          "t2 WHERE t1.l_orderkey = t2.l_orderkey ORDER BY t1.l_orderkey, t2.l_orderkey "
          "LIMIT 1")
      client.wait_for_impala_state(handle, RUNNING, 60)
      query_id = client.handle_id(handle)
      self.cluster.impalads[1].kill()

      # Wait until the retry is running.
      def __wait_until_retried():
        return parse_retry_status(self.query_profile_from_ui(query_id)) == "RETRIED"
      retry(__wait_until_retried, 60, 1, 1, False)

      # Kill another impalad so that another retry is attempted.
      self.cluster.impalads[2].kill()
      # Wait until the query fails.
      client.wait_for_impala_state(handle, ERROR, 60)
      retried_query_profile = client.get_runtime_profile(handle)
      retried_query_id = parse_query_id(retried_query_profile)
      orig_query_profile = self.query_profile_from_ui(query_id)
      client.close_query(handle)
    # Assert the trace from the original query.
    self.assert_trace(
        query_id=query_id,
        query_profile=orig_query_profile,
        cluster_id="retry_select_failed",
        trace_cnt=2,
        err_span="QueryExecution",
        async_close=True)
    # Assert the trace from the retried query.
    self.assert_trace(
        query_id=retried_query_id,
        query_profile=retried_query_profile,
        cluster_id="retry_select_failed",
        trace_cnt=2,
        err_span="QueryExecution",
        missing_spans=["Submitted", "Planning"],
        async_close=True)


@CustomClusterTestSuite.with_args(
    impalad_args="-v=2 --cluster_id=trace_ddl {}".format(TRACE_FLAGS),
    cluster_size=2, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True)
class TestOtelTraceDDLs(TestOtelTraceBase):
  """Tests that exercise OpenTelemetry tracing behavior on DDLs. These tests are in
  their own class because they require an additional test dimension for async DDLs."""

  @classmethod
  def setup_class(cls):
    super(TestOtelTraceDDLs, cls).setup_class()
    cls.test_db = "test_otel_trace_ddls_{}_{}".format(
        datetime.now().strftime("%Y%m%d%H%M%S"),
        "".join(choice(ascii_lowercase) for _ in range(7)))
    cls.execute_query_expect_success(cls.client, "CREATE DATABASE {}".format(cls.test_db))

  @classmethod
  def teardown_class(cls):
    cls.execute_query_expect_success(cls.client, "DROP DATABASE {} CASCADE"
        .format(cls.test_db))
    super(TestOtelTraceDDLs, cls).teardown_class()

  @classmethod
  def add_test_dimensions(cls):
    super(TestOtelTraceDDLs, cls).add_test_dimensions()
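    # Runs every test in this class twice: once with ENABLE_ASYNC_DDL_EXECUTION set to
    # true and once with it set to false (each test reads the value from its vector).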
    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('async_ddl', True, False))

  def test_ddl_createdb(self, vector, unique_name):
    """Asserts a successful create database and drop database generate the expected
    traces."""
    try:
      result = self.execute_query_expect_success(self.client,
          "CREATE DATABASE {}".format(unique_name),
          {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')})
      self.assert_trace(
          query_id=result.query_id,
          query_profile=result.runtime_profile,
          cluster_id="trace_ddl",
          missing_spans=["AdmissionControl"])
    finally:
      result = self.execute_query_expect_success(self.client,
          "DROP DATABASE IF EXISTS {}".format(unique_name),
          {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')})
      self.assert_trace(
          query_id=result.query_id,
          query_profile=result.runtime_profile,
          cluster_id="trace_ddl",
          trace_cnt=2,
          missing_spans=["AdmissionControl"])

  def test_ddl_create_alter_table(self, vector, unique_name):
    """Tests that traces are created for a successful create table, a successful alter
    table, and a failed alter table (adding a column that already exists)."""
    create_result = self.execute_query_expect_success(self.client,
        "CREATE TABLE {}.{} (id int, string_col string, int_col int)"
        .format(self.test_db, unique_name),
        {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')})
    alter_success_result = self.execute_query_expect_success(self.client, "ALTER TABLE "
        "{}.{} ADD COLUMNS (new_col string)".format(self.test_db, unique_name),
        {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')})
    fail_query = "ALTER TABLE {}.{} ADD COLUMNS (new_col string)" \
        .format(self.test_db, unique_name)
    self.execute_query_expect_failure(self.client, fail_query,
        {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')})
    fail_query_id, fail_profile = self.query_id_from_ui(section="completed_queries",
        match_query=fail_query)
    self.assert_trace(
        query_id=create_result.query_id,
        query_profile=create_result.runtime_profile,
        cluster_id="trace_ddl",
        trace_cnt=3,
        missing_spans=["AdmissionControl"])
    self.assert_trace(
        query_id=alter_success_result.query_id,
        query_profile=alter_success_result.runtime_profile,
        cluster_id="trace_ddl",
        trace_cnt=3,
        missing_spans=["AdmissionControl"])
    self.assert_trace(
        query_id=fail_query_id,
        query_profile=fail_profile,
        cluster_id="trace_ddl",
        trace_cnt=3,
        missing_spans=["AdmissionControl", "QueryExecution"],
        err_span="Planning")

  def test_ddl_createtable_fail(self, vector, unique_name):
    """Asserts a failed create table generates the expected trace."""
    query = "CREATE TABLE {}.{} AS (SELECT * FROM functional.alltypes LIMIT 1)" \
        .format(self.test_db, unique_name)
    self.execute_query_expect_failure(self.client, query,
        {"debug_action": "CLIENT_REQUEST_UPDATE_CATALOG:FAIL",
         "ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')})
    query_id, profile = self.query_id_from_ui(section="completed_queries",
        match_query=query)
    self.assert_trace(
        query_id=query_id,
        query_profile=profile,
        cluster_id="trace_ddl",
        trace_cnt=1,
        missing_spans=["AdmissionControl"],
        err_span="QueryExecution")

  def test_ddl_createtable_cte_success(self, vector, unique_name):
    """Asserts create table queries that use a CTE generate the expected traces."""
    result = self.execute_query_expect_success(self.client,
        "CREATE TABLE {}.{} AS WITH a1 AS (SELECT * FROM functional.alltypes WHERE "
        "tinyint_col=1 LIMIT 10) SELECT id FROM a1".format(self.test_db, unique_name),
        {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')})
    self.assert_trace(
        query_id=result.query_id,
        query_profile=result.runtime_profile,
        cluster_id="trace_ddl",
        trace_cnt=1,
        missing_spans=["AdmissionControl"])

  def test_compute_stats(self, vector, unique_name):
    """The compute stats queries are a special case. These statements run two separate
    select queries. Locate both select queries on the UI and assert their traces."""
    tbl_name = "{}.{}_alltypes".format(self.test_db, unique_name)
    # Set up a test table to ensure calculating stats on an existing table does not
    # impact other tests.
    self.execute_query_expect_success(self.client, "CREATE TABLE {} PARTITIONED BY "
        "(year, month) AS SELECT * FROM functional.alltypes".format(tbl_name))
    compute_result = self.execute_query_expect_success(self.client,
        "COMPUTE STATS {}".format(tbl_name),
        {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')})
    query_id_cnt, profile_cnt = self.query_id_from_ui(section="completed_queries",
        match_query="SELECT COUNT(*), `year`, `month` FROM {} GROUP BY "
        "`year`, `month`".format(tbl_name))
    query_id_ndv, profile_ndv = self.query_id_from_ui(section="completed_queries",
        match_func=lambda q: q['stmt'].startswith("SELECT NDV(id)"))
    self.assert_trace(
        query_id=compute_result.query_id,
        query_profile=compute_result.runtime_profile,
        cluster_id="trace_ddl",
        trace_cnt=4,
        missing_spans=["AdmissionControl"])
    self.assert_trace(
        query_id=query_id_cnt,
        query_profile=profile_cnt,
        cluster_id="trace_ddl",
        trace_cnt=4)
    self.assert_trace(
        query_id=query_id_ndv,
        query_profile=profile_ndv,
        cluster_id="trace_ddl",
        trace_cnt=4)

  def test_compute_incremental_stats(self, vector, unique_name):
    """Asserts compute incremental stats queries generate the expected traces."""
    tbl_name = "{}.{}_alltypes".format(self.test_db, unique_name)
    # Set up a test table to ensure calculating stats on an existing table does not
    # impact other tests.
    self.execute_query_expect_success(self.client, "CREATE TABLE {} PARTITIONED BY "
        "(year, month) AS SELECT * FROM functional.alltypes".format(tbl_name))
    result = self.execute_query_expect_success(self.client,
        "COMPUTE INCREMENTAL STATS {} PARTITION (month=1)".format(tbl_name),
        {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')})
    self.assert_trace(
        query_id=result.query_id,
        query_profile=result.runtime_profile,
        cluster_id="trace_ddl",
        trace_cnt=2,
        missing_spans=["AdmissionControl"])

  def test_invalidate_metadata(self, vector):
    """Asserts invalidate metadata queries generate the expected traces."""
    result = self.execute_query_expect_success(self.client,
        "INVALIDATE METADATA functional.alltypes",
        {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')})
    self.assert_trace(
        query_id=result.query_id,
        query_profile=result.runtime_profile,
        cluster_id="trace_ddl",
        trace_cnt=1,
        missing_spans=["AdmissionControl"])