mirror of
https://github.com/apache/impala.git
synced 2025-12-19 09:58:28 -05:00
Before this patch, ImpylaHS2Connection unconditionally opened a cursor (and HS2 session) as it connected, followed by running a "SET ALL" query to populate the default query options. This patch changes the behavior of ImpylaHS2Connection to open the default cursor only when querying is needed for the first time. This helps preserve assertions for a test that is sensitive about client connection, like IMPALA-13925. Default query options are now parsed from newly instantiated TQueryOptions object rather than issuing a "SET ALL" query or making BeeswaxService.get_default_configuration() RPC. Fix test_query_profile_contains_query_compilation_metadata_cached_event slightly by setting the 'sync_ddl' option because the test is flaky without it. Tweak test_max_hs2_sessions_per_user to run queries so that sessions will open. Deduplicate test cases between utc-timestamp-functions.test and local-timestamp-functions.test. Rename TestUtcTimestampFunctions to TestTimestampFunctions, and expand it to also tests local-timestamp-functions.test and file-formats-with-local-tz-conversion.test. The table_format is now contrained to 'test/none' because it is unnecessary to permute other table_format. Deprecate 'use_local_tz_for_unix_timestamp_conversions' in favor of query option with the same name. Filed IMPALA-13953 to update the documentation of 'use_local_tz_for_unix_timestamp_conversions' flag/option. Testing: Run and pass a few pytests such as: test_admission_controller.py test_observability.py test_runtime_filters.py test_session_expiration.py. test_set.py Change-Id: I9d5e3e5c11ad386b7202431201d1a4cff46cbff5 Reviewed-on: http://gerrit.cloudera.org:8080/22731 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
222 lines
9.9 KiB
Python
222 lines
9.9 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
# Tests for query expiration.
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
import pytest
|
|
import socket
|
|
|
|
import re
|
|
from time import sleep
|
|
|
|
from impala.dbapi import connect
|
|
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
|
|
from tests.common.impala_cluster import DEFAULT_HS2_PORT
|
|
from tests.util.thrift_util import op_handle_to_query_id
|
|
|
|
|
|
class TestSessionExpiration(CustomClusterTestSuite):
|
|
"""Tests query expiration logic"""
|
|
PROFILE_PAGE = "http://localhost:{0}/query_profile?query_id={1}&json"
|
|
|
|
@pytest.mark.execute_serially
|
|
@CustomClusterTestSuite.with_args("--idle_session_timeout=6 "
|
|
"--idle_client_poll_period_s=0")
|
|
def test_session_expiration(self):
|
|
impalad = self.cluster.get_any_impalad()
|
|
self.close_impala_clients()
|
|
num_expired = impalad.service.get_metric_value("impala-server.num-sessions-expired")
|
|
num_connections = impalad.service.get_metric_value(
|
|
"impala.thrift-server.beeswax-frontend.connections-in-use")
|
|
client = impalad.service.create_hs2_client()
|
|
client.execute('select 1')
|
|
# Sleep for half the expiration time to confirm that the session is not expired early
|
|
# (see IMPALA-838)
|
|
sleep(3)
|
|
assert client is not None
|
|
assert num_expired == impalad.service.get_metric_value(
|
|
"impala-server.num-sessions-expired")
|
|
# Wait for session expiration. Impala will poll the session expiry queue every second
|
|
impalad.service.wait_for_metric_value(
|
|
"impala-server.num-sessions-expired", num_expired + 1, 20)
|
|
# Verify that the idle connection is not closed.
|
|
assert 1 + num_connections == impalad.service.get_metric_value(
|
|
"impala.thrift-server.hiveserver2-frontend.connections-in-use")
|
|
|
|
@pytest.mark.execute_serially
|
|
@CustomClusterTestSuite.with_args("--idle_session_timeout=3 "
|
|
"--idle_client_poll_period_s=0")
|
|
def test_session_expiration_with_set(self):
|
|
impalad = self.cluster.get_any_impalad()
|
|
self.close_impala_clients()
|
|
num_expired = impalad.service.get_metric_value("impala-server.num-sessions-expired")
|
|
|
|
# Test if we can set a shorter timeout than the process-wide option
|
|
client = impalad.service.create_hs2_client()
|
|
client.execute("SET IDLE_SESSION_TIMEOUT=1")
|
|
sleep(2.5)
|
|
assert num_expired + 1 == impalad.service.get_metric_value(
|
|
"impala-server.num-sessions-expired")
|
|
|
|
# Test if we can set a longer timeout than the process-wide option
|
|
client = impalad.service.create_hs2_client()
|
|
client.execute("SET IDLE_SESSION_TIMEOUT=10")
|
|
sleep(5)
|
|
assert num_expired + 1 == impalad.service.get_metric_value(
|
|
"impala-server.num-sessions-expired")
|
|
|
|
@pytest.mark.execute_serially
|
|
@CustomClusterTestSuite.with_args("--idle_session_timeout=5 "
|
|
"--idle_client_poll_period_s=0")
|
|
def test_unsetting_session_expiration(self):
|
|
impalad = self.cluster.get_any_impalad()
|
|
self.close_impala_clients()
|
|
num_expired = impalad.service.get_metric_value("impala-server.num-sessions-expired")
|
|
|
|
# Test unsetting IDLE_SESSION_TIMEOUT
|
|
client = impalad.service.create_hs2_client()
|
|
client.execute("SET IDLE_SESSION_TIMEOUT=1")
|
|
|
|
# Unset to 5 sec
|
|
client.execute('SET IDLE_SESSION_TIMEOUT=""')
|
|
sleep(2)
|
|
# client session should be alive at this point
|
|
assert num_expired == impalad.service.get_metric_value(
|
|
"impala-server.num-sessions-expired")
|
|
sleep(5)
|
|
# now client should have expired
|
|
assert num_expired + 1 == impalad.service.get_metric_value(
|
|
"impala-server.num-sessions-expired")
|
|
|
|
def _get_fast_timeout_cursor_from_hs2_client(self, connection, idle_session_timeout=3):
|
|
"""Get a fast timing out HiveServer2Cursor from a HiveServer2Connection."""
|
|
cursor = connection.cursor()
|
|
# Set disable the trivial query otherwise "select 1" would be admitted as a
|
|
# trivial query.
|
|
cursor.execute('set enable_trivial_query_for_admission=false')
|
|
cursor.execute('set idle_session_timeout={}'.format(idle_session_timeout))
|
|
return cursor
|
|
|
|
@pytest.mark.execute_serially
|
|
@CustomClusterTestSuite.with_args("--default_pool_max_requests=1 "
|
|
"--idle_client_poll_period_s=0")
|
|
def test_session_expiration_with_queued_query(self):
|
|
"""Ensure that a query waiting in queue gets cancelled if the session expires."""
|
|
# It is currently not possible to run two successive execute_async within single
|
|
# session using ImpylaHS2Connection. Therefore, we obtain 2 HiveServer2Cursor from
|
|
# HiveServer2Connection instead.
|
|
impalad = self.cluster.get_any_impalad()
|
|
with connect(port=impalad.service.hs2_port) as conn:
|
|
timeout = 3
|
|
debug_cursor = self._get_fast_timeout_cursor_from_hs2_client(conn, timeout)
|
|
queued_cursor = self._get_fast_timeout_cursor_from_hs2_client(conn, timeout)
|
|
debug_cursor.execute_async("select sleep(10000)")
|
|
queued_cursor.execute_async("select 1")
|
|
impalad.service.wait_for_metric_value(
|
|
"admission-controller.local-num-queued.default-pool", 1)
|
|
sleep(timeout)
|
|
impalad.service.wait_for_metric_value(
|
|
"admission-controller.local-num-queued.default-pool", 0)
|
|
impalad.service.wait_for_metric_value(
|
|
"admission-controller.agg-num-running.default-pool", 0)
|
|
queued_query_id = op_handle_to_query_id(queued_cursor._last_operation.handle)
|
|
assert queued_query_id is not None
|
|
json_summary = self.get_debug_page(
|
|
self.PROFILE_PAGE.format(impalad.service.webserver_port, queued_query_id))
|
|
assert "Admission result: Cancelled (queued)" in json_summary['profile']
|
|
|
|
@pytest.mark.execute_serially
|
|
@CustomClusterTestSuite.with_args(impalad_args="--idle_session_timeout=10 "
|
|
"--idle_client_poll_period_s=1", cluster_size=1)
|
|
def test_closing_idle_connection(self):
|
|
""" IMPALA-7802: verifies that connections of idle sessions are closed
|
|
after the sessions have expired."""
|
|
impalad = self.cluster.get_any_impalad()
|
|
self.close_impala_clients()
|
|
|
|
for protocol in ['beeswax', 'hiveserver2']:
|
|
num_expired = impalad.service.get_metric_value("impala-server.num-sessions-expired")
|
|
num_connections_metrics_name = \
|
|
"impala.thrift-server.{0}-frontend.connections-in-use".format(protocol)
|
|
num_connections = impalad.service.get_metric_value(num_connections_metrics_name)
|
|
|
|
# Connect to Impala using either beeswax or HS2 client and verify the number of
|
|
# opened connections.
|
|
client = impalad.service.create_client(
|
|
protocol=('hs2' if protocol == 'hiveserver2' else protocol))
|
|
client.execute("select 1")
|
|
impalad.service.wait_for_metric_value(num_connections_metrics_name,
|
|
num_connections + 1, 20)
|
|
|
|
# Wait till the session has expired.
|
|
impalad.service.wait_for_metric_value("impala-server.num-sessions-expired",
|
|
num_expired + 1, 20)
|
|
# Wait till the idle connection is closed.
|
|
impalad.service.wait_for_metric_value(num_connections_metrics_name,
|
|
num_connections, 5)
|
|
|
|
# Verify that connecting to HS2 port without establishing a session will not cause
|
|
# the connection to be closed.
|
|
num_hs2_connections = impalad.service.get_metric_value(
|
|
"impala.thrift-server.hiveserver2-frontend.connections-in-use")
|
|
sock = socket.socket()
|
|
sock.connect((impalad._get_hostname(), DEFAULT_HS2_PORT))
|
|
impalad.service.wait_for_metric_value(
|
|
"impala.thrift-server.hiveserver2-frontend.connections-in-use",
|
|
num_hs2_connections + 1, 60)
|
|
# Sleep for some time for the frontend service thread to check for idleness.
|
|
sleep(15)
|
|
assert num_hs2_connections + 1 == impalad.service.get_metric_value(
|
|
"impala.thrift-server.hiveserver2-frontend.connections-in-use")
|
|
sock.close()
|
|
|
|
@pytest.mark.execute_serially
|
|
@CustomClusterTestSuite.with_args("--max_hs2_sessions_per_user=2")
|
|
def test_max_hs2_sessions_per_user(self):
|
|
"""Test that the --max_hs2_sessions_per_user flag restricts the total number of
|
|
sessions per user. Also checks that the per-user count of hs2 sessions can
|
|
be seen in the webui."""
|
|
impalad = self.cluster.get_first_impalad()
|
|
self.close_impala_clients()
|
|
# Create 2 clients.
|
|
client1 = impalad.service.create_hs2_client()
|
|
client2 = impalad.service.create_hs2_client()
|
|
# Run query to open session.
|
|
client1.execute('select "client1 should succeed"')
|
|
client2.execute('select "client2 should succeed"')
|
|
try:
|
|
# Trying to open a third session should fail.
|
|
client3 = impalad.service.create_hs2_client()
|
|
client3.execute('select "client3 should fail"')
|
|
assert False, "should have failed"
|
|
except Exception as e:
|
|
assert re.match(r".*Number of sessions for user \S+ exceeds coordinator limit 2",
|
|
str(e), re.DOTALL), "Unexpected exception: " + str(e)
|
|
|
|
# Test webui for hs2 sessions.
|
|
res = impalad.service.get_debug_webpage_json("/sessions")
|
|
assert res['num_sessions'] == 2
|
|
assert res['users'][0]['user'] is not None
|
|
assert res['users'][0]['session_count'] == 2
|
|
# Let queries finish, session count should go to zero.
|
|
sleep(6)
|
|
client1.close()
|
|
client2.close()
|
|
res = impalad.service.get_debug_webpage_json("/sessions")
|
|
assert res['num_sessions'] == 0
|