mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
IMPALA-13999: Refactor test_hms_event_sync_basic to be smaller parallel tests
test_hms_event_sync_basic is not a simple test that it actually tests several kinds of statements in sequence. This refactors it into smaller parallel tests so there are more concurrent HMS events to be processed and easier to reveal bugs. Renamed some tests to use shorter names. Tests: - Ran all parallel tests of TestEventSyncWaiting 32 times. Change-Id: I8a2be548697f6259961b83dc91230306f38e03ad Reviewed-on: http://gerrit.cloudera.org:8080/22829 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
committed by
Impala Public Jenkins
parent
5ddd360cfb
commit
076536d508
@@ -299,6 +299,8 @@ class TestEventProcessing(ImpalaTestSuite):
|
||||
@SkipIfFS.hive
|
||||
@SkipIfCatalogV2.hms_event_polling_disabled()
|
||||
class TestEventSyncWaiting(ImpalaTestSuite):
|
||||
"""Verify query option sync_hms_events_wait_time_s should protect the query by
|
||||
waiting until Impala sync the HMS changes."""
|
||||
|
||||
@classmethod
|
||||
def get_workload(cls):
|
||||
@@ -331,26 +333,26 @@ class TestEventSyncWaiting(ImpalaTestSuite):
|
||||
res = self.execute_query_expect_success(client, "select * from " + tbl)
|
||||
assert res.data == ['1']
|
||||
|
||||
def test_hms_event_sync_basic(self, vector, unique_database):
|
||||
"""Verify query option sync_hms_events_wait_time_s should protect the query by
|
||||
waiting until Impala sync the HMS changes."""
|
||||
def test_describe(self, vector, unique_database):
|
||||
client = self.create_impala_client_from_vector(vector)
|
||||
tbl_name = unique_database + ".tbl"
|
||||
label = "Synced events from Metastore"
|
||||
# Test DESCRIBE on new table created in Hive
|
||||
self.run_stmt_in_hive(
|
||||
"create table {0} (i int) partitioned by (p int)".format(tbl_name))
|
||||
res = self.execute_query_expect_success(client, "describe " + tbl_name)
|
||||
assert res.data == ["i\tint\t", 'p\tint\t']
|
||||
assert res.log == ''
|
||||
self.verify_timeline_item("Query Compilation", label, res.runtime_profile)
|
||||
self.__verify_profile_timeline(res.runtime_profile)
|
||||
|
||||
def test_show_tables(self, vector, unique_database):
|
||||
# Test SHOW TABLES gets new tables created in Hive
|
||||
self.run_stmt_in_hive("create table {0}_2 (i int)".format(tbl_name))
|
||||
client = self.create_impala_client_from_vector(vector)
|
||||
tbl_name = unique_database + ".tbl"
|
||||
self.run_stmt_in_hive("create table {0} (i int)".format(tbl_name))
|
||||
res = self.execute_query_expect_success(client, "show tables in " + unique_database)
|
||||
assert res.data == ["tbl", "tbl_2"]
|
||||
assert res.data == ["tbl"]
|
||||
assert res.log == ''
|
||||
self.verify_timeline_item("Query Compilation", label, res.runtime_profile)
|
||||
self.__verify_profile_timeline(res.runtime_profile)
|
||||
|
||||
# Test SHOW VIEWS gets new views created in Hive
|
||||
self.run_stmt_in_hive(
|
||||
@@ -358,64 +360,99 @@ class TestEventSyncWaiting(ImpalaTestSuite):
|
||||
res = self.execute_query_expect_success(client, "show views in " + unique_database)
|
||||
assert res.data == ["v"]
|
||||
assert res.log == ''
|
||||
self.verify_timeline_item("Query Compilation", label, res.runtime_profile)
|
||||
self.__verify_profile_timeline(res.runtime_profile)
|
||||
|
||||
# Test DROP TABLE
|
||||
def test_drop_created_table(self, vector, unique_database):
|
||||
client = self.create_impala_client_from_vector(vector)
|
||||
self.run_stmt_in_hive("create table {0}.tbl(i int)".format(unique_database))
|
||||
self.execute_query_expect_success(
|
||||
client, "drop table {0}.tbl".format(unique_database))
|
||||
|
||||
def test_insert_under_missing_db(self, vector, unique_name):
|
||||
client = self.create_impala_client_from_vector(vector)
|
||||
db = unique_name + "_db"
|
||||
try:
|
||||
self.run_stmt_in_hive("""create database {0}_2;
|
||||
create table {0}_2.tbl(i int);
|
||||
create table {0}_2.tbl_2(i int);""".format(unique_database))
|
||||
# Create the table in Hive immediately after creating the db. So it's more likely
|
||||
# that when the INSERT is submitted, the db is still missing in Impala.
|
||||
self.run_stmt_in_hive("""create database {0};
|
||||
create table {0}.tbl(i int)""".format(db))
|
||||
self.execute_query_expect_success(
|
||||
client, "drop table {0}_2.tbl".format(unique_database))
|
||||
client, "insert into {0}.tbl values (0)".format(db))
|
||||
finally:
|
||||
self.run_stmt_in_hive(
|
||||
"drop database if exists {0}_2 cascade".format(unique_database))
|
||||
"drop database if exists {0} cascade".format(db))
|
||||
|
||||
# Test SHOW DATABASES
|
||||
def test_show_databases(self, vector, unique_database):
|
||||
client = self.create_impala_client_from_vector(vector)
|
||||
res = self.execute_query_expect_success(client, "show databases")
|
||||
assert unique_database + "\t" in res.data
|
||||
assert unique_database + "_2\t" not in res.data
|
||||
self.verify_timeline_item("Query Compilation", label, res.runtime_profile)
|
||||
self.__verify_profile_timeline(res.runtime_profile)
|
||||
|
||||
# Test DROP DATABASE
|
||||
# Create a new db in Hive
|
||||
self.run_stmt_in_hive("create database {0}_2".format(unique_database))
|
||||
res = self.execute_query_expect_success(client, "show databases")
|
||||
assert unique_database + "\t" in res.data
|
||||
assert unique_database + "_2\t" in res.data
|
||||
self.__verify_profile_timeline(res.runtime_profile)
|
||||
|
||||
# Drop the new db in Hive
|
||||
self.run_stmt_in_hive("drop database {0}_2".format(unique_database))
|
||||
res = self.execute_query_expect_success(client, "show databases")
|
||||
assert unique_database + "\t" in res.data
|
||||
assert unique_database + "_2\t" not in res.data
|
||||
self.__verify_profile_timeline(res.runtime_profile)
|
||||
|
||||
def test_drop_db(self, vector, unique_name):
|
||||
client = self.create_impala_client_from_vector(vector)
|
||||
db = unique_name + "_db"
|
||||
try:
|
||||
self.run_stmt_in_hive("create database {0}_3".format(unique_database))
|
||||
self.execute_query_expect_success(
|
||||
client, "drop database {0}_3".format(unique_database))
|
||||
self.run_stmt_in_hive("create database {0}".format(db))
|
||||
self.execute_query_expect_success(client, "drop database {0}".format(db))
|
||||
finally:
|
||||
self.run_stmt_in_hive(
|
||||
"drop database if exists {0}_3 cascade".format(unique_database))
|
||||
"drop database if exists {0} cascade".format(db))
|
||||
|
||||
# Test DESCRIBE DATABASE
|
||||
def test_describe_db(self, vector, unique_name):
|
||||
client = self.create_impala_client_from_vector(vector)
|
||||
db = unique_name + "_db"
|
||||
try:
|
||||
self.run_stmt_in_hive("create database {0}_4".format(unique_database))
|
||||
self.run_stmt_in_hive("create database {0}".format(db))
|
||||
self.execute_query_expect_success(
|
||||
client, "describe database {0}_4".format(unique_database))
|
||||
client, "describe database {0}".format(db))
|
||||
finally:
|
||||
self.run_stmt_in_hive("drop database if exists {0}_4".format(unique_database))
|
||||
self.run_stmt_in_hive("drop database if exists {0}".format(db))
|
||||
|
||||
# Test SHOW FUNCTIONS
|
||||
def test_show_functions(self, vector, unique_name):
|
||||
client = self.create_impala_client_from_vector(vector)
|
||||
db = unique_name + "_db"
|
||||
try:
|
||||
self.run_stmt_in_hive("create database {0}_5".format(unique_database))
|
||||
self.run_stmt_in_hive("create database {0}".format(db))
|
||||
self.execute_query_expect_success(
|
||||
client, "show functions in {0}_5".format(unique_database))
|
||||
client, "show functions in {0}".format(db))
|
||||
finally:
|
||||
self.run_stmt_in_hive("drop database if exists {0}_5".format(unique_database))
|
||||
self.run_stmt_in_hive("drop database if exists {0}".format(db))
|
||||
|
||||
def test_hive_insert(self, vector, unique_database):
|
||||
client = self.create_impala_client_from_vector(vector)
|
||||
# Create a partitioned table and DESCRIBE it to make it loaded.
|
||||
tbl_name = unique_database + ".tbl"
|
||||
self.execute_query("create table {0}(i int) partitioned by(p int)".format(tbl_name))
|
||||
self.execute_query("describe " + tbl_name)
|
||||
# Test SELECT gets new values inserted by Hive
|
||||
self.run_stmt_in_hive(
|
||||
"insert into table {0} partition (p=0) select 0".format(tbl_name))
|
||||
res = self.execute_query_expect_success(client, "select * from " + tbl_name)
|
||||
assert res.data == ["0\t0"]
|
||||
assert res.log == ''
|
||||
self.verify_timeline_item("Query Compilation", label, res.runtime_profile)
|
||||
self.__verify_profile_timeline(res.runtime_profile)
|
||||
# Same case but using INSERT OVERWRITE in Hive
|
||||
self.run_stmt_in_hive(
|
||||
"insert overwrite table {0} partition (p=0) select 1".format(tbl_name))
|
||||
res = self.execute_query_expect_success(client, "select * from " + tbl_name)
|
||||
assert res.data == ["1\t0"]
|
||||
assert res.log == ''
|
||||
self.verify_timeline_item("Query Compilation", label, res.runtime_profile)
|
||||
self.__verify_profile_timeline(res.runtime_profile)
|
||||
|
||||
# Test SHOW PARTITIONS gets new partitions created by Hive
|
||||
self.run_stmt_in_hive(
|
||||
@@ -426,17 +463,44 @@ class TestEventSyncWaiting(ImpalaTestSuite):
|
||||
# 3 result lines: 2 for partitions, 1 for total info
|
||||
assert len(res.data) == 3
|
||||
assert res.log == ''
|
||||
self.verify_timeline_item("Query Compilation", label, res.runtime_profile)
|
||||
self.__verify_profile_timeline(res.runtime_profile)
|
||||
|
||||
# Test CREATE TABLE on table dropped by Hive
|
||||
def test_create_dropped_db(self, vector, unique_name):
|
||||
"""Test CREATE DATABASE on db dropped by Hive.
|
||||
Use unique_name instead of unique_database to avoid cleanup failure overwriting
|
||||
the real test failure, i.e. when the test fails, the db probably doesn't exist
|
||||
so cleanup of unique_database will also fail."""
|
||||
client = self.create_impala_client_from_vector(vector)
|
||||
db = unique_name + "_db"
|
||||
self.execute_query("create database " + db)
|
||||
try:
|
||||
self.run_stmt_in_hive("drop database " + db)
|
||||
res = self.execute_query_expect_success(client, "create database " + db)
|
||||
self.__verify_profile_timeline(res.runtime_profile)
|
||||
finally:
|
||||
self.execute_query("drop database if exists {} cascade".format(db))
|
||||
|
||||
def test_create_dropped_table(self, vector, unique_database):
|
||||
"""Test CREATE TABLE on table dropped by Hive"""
|
||||
client = self.create_impala_client_from_vector(vector)
|
||||
# Create a table and DESCRIBE it to make it loaded
|
||||
tbl_name = unique_database + ".tbl"
|
||||
self.execute_query_expect_success(client, "create table {0} (i int)".format(tbl_name))
|
||||
res = self.execute_query_expect_success(client, "describe " + tbl_name)
|
||||
assert res.data == ["i\tint\t"]
|
||||
# Drop it in Hive and re-create it in Impala using a new schema
|
||||
self.run_stmt_in_hive("drop table " + tbl_name)
|
||||
self.execute_query_expect_success(client, "create table {0} (j int)".format(tbl_name))
|
||||
res = self.execute_query_expect_success(client, "describe " + tbl_name)
|
||||
assert res.data == ["j\tint\t"]
|
||||
assert res.log == ''
|
||||
self.verify_timeline_item("Query Compilation", label, res.runtime_profile)
|
||||
self.__verify_profile_timeline(res.runtime_profile)
|
||||
|
||||
def test_hms_event_sync_multiple_tables(self, vector, unique_database):
|
||||
def __verify_profile_timeline(self, profile):
|
||||
self.verify_timeline_item(
|
||||
"Query Compilation", "Synced events from Metastore", profile)
|
||||
|
||||
def test_multiple_tables(self, vector, unique_database):
|
||||
client = self.create_impala_client_from_vector(vector)
|
||||
for i in range(3):
|
||||
self.execute_query("create table {0}.tbl{1} (i int)".format(unique_database, i))
|
||||
@@ -453,7 +517,7 @@ class TestEventSyncWaiting(ImpalaTestSuite):
|
||||
where t0.i = t1.i and t1.i = t2.i""".format(unique_database))
|
||||
assert res == "1"
|
||||
|
||||
def test_hms_event_sync_with_view(self, vector, unique_database):
|
||||
def test_view(self, vector, unique_database):
|
||||
client = self.create_impala_client_from_vector(vector)
|
||||
tbl = unique_database + ".foo"
|
||||
view = unique_database + ".foo_view"
|
||||
@@ -475,7 +539,7 @@ class TestEventSyncWaiting(ImpalaTestSuite):
|
||||
res = self.execute_query_expect_success(client, count_stmt)
|
||||
assert res.data[0] == '0'
|
||||
|
||||
def test_hms_event_sync_with_view_partitioned(self, vector, unique_database):
|
||||
def test_view_partitioned(self, vector, unique_database):
|
||||
client = self.create_impala_client_from_vector(vector)
|
||||
tbl = unique_database + ".foo"
|
||||
view = unique_database + ".foo_view"
|
||||
@@ -500,7 +564,7 @@ class TestEventSyncWaiting(ImpalaTestSuite):
|
||||
res = self.execute_scalar_expect_success(client, select_stmt)
|
||||
assert res == '2\t2'
|
||||
|
||||
def test_hms_event_sync_compute_stats(self, vector, unique_database):
|
||||
def test_compute_stats(self, vector, unique_database):
|
||||
client = self.create_impala_client_from_vector(vector)
|
||||
tbl = unique_database + ".foo"
|
||||
self.execute_query("create table {}(i int) partitioned by(p int)".format(tbl))
|
||||
@@ -522,7 +586,7 @@ class TestEventSyncWaiting(ImpalaTestSuite):
|
||||
client, "compute stats {}".format(tbl))
|
||||
assert res.data == ['Updated 1 partition(s) and 1 column(s).']
|
||||
|
||||
def test_hms_event_sync_ctas(self, vector, unique_database):
|
||||
def test_ctas(self, vector, unique_database):
|
||||
client = self.create_impala_client_from_vector(vector)
|
||||
tbl = unique_database + ".foo"
|
||||
tmp_tbl = unique_database + ".tmp"
|
||||
@@ -549,7 +613,7 @@ class TestEventSyncWaiting(ImpalaTestSuite):
|
||||
client, "create table {}_4 as select 1,1".format(tbl))
|
||||
assert 'Table already exists: {}_4'.format(tbl) in str(exception)
|
||||
|
||||
def test_hms_event_sync_insert(self, vector, unique_database):
|
||||
def test_impala_insert(self, vector, unique_database):
|
||||
client = self.create_impala_client_from_vector(vector)
|
||||
tbl = unique_database + ".foo"
|
||||
tmp_tbl = unique_database + ".tmp"
|
||||
@@ -578,7 +642,7 @@ class TestEventSyncWaiting(ImpalaTestSuite):
|
||||
res = self.execute_query_expect_success(client, insert_stmt)
|
||||
assert len(res.data) == 0
|
||||
|
||||
def test_hms_event_sync_txn(self, vector, unique_database):
|
||||
def test_txn(self, vector, unique_database):
|
||||
client = self.create_impala_client_from_vector(vector)
|
||||
tbl = unique_database + ".foo"
|
||||
self.run_stmt_in_hive(
|
||||
@@ -601,19 +665,6 @@ class TestEventSyncWaiting(ImpalaTestSuite):
|
||||
res = self.execute_query_expect_success(client, "show files in " + tbl)
|
||||
assert len(res.data) == 1
|
||||
|
||||
def test_hms_event_sync_on_missing_db(self, vector, unique_name):
|
||||
client = self.create_impala_client_from_vector(vector)
|
||||
db = unique_name + "_db"
|
||||
try:
|
||||
self.run_stmt_in_hive("""create database {0};
|
||||
create table {0}.tbl(i int);
|
||||
create table {0}.tbl_2(i int);""".format(db))
|
||||
self.execute_query_expect_success(
|
||||
client, "insert into {0}.tbl values (0)".format(db))
|
||||
finally:
|
||||
self.run_stmt_in_hive(
|
||||
"drop database if exists {0} cascade".format(db))
|
||||
|
||||
def test_hms_event_sync_on_deletion(self, vector, unique_name):
|
||||
"""Regression test for IMPALA-13829: TWaitForHmsEventResponse not able to collect
|
||||
removed objects due to their items in deleteLog being GCed."""
|
||||
|
||||
Reference in New Issue
Block a user