Files
impala/tests/custom_cluster/test_exchange_eos.py
Riza Suminto 00dc79adf6 IMPALA-13907: Remove reference to create_beeswax_client
This patch replaces references to create_beeswax_client() with
create_hs2_client() or vector-based client creation to prepare for the
hs2 test migration.

test_session_expiration_with_queued_query is changed to use impala.dbapi
directly from Impyla due to limitation in ImpylaHS2Connection.

TestAdmissionControllerRawHS2 is migrated to use hs2 as default test
protocol.

Modify test_query_expiration.py to set query option through client
instead of SET query. test_query_expiration is slightly modified due to
behavior difference in hs2 ImpylaHS2Connection.

Remove remaining reference to BeeswaxConnection.QueryState.

Fixed a bug in ImpylaHS2Connection.wait_for_finished_timeout().

Fix some easy flake8 issues caught through this command:
git show HEAD --name-only | grep '^tests.*py' \
  | xargs -I {} impala-flake8 {} \
  | grep -e U100 -e E111 -e E301 -e E302 -e E303 -e F...

Testing:
- Pass exhaustive tests.

Change-Id: I1d84251835d458cc87fb8fedfc20ee15aae18d51
Reviewed-on: http://gerrit.cloudera.org:8080/22700
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-03-29 18:37:45 +00:00

69 lines
2.9 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
import pytest
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.impala_cluster import ImpalaCluster
from tests.verifiers.metric_verifier import MetricVerifier
class TestExchangeEos(CustomClusterTestSuite):
    """ Test to verify that the senders' fragments get unblocked and run to completion
    after exchange node hits eos"""

    @classmethod
    def get_workload(cls):
        """Return the name of the workload used by this suite ('tpch')."""
        return 'tpch'

    @classmethod
    def add_test_dimensions(cls):
        """Build the test matrix and constrain it to the parquet file format."""
        # NOTE(review): super() is anchored at CustomClusterTestSuite rather than at
        # this class, so CustomClusterTestSuite's own add_test_dimensions() is
        # skipped. This looks deliberate -- confirm before "fixing" it.
        super(CustomClusterTestSuite, cls).add_test_dimensions()
        cls.ImpalaTestMatrix.add_constraint(
            lambda v: v.get_value('table_format').file_format == 'parquet')

    @pytest.mark.execute_serially
    @CustomClusterTestSuite.with_args(cluster_size=9, num_exclusive_coordinators=1)
    def test_exchange_eos(self, vector):
        """ Test IMPALA-8845: runs with result spooling enabled and defers the fetching
        of results until all non-coordinator fragments have completed. It aims to
        verify that once the coordinator fragment reaches eos, the rest of the
        fragments will get unblocked. Using a cluster of size 9 which can reliably
        reproduce the hang of some non-coordinator fragments without the fix of
        IMPALA-8845.
        """
        cluster = ImpalaCluster.get_e2e_test_cluster()
        coordinator = cluster.get_first_impalad()
        client = coordinator.service.create_hs2_client()
        # Close the client even when an assertion, a metric wait or a fetch fails, so
        # a failing run does not leave the connection open.
        try:
            vector.get_value('exec_option')['spool_query_results'] = 'true'
            for query in ["select * from tpch.lineitem order by l_orderkey limit 10000",
                          "select * from tpch.lineitem limit 10000"]:
                handle = self.execute_query_async_using_client(client, query, vector)
                # Before fetching anything, wait until the coordinator reports exactly
                # one fragment in flight and every other impalad reports none.
                for impalad in ImpalaCluster.get_e2e_test_cluster().impalads:
                    verifier = MetricVerifier(impalad.service)
                    # The coordinator is identified by its webserver port.
                    if impalad.get_webserver_port() == coordinator.get_webserver_port():
                        num_fragments = 1
                    else:
                        num_fragments = 0
                    verifier.wait_for_metric(
                        "impala-server.num-fragments-in-flight", num_fragments)
                results = client.fetch(query, handle)
                assert results.success
                assert len(results.data) == 10000
        finally:
            client.close()