Files
impala/tests/util/web_pages_util.py
Riza Suminto c23d3cb58a IMPALA-14311: Fix ASAN issue in test_query_cancel_load_tables
verifiers.test_verify_metrics.TestValidateMetrics.test_metrics_are_zero
fails in ASAN tests. This is because a client session opened by
test_query_cancel_load_tables is not cleanly closed.

This patch fix the issue by creating the Impala client within the run
method of web_pages_util.py using with-as statement.

This patch also attempt to fix flakiness in test_query_expiration by
validating that time_limit_expire_handle is actually closed in
Coordinator log (IMPALA-13444).

Testing:
- Pass test_web_pages.py test in ASAN build.
- Loop and pass test_query_expiration 50x in ASAN build.

Change-Id: I6fcf13902478535f6fa15f267edc83891057993c
Reviewed-on: http://gerrit.cloudera.org:8080/23302
Reviewed-by: Daniel Becker <daniel.becker@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-08-18 18:21:10 +00:00

120 lines
4.6 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
from multiprocessing import Process, Queue
import json
import requests
from tests.util.retry import retry
def get_num_completed_backends(service, query_id):
"""Get the number of completed backends for the given query_id from the
'query_backends' web page."""
query_backend_url = 'query_backends?query_id={0}&json'.format(query_id)
query_backends_json = json.loads(service.read_debug_webpage(query_backend_url))
assert 'backend_states' in query_backends_json
num_complete_backends = 0
for backend_state in query_backends_json['backend_states']:
if backend_state['done']: num_complete_backends += 1
return num_complete_backends
def get_mem_admitted_backends_debug_page(cluster, ac_process=None):
"""Helper method assumes a cluster using a dedicated coordinator. Returns the mem
admitted to the backends extracted from the backends debug page of the coordinator
impala daemon. Returns a dictionary with the keys 'coordinator' and 'executor' and
their respective mem values in bytes. The entry for 'executor' is a list of the mem
admitted for each executor."""
# Based on how the cluster is setup, the first impalad in the cluster is the
# coordinator.
if ac_process is None:
ac_process = cluster.impalads[0]
response_json = ac_process.service.get_debug_webpage_json('backends')
assert 'backends' in response_json
assert len(response_json['backends']) >= 2
ret = dict()
ret['executor'] = []
from tests.verifiers.mem_usage_verifier import parse_mem_value
for backend in response_json['backends']:
if backend['is_coordinator']:
ret['coordinator'] = parse_mem_value(backend['mem_admitted'])
else:
ret['executor'].append(parse_mem_value(backend['mem_admitted']))
return ret
def cancel(impalad, query_id):
"""Cancel a query via /cancel_query on impalad."""
cancel_query_url = "http://{0}:{1}/cancel_query?json&query_id={2}"\
.format(impalad.webserver_interface, impalad.webserver_port, query_id)
response = requests.post(cancel_query_url)
assert response.status_code == requests.codes.ok
response_json = json.loads(response.text)
assert response_json['contents'] == "Query cancellation successful"
def wait_for_state(impalad, state):
"""Wait for the inflight query to reach 'state' on impalad. If state=None, wait for
no inflight queries."""
def is_state():
in_flight_queries = impalad.get_debug_webpage_json('queries')['in_flight_queries']
if state:
return len(in_flight_queries) > 0 and in_flight_queries[0]['state'] == state
else:
return len(in_flight_queries) <= 0
assert retry(is_state)
def assert_query_stopped(impalad, query_id):
"""Assert all queries are complete, and query_id is in completed_queries, on impalad."""
response_json = impalad.get_debug_webpage_json('queries')
assert response_json['num_in_flight_queries'] == 0
assert response_json['num_waiting_queries'] == 0
expected_queries = [q for q in response_json['completed_queries']
if q['query_id'] == query_id]
assert len(expected_queries) == 1
def run(test_class, query, options, queue):
"""Execute query and put results or errors into queue."""
with test_class.create_impala_client() as client:
if options:
client.set_configuration(options)
try:
queue.put(str(client.execute(query)))
except Exception as ex:
queue.put(str(ex))
def start(test_class, query, options=None):
"""Execute a query in a separate process. Returns (process, queue)."""
queue = Queue()
proc = Process(target=run, args=(test_class, query, options, queue))
proc.start()
return proc, queue
def join(proc, queue):
"""Returns result from queue returned by 'start', and waits for proc to complete."""
result = queue.get()
proc.join()
return result