IMPALA-14282: Workload Management Custom Cluster Tests Use New Utility Functions

Consume various utility functions added as part of previous changes.

Testing accomplished by running exhaustive tests in
test_query_log.py, test_query_live.py, and test_otel_trace.py both
locally and in jenkins.

Change-Id: If42a8b5b6fdb43fb2bb37dd2a3be4668e8a5e283
Reviewed-on: http://gerrit.cloudera.org:8080/23234
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
jasonmfehr
2025-07-31 16:54:47 -07:00
committed by Impala Public Jenkins
parent 0292e296c0
commit 88eb907795
2 changed files with 42 additions and 47 deletions

View File

@@ -109,16 +109,21 @@ def parse_retried_query_id(profile_text):
return retried_query_id.group(1)
def parse_num_rows_fetched(profile_text):
def parse_num_rows_fetched(profile_text, missing_ok=False):
"""Parses the number of rows fetched from the query profile text."""
num_rows_fetched = re.search(r'\n\s+\-\sNumRowsFetched:\s+(\d+)', profile_text)
num_rows_fetched = re.search(r'\n\s+\-\s+NumRowsFetched:\s+\S+\s+\((\d+)\)',
profile_text)
if missing_ok and num_rows_fetched is None:
return None
assert num_rows_fetched is not None, "Number of Rows Fetched not found in query profile"
return int(num_rows_fetched.group(1))
def parse_admission_result(profile_text):
def parse_admission_result(profile_text, missing_ok=False):
"""Parses the admission result from the query profile text."""
admission_result = re.search(r'\n\s+Admission result:\s+(.*?)\n', profile_text)
if missing_ok and admission_result is None:
return None
assert admission_result is not None, "Admission Result not found in query profile"
return admission_result.group(1)

View File

@@ -16,17 +16,28 @@
# under the License.
from __future__ import absolute_import, division, print_function
from datetime import datetime
import os
import re
import requests
from datetime import datetime
from time import sleep, time
from impala_thrift_gen.SystemTables.ttypes import TQueryTableColumn
from tests.util.assert_time import assert_time_str, convert_to_milliseconds
from tests.util.memory import assert_byte_str, convert_to_bytes
from tests.util.query_profile_util import (
parse_admission_result,
parse_coordinator,
parse_db_user,
parse_default_db,
parse_impala_query_state,
parse_num_rows_fetched,
parse_query_id,
parse_query_state,
parse_query_status,
parse_query_type,
parse_session_id,
parse_sql,
)
DEDICATED_COORD_SAFETY_BUFFER_BYTES = 104857600
WM_DB = "sys"
@@ -61,16 +72,11 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None,
if query_id is not None:
assert impalad is not None
assert raw_profile is None, "cannot specify both query_id and raw_profile"
resp = requests.get("http://{0}:{1}/query_profile_plain_text?query_id={2}"
.format(impalad.hostname, impalad.get_webserver_port(), query_id))
assert resp.status_code == 200, "Response code was: {0}".format(resp.status_code)
profile_text = resp.text
profile_text = impalad.service.read_query_profile_page(query_id)
else:
profile_text = raw_profile
assert query_id is None, "cannot specify both raw_profile and query_id"
match = re.search(r'Query \(id=(.*?)\)', profile_text)
assert match is not None
query_id = match.group(1)
query_id = parse_query_id(profile_text)
print("Query Id: {0}".format(query_id))
profile_lines = profile_text.split("\n")
@@ -133,9 +139,7 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None,
assert column_val(TQueryTableColumn.QUERY_ID) == query_id
# Session ID
session_id = re.search(r'\n\s+Session ID:\s+(.*)\n', profile_text)
assert session_id is not None
assert column_val(TQueryTableColumn.SESSION_ID) == session_id.group(1),\
assert column_val(TQueryTableColumn.SESSION_ID) == parse_session_id(profile_text), \
"session id incorrect"
# Session Type
@@ -154,9 +158,8 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None,
assert value == ""
# Database User
user = re.search(r'\n\s+User:\s+(.*?)\n', profile_text)
assert user is not None
assert column_val(TQueryTableColumn.DB_USER) == user.group(1), "db user incorrect"
assert column_val(TQueryTableColumn.DB_USER) == parse_db_user(profile_text), \
"db user incorrect"
# Connected Database User
db_user = re.search(r'\n\s+Connected User:\s+(.*?)\n', profile_text)
@@ -165,46 +168,35 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None,
"db user connection incorrect"
# Database Name
default_db = re.search(r'\n\s+Default Db:\s+(.*?)\n', profile_text)
assert default_db is not None
assert column_val(TQueryTableColumn.DB_NAME) == default_db.group(1),\
default_db = parse_default_db(profile_text)
assert column_val(TQueryTableColumn.DB_NAME) == default_db, \
"database name incorrect"
# Coordinator
coordinator = re.search(r'\n\s+Coordinator:\s+(.*?)\n', profile_text)
assert coordinator is not None
assert column_val(TQueryTableColumn.IMPALA_COORDINATOR) == coordinator.group(1),\
coordinator = parse_coordinator(profile_text)
assert column_val(TQueryTableColumn.IMPALA_COORDINATOR) == coordinator, \
"impala coordinator incorrect"
# Query Status (can be multiple lines if the query errored)
query_status = re.search(r'\n\s+Query Status:\s+(.*?)\n\s+Impala Version', profile_text,
re.DOTALL)
assert query_status is not None
assert column_val(TQueryTableColumn.QUERY_STATUS) == query_status.group(1),\
assert column_val(TQueryTableColumn.QUERY_STATUS) == parse_query_status(profile_text), \
"query status incorrect"
# Query State
query_state = re.search(r'\n\s+Query State:\s+(.*?)\n', profile_text)
assert query_state is not None
query_state_value = query_state.group(1)
query_state_value = parse_query_state(profile_text)
assert column_val(TQueryTableColumn.QUERY_STATE) == query_state_value,\
"query state incorrect"
# Impala Query End State
impala_query_state = re.search(r'\n\s+Impala Query State:\s+(.*?)\n', profile_text)
assert impala_query_state is not None
assert column_val(TQueryTableColumn.IMPALA_QUERY_END_STATE) \
== impala_query_state.group(1), "impala query end state incorrect"
== parse_impala_query_state(profile_text), "impala query end state incorrect"
# Query Type
value = column_val(TQueryTableColumn.QUERY_TYPE)
if query_state_value == "EXCEPTION":
assert value == "UNKNOWN", "query type incorrect"
else:
query_type = re.search(r'\n\s+Query Type:\s+(.*?)\n', profile_text)
assert query_type is not None
assert value == query_type.group(1), "query type incorrect"
query_type = query_type.group(1)
query_type = parse_query_type(profile_text)
assert value == query_type, "query type incorrect"
# Client Network Address
network_address = re.search(r'\n\s+Network Address:\s+(.*?)\n', profile_text)
@@ -332,13 +324,13 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None,
# Admission Result
value = column_val(TQueryTableColumn.ADMISSION_RESULT)
adm_result = re.search(r'\n\s+Admission result:\s+(.*?)\n', profile_text)
adm_result = parse_admission_result(profile_text, True)
if query_state_value == "EXCEPTION" or query_type == "DDL":
assert adm_result is None
assert value == "", "admission result incorrect"
else:
assert adm_result is not None
assert value == adm_result.group(1), "admission result incorrect"
assert value == adm_result, "admission result incorrect"
# Cluster Memory Admitted
value = column_val(TQueryTableColumn.CLUSTER_MEMORY_ADMITTED)
@@ -390,12 +382,12 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None,
# Rows Fetched
value = column_val(TQueryTableColumn.NUM_ROWS_FETCHED)
rows_fetched = re.search(r'\n\s+\-\s+NumRowsFetched:\s+\S+\s+\((\d+)\)', profile_text)
rows_fetched = parse_num_rows_fetched(profile_text, True)
if query_state_value == "EXCEPTION":
assert rows_fetched is None
else:
assert rows_fetched is not None
assert value == rows_fetched.group(1)
assert value == str(rows_fetched)
# Row Materialization Rate
value = column_val(TQueryTableColumn.ROW_MATERIALIZATION_ROWS_PER_SEC)
@@ -597,9 +589,7 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None,
"pernode peak memory mean incorrect"
# SQL statement
sql_stmt = re.search(r'\n\s+Sql Statement:\s+(.*?)\n', profile_text)
assert sql_stmt is not None
assert column_val(TQueryTableColumn.SQL) == sql_stmt.group(1), "sql incorrect"
assert column_val(TQueryTableColumn.SQL) == parse_sql(profile_text), "sql incorrect"
# Query Plan
value = column_val(TQueryTableColumn.PLAN)