impala/tests/stress/test_update_stress.py
Riza Suminto 28cff4022d IMPALA-14333: Run impala-py.test using Python3
Running exhaustive tests with env var IMPALA_USE_PYTHON3_TESTS=true
reveals some tests that require adjustment. This patch makes those
adjustments, which mostly revolve around encoding differences and the
string vs bytes types in Python3. This patch also switches the default
to run pytest with Python3 by setting IMPALA_USE_PYTHON3_TESTS=true.
The following are the details:

Change hash() function in conftest.py to crc32() to produce a
deterministic hash. Hash randomization is enabled by default since
Python 3.3 (see
https://docs.python.org/3/reference/datamodel.html#object.__hash__).
This causes test sharding (like --shard_tests=1/2) to produce an
inconsistent set of tests per shard. Always restart the minicluster
during custom cluster tests if the --shard_tests argument is set,
because test order may change and affect test correctness, depending on
whether the test runs on a fresh minicluster or not.
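
The sharding problem can be illustrated with a minimal sketch. It
assumes tests are assigned to shards by hashing their IDs; the name
shard_for and the sample test IDs are hypothetical and are not taken
from conftest.py. Because zlib.crc32() depends only on the input bytes,
every run computes the same shard, whereas the built-in hash() of a str
varies between interpreter runs unless PYTHONHASHSEED is fixed.

```python
import zlib


def shard_for(test_id, num_shards):
  """Return a 1-based shard index that is stable across interpreter runs."""
  # crc32() depends only on the bytes passed in, so no per-process seed
  # is involved.
  return zlib.crc32(test_id.encode("utf-8")) % num_shards + 1


# Hypothetical test IDs, used only for illustration.
test_ids = [
  "tests/query_test/test_a.py::TestA::test_one",
  "tests/query_test/test_b.py::TestB::test_two",
  "tests/query_test/test_c.py::TestC::test_three",
]
assignment = {t: shard_for(t, 2) for t in test_ids}
```

With a stable assignment, the union of all shards covers each test
exactly once, no matter which process computes it.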

Moved one test case from delimited-latin-text.test to
test_delimited_text.py for easier binary comparison.

Add bytes_to_str() as a utility function to decode bytes in Python3.
This is often needed when inspecting the return value of
subprocess.check_output() as a string.
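
A helper of this kind might look like the sketch below. The real
bytes_to_str() lives in tests/util/parse_util.py and may differ; the
'encoding' parameter and the sample output are assumptions made for
illustration.

```python
def bytes_to_str(data, encoding="utf-8"):
  """Return 'data' as str, decoding it first if it is a bytes object."""
  if isinstance(data, bytes):
    return data.decode(encoding)
  return data


# In Python3, subprocess.check_output() returns bytes unless text mode is
# requested, so its result resembles the literal below (made-up content).
raw_output = b"Found 1 items\n/test-warehouse/t/data/f.parq\n"
lines = bytes_to_str(raw_output).strip().split("\n")
```

Passing a str through unchanged keeps call sites valid when the value
has already been decoded.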

Implement DataTypeMetaclass.__lt__ to replace
DataTypeMetaclass.__cmp__, which is ignored in Python3 (see
https://peps.python.org/pep-0207/).
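
As a rough illustration of why __lt__ is sufficient for sorting, the
sketch below defines a rich comparison on a metaclass so that the
classes themselves become orderable. ExampleTypeMeta, Char, and Int are
hypothetical names, and ordering by class name is an assumption; the
actual DataTypeMetaclass may compare differently.

```python
class ExampleTypeMeta(type):
  """Hypothetical metaclass; orders classes by name for illustration only."""

  def __lt__(cls, other):
    if not isinstance(other, ExampleTypeMeta):
      return NotImplemented
    return cls.__name__ < other.__name__


# The 'metaclass=' keyword is Python3-only syntax.
class Char(metaclass=ExampleTypeMeta):
  pass


class Int(metaclass=ExampleTypeMeta):
  pass


# sorted() only needs __lt__, so the classes themselves can be ordered.
ordered = sorted([Int, Char])
```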

Fix WEB_CERT_ERR difference in test_ipv6.py.

Fix trivial integer parsing in test_restart_services.py.

Fix various encoding issues in test_saml2_sso.py,
test_shell_commandline.py, and test_shell_interactive.py.

Change timeout in Impala.for_each_impalad() from sys.maxsize to 2^31-1.

Switch to binary comparison in test_iceberg.py where needed.

Specify text mode when calling tempfile.NamedTemporaryFile().
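
The need for an explicit mode can be seen in a short sketch: the
default mode of tempfile.NamedTemporaryFile() is 'w+b', so writing a
str fails in Python3. The file contents here are made up for
illustration.

```python
import tempfile

# The default mode is 'w+b', which rejects str in Python3.
with tempfile.NamedTemporaryFile() as binary_file:
  try:
    binary_file.write("select 1;\n")
    binary_rejects_str = False
  except TypeError:
    binary_rejects_str = True

# Requesting text mode lets the file accept str directly.
with tempfile.NamedTemporaryFile(mode="w+") as text_file:
  text_file.write("select 1;\n")
  text_file.flush()
  text_file.seek(0)
  content = text_file.read()
```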

Simplify create_impala_shell_executable_dimension to skip testing dev
and python2 impala-shell when IMPALA_USE_PYTHON3_TESTS=true. The reason
is that several UTF-8 related tests in test_shell_commandline.py break
in the Python3 pytest + Python2 impala-shell combo. This skipping
already happens automatically on build OSes without a system Python2,
such as RHEL9 (IMPALA_SYSTEM_PYTHON2 env var is empty).

Removed unused vector argument and fixed some trivial flake8 issues.

Some test logic requires modification due to intermittent issues in
Python3 pytest. These include:

Add _run_query_with_client() in test_ranger.py to allow reusing a single
Impala client for running several queries. Ensure clients are closed
when the test is done. Mark several tests in test_ranger.py with
SkipIfFS.hive because they run queries through beeline + HiveServer2,
but the Ozone and S3 build environments do not start HiveServer2 by
default.

Increase the sleep period from 0.1 to 0.5 seconds per iteration in
test_statestore.py and mark TestStatestore to execute serially. This is
because TServer appears to shut down more slowly when run concurrently
with other tests. Handle the deprecation of Thread.setDaemon() as well.
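
Thread.setDaemon() is deprecated since Python 3.10 in favor of the
'daemon' attribute. A minimal sketch of the replacement follows; the
'serve' function and 'worker' name are placeholders and are not taken
from test_statestore.py.

```python
import threading


def serve():
  # Placeholder for the real thread body.
  pass


worker = threading.Thread(target=serve)
# Set the attribute instead of calling the deprecated worker.setDaemon(True).
worker.daemon = True
worker.start()
worker.join()
```

Assigning the attribute also works in Python2, which keeps the code
portable while both interpreters are in use.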

Always set force_restart=True for each test method in TestLoggingCore,
TestShellInteractiveReconnect, and TestQueryRetries to prevent them from
reusing the minicluster from the previous test method. Some of these
tests destroy the minicluster (kill impalad) and will produce a minidump
if the metrics verifier for the next test fails to detect a healthy
minicluster state.

Testing:
Pass exhaustive tests with IMPALA_USE_PYTHON3_TESTS=true.

Change-Id: I401a93b6cc7bcd17f41d24e7a310e0c882a550d4
Reviewed-on: http://gerrit.cloudera.org:8080/23319
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-09-03 10:01:29 +00:00


# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
from builtins import map, range
import pytest
import random
import time
from multiprocessing import Value
from subprocess import check_output
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.parametrize import UniqueDatabase
from tests.common.skip import SkipIfFS
from tests.common.test_dimensions import create_exec_option_dimension
from tests.stress.stress_util import run_tasks, Task
from tests.util.filesystem_utils import FILESYSTEM_PREFIX, IS_HDFS
from tests.util.parse_util import bytes_to_str
from tests.conftest import DEFAULT_HIVE_SERVER2


# Longer-running UPDATE tests are executed here
class TestIcebergV2UpdateStress(ImpalaTestSuite):
  """UPDATE tests against Iceberg V2 tables."""

  BATCH_SIZES = [0, 32]
  EXHAUSTIVE_BATCH_SIZES = [0, 1, 11, 32]

  @classmethod
  def add_test_dimensions(cls):
    super(TestIcebergV2UpdateStress, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(
        lambda v: v.get_value('table_format').file_format == 'parquet')

    batch_sizes = (TestIcebergV2UpdateStress.BATCH_SIZES
                   if cls.exploration_strategy() == 'core'
                   else TestIcebergV2UpdateStress.EXHAUSTIVE_BATCH_SIZES)

    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
        batch_sizes=batch_sizes))

  def test_update_stress(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-update-stress', vector,
        unique_database)
    if IS_HDFS:
      self._update_stress_hive_tests(unique_database)

  def _update_stress_hive_tests(self, db):
    stmt = """
      SELECT count(*), sum(ss_ticket_number)
      FROM {}.ice_store_sales
      WHERE ss_item_sk % 1999 = 0""".format(db)

    hive_results = self.run_stmt_in_hive(stmt).split("\n", 1)[1]
    assert hive_results == \
        "3138,848464922\n"


# Stress test for concurrent UPDATE operations against Iceberg tables.
class TestIcebergConcurrentUpdateStress(ImpalaTestSuite):
  @classmethod
  def get_workload(cls):
    return 'targeted-stress'

  @classmethod
  def add_test_dimensions(cls):
    super(TestIcebergConcurrentUpdateStress, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(
        lambda v: (v.get_value('table_format').file_format == 'parquet'
            and v.get_value('table_format').compression_codec == 'snappy'))

  def _impala_role_concurrent_writer(self, tbl_name, col, num_updates):
    """Increments values in column 'total' and in the column which is passed in 'col'."""
    target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
    impalad_client = self.create_client_for_nth_impalad(target_impalad)
    update_cnt = 0
    while update_cnt < num_updates:
      try:
        impalad_client.execute(
            "update {0} set total = total + 1, {1} = {1} + 1".format(tbl_name, col))
        update_cnt += 1
        # Sleep after a successful operation.
        time.sleep(random.random())
      except Exception:
        # Exceptions are expected due to concurrent operations.
        pass
    impalad_client.close()

  def _impala_role_concurrent_checker(self, tbl_name, target_total):
    """Checks if the table's invariant is true. The invariant is that the equation
    'total == a + b + c' is true. Returns 'total'."""
    def verify_result_set(result):
      assert len(result.data) == 1
      line = result.data[0]
      [total, a, b, c] = list(map(int, (line.split('\t'))))
      assert total == a + b + c
      return total

    target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
    impalad_client = self.create_client_for_nth_impalad(target_impalad)
    total = 0
    while total < target_total:
      result = impalad_client.execute("select * from %s" % tbl_name)
      new_total = verify_result_set(result)
      assert total <= new_total
      total = new_total
      time.sleep(random.random())
    impalad_client.close()

  @pytest.mark.execute_serially
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_iceberg_updates(self, unique_database):
    """Issues UPDATE statements against multiple impalads in a way that some
    invariants must be true when a spectator process inspects the table. E.g.
    the value of a column should be equal to the sum of other columns."""
    tbl_name = "%s.test_concurrent_updates" % unique_database
    self.client.set_configuration_option("SYNC_DDL", "true")
    self.client.execute("""create table {0}
        (total bigint, a bigint, b bigint, c bigint)
        stored as iceberg
        tblproperties('format-version'='2')""".format(tbl_name,))
    self.client.execute(
        "insert into {} values (0, 0, 0, 0)".format(tbl_name))

    num_checkers = 2
    cols = 3
    updates_per_col = 30
    target_total = updates_per_col * cols

    updater_a = Task(self._impala_role_concurrent_writer, tbl_name, "a", updates_per_col)
    updater_b = Task(self._impala_role_concurrent_writer, tbl_name, "b", updates_per_col)
    updater_c = Task(self._impala_role_concurrent_writer, tbl_name, "c", updates_per_col)
    checkers = [Task(self._impala_role_concurrent_checker, tbl_name, target_total)
                for i in range(0, num_checkers)]
    run_tasks([updater_a, updater_b, updater_c] + checkers)


class TestIcebergConcurrentOperations(ImpalaTestSuite):
  """This test checks that concurrent DELETE and UPDATE operations leave the table
  in a consistent state."""

  @classmethod
  def get_workload(cls):
    return 'targeted-stress'

  @classmethod
  def add_test_dimensions(cls):
    super(TestIcebergConcurrentOperations, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(
        lambda v: (v.get_value('table_format').file_format == 'parquet'
            and v.get_value('table_format').compression_codec == 'snappy'))

  def _impala_role_concurrent_deleter(self, tbl_name, all_rows_deleted, num_rows):
    """Deletes every row from the table one by one."""
    target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
    impalad_client = self.create_client_for_nth_impalad(target_impalad)
    impalad_client.set_configuration_option("SYNC_DDL", "true")
    i = 0
    while i < num_rows:
      try:
        impalad_client.execute(
            "delete from {0} WHERE id = {1}".format(tbl_name, i))
        i += 1
      except Exception as e:
        # Exceptions are expected due to concurrent operations.
        print(str(e))
      time.sleep(random.random())
    all_rows_deleted.value = 1
    impalad_client.close()

  def _impala_role_concurrent_writer(self, tbl_name, all_rows_deleted):
    """Updates every row in the table in a loop."""
    target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
    impalad_client = self.create_client_for_nth_impalad(target_impalad)
    impalad_client.set_configuration_option("SYNC_DDL", "true")
    while all_rows_deleted.value != 1:
      try:
        impalad_client.execute(
            "update {0} set j = j + 1".format(tbl_name))
      except Exception as e:
        # Exceptions are expected due to concurrent operations.
        print(str(e))
      time.sleep(random.random())
    impalad_client.close()

  def _impala_role_concurrent_optimizer(self, tbl_name, all_rows_deleted):
    """Optimizes the table in a loop."""
    target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
    impalad_client = self.create_client_for_nth_impalad(target_impalad)
    impalad_client.set_configuration_option("SYNC_DDL", "true")
    while all_rows_deleted.value != 1:
      try:
        impalad_client.execute("optimize table {0}".format(tbl_name))
      except Exception as e:
        # Exceptions are expected due to concurrent operations.
        print(str(e))
      time.sleep(random.random())
    impalad_client.close()

  def _hive_role_concurrent_writer(self, tbl_name, all_rows_deleted):
    """Increments j's value with each iteration until all rows are deleted"""
    hive_client = ImpalaTestSuite.create_impala_client(
        host_port=DEFAULT_HIVE_SERVER2, protocol='hs2', is_hive=True)
    while all_rows_deleted.value != 1:
      try:
        hive_client.execute(
            "update {0} set j = j + 1".format(tbl_name))
      except Exception as e:
        # Exceptions are expected due to concurrent operations.
        print(str(e))
      time.sleep(random.random() * 7)
    hive_client.close()

  def _hive_role_concurrent_deleter(self, tbl_name, all_rows_deleted, num_rows):
    hive_client = ImpalaTestSuite.create_impala_client(
        host_port=DEFAULT_HIVE_SERVER2, protocol='hs2', is_hive=True)
    i = 0
    while i < num_rows:
      try:
        hive_client.execute(
            "delete from {0} WHERE id = {1}".format(tbl_name, i))
        i += 1
      except Exception as e:
        # Exceptions are expected due to concurrent operations.
        print(str(e))
      time.sleep(random.random())
    all_rows_deleted.value = 1
    hive_client.close()

  def _impala_role_concurrent_checker(self, tbl_name, all_rows_deleted, num_rows):
    """Checks if the table's invariant is true. The invariant is that we have a
    consecutive range of 'id's starting from N to num_rows - 1. And 'j's are equal."""
    def verify_result_set(result):
      if len(result.data) == 0:
        return
      line = result.data[0]
      [prev_id, prev_j] = list(map(int, (line.split('\t'))))
      for line in result.data[1:]:
        [id, j] = list(map(int, (line.split('\t'))))
        assert id - prev_id == 1
        assert j == prev_j
        prev_id = id
        prev_j = j
      assert prev_id == num_rows - 1

    target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
    impalad_client = self.create_client_for_nth_impalad(target_impalad)
    while all_rows_deleted.value != 1:
      result = impalad_client.execute("select * from %s order by id" % tbl_name)
      verify_result_set(result)
      time.sleep(random.random())
    impalad_client.close()

  @pytest.mark.execute_serially
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_iceberg_deletes_and_updates(self, unique_database):
    """Issues DELETE and UPDATE statements in parallel in a way that some
    invariants must be true when a spectator process inspects the table."""
    tbl_suffix = "test_concurrent_deletes_and_updates"
    tbl_name = unique_database + "." + tbl_suffix
    self.client.set_configuration_option("SYNC_DDL", "true")
    self.client.execute("""create table {0} (id int, j bigint)
        stored as iceberg
        tblproperties('format-version'='2')""".format(tbl_name,))

    num_rows = 10
    # Each element is already a parenthesized row, so the joined string is
    # passed to VALUES without an extra pair of parentheses.
    values = ', '.join("({}, 0)".format(i) for i in range(num_rows))
    self.client.execute("insert into {} values {}".format(tbl_name, values))

    all_rows_deleted = Value('i', 0)
    deleter = Task(self._impala_role_concurrent_deleter, tbl_name, all_rows_deleted,
        num_rows)
    updater = Task(self._impala_role_concurrent_writer, tbl_name, all_rows_deleted)
    checker = Task(self._impala_role_concurrent_checker, tbl_name, all_rows_deleted,
        num_rows)
    run_tasks([deleter, updater, checker])

    result = self.client.execute("select count(*) from {}".format(tbl_name))
    assert result.data == ['0']
    self.check_no_orphan_files(unique_database, tbl_suffix)

  @pytest.mark.execute_serially
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_iceberg_deletes_and_updates_and_optimize(self, unique_database):
    """Issues DELETE and UPDATE statements in parallel in a way that some
    invariants must be true when a spectator process inspects the table.
    An optimizer thread also invokes OPTIMIZE regularly on the table."""
    tbl_suffix = "test_concurrent_write_and_optimize"
    tbl_name = unique_database + "." + tbl_suffix
    self.client.set_configuration_option("SYNC_DDL", "true")
    self.client.execute("""create table {0} (id int, j bigint)
        stored as iceberg
        tblproperties('format-version'='2')""".format(tbl_name,))

    num_rows = 10
    values_str = ""
    # Prepare INSERT statement of 'num_rows' records.
    for i in range(num_rows):
      values_str += "({}, 0)".format(i)
      if i != num_rows - 1:
        values_str += ", "
    self.client.execute("insert into {} values {}".format(tbl_name, values_str))

    all_rows_deleted = Value('i', 0)
    deleter = Task(self._impala_role_concurrent_deleter, tbl_name, all_rows_deleted,
        num_rows)
    updater = Task(self._impala_role_concurrent_writer, tbl_name, all_rows_deleted)
    optimizer = Task(self._impala_role_concurrent_optimizer, tbl_name,
        all_rows_deleted)
    checker = Task(self._impala_role_concurrent_checker, tbl_name, all_rows_deleted,
        num_rows)
    run_tasks([deleter, updater, optimizer, checker])

    result = self.client.execute("select count(*) from {}".format(tbl_name))
    assert result.data == ['0']
    self.check_no_orphan_files(unique_database, tbl_suffix)

  def check_no_orphan_files(self, unique_database, table_name):
    # Check that the uncommitted data and delete files are removed from the file system
    # and only those files remain that are reachable through valid snapshots.
    data_files_in_tbl_result = self.client.execute(
        "select file_path from {}.{}.all_files;".format(unique_database, table_name))
    data_files_in_tbl = [row.split('/test-warehouse')[1]
        for row in data_files_in_tbl_result.data]

    table_location = "{0}/test-warehouse/{1}.db/{2}/data".format(
        FILESYSTEM_PREFIX, unique_database, table_name)
    data_files_on_fs_result = bytes_to_str(
        check_output(["hdfs", "dfs", "-ls", table_location]))
    # The first row of the HDFS result is a summary, the following lines contain
    # 1 file each.
    data_files_on_fs_rows = data_files_on_fs_result.strip().split('\n')[1:]
    data_files_on_fs = [row.split()[-1].split('/test-warehouse')[1]
        for row in data_files_on_fs_rows]

    assert len(data_files_in_tbl) == len(data_files_on_fs_rows)
    assert set(data_files_on_fs) == set(data_files_in_tbl)

  @SkipIfFS.hive
  @pytest.mark.execute_serially
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_iceberg_impala_deletes_and_hive_updates(self, unique_database):
    """Issues DELETE statements from Impala and UPDATE statements from Hive
    in parallel in a way that some invariants must be true when a spectator process
    inspects the table."""
    tbl_name = "%s.test_concurrent_deletes_and_updates" % unique_database
    self.client.set_configuration_option("SYNC_DDL", "true")
    self.client.execute("""create table {0} (id int, j bigint)
        stored as iceberg
        tblproperties('format-version'='2')""".format(tbl_name,))

    num_rows = 5
    # Each element is already a parenthesized row, so the joined string is
    # passed to VALUES without an extra pair of parentheses.
    values = ', '.join("({}, 0)".format(i) for i in range(num_rows))
    self.client.execute("insert into {} values {}".format(tbl_name, values))

    all_rows_deleted = Value('i', 0)
    deleter = Task(self._impala_role_concurrent_deleter, tbl_name, all_rows_deleted,
        num_rows)
    hive_updater = Task(self._hive_role_concurrent_writer, tbl_name, all_rows_deleted)
    checker = Task(self._impala_role_concurrent_checker, tbl_name, all_rows_deleted,
        num_rows)
    run_tasks([deleter, hive_updater, checker])

    result = self.client.execute("select count(*) from {}".format(tbl_name))
    assert result.data == ['0']

  @SkipIfFS.hive
  @pytest.mark.execute_serially
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_iceberg_impala_updates_and_hive_deletes(self, unique_database):
    """Issues DELETE statements from Hive and UPDATE statements from Impala
    in parallel in a way that some invariants must be true when a spectator
    process inspects the table."""
    tbl_name = "%s.test_concurrent_deletes_and_updates" % unique_database
    self.client.set_configuration_option("SYNC_DDL", "true")
    self.client.execute("""create table {0} (id int, j bigint)
        stored as iceberg
        tblproperties('format-version'='2')""".format(tbl_name))

    num_rows = 5
    # Each element is already a parenthesized row, so the joined string is
    # passed to VALUES without an extra pair of parentheses.
    values = ', '.join("({}, 0)".format(i) for i in range(num_rows))
    self.client.execute("insert into {} values {}".format(tbl_name, values))

    all_rows_deleted = Value('i', 0)
    impala_updater = Task(self._impala_role_concurrent_writer, tbl_name, all_rows_deleted)
    hive_deleter = Task(self._hive_role_concurrent_deleter,
        tbl_name, all_rows_deleted, num_rows)
    checker = Task(self._impala_role_concurrent_checker, tbl_name, all_rows_deleted,
        num_rows)
    run_tasks([impala_updater, hive_deleter, checker])

    self.client.execute("refresh %s" % tbl_name)
    result = self.client.execute("select count(*) from {}".format(tbl_name))
    assert result.data == ['0']