Mirror of https://github.com/apache/impala.git
Synced 2025-12-19 18:12:08 -05:00
IMPALA-12829 added extra code to CatalogServiceCatalog's reloadTableIfExists() that
can throw a ClassCastException when it reloads an Iceberg table. When this happens
during event processing, the event processor invalidates the table. This usually
occurs when another engine updates an Iceberg table, and it leads to slow table
loading because the table must then be fully reloaded instead of incrementally.
This patch fixes the ClassCastException by moving the cast into an if statement.

Testing:
 * e2e tests added

Change-Id: I892cf326a72024674facad6750893352b982c658
Reviewed-on: http://gerrit.cloudera.org:8080/23349
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function
from subprocess import check_call
import logging
import pytest
import re
import time
import threading

from tests.common.test_dimensions import (
  create_single_exec_option_dimension,
  add_mandatory_exec_option)
from tests.common.impala_cluster import ImpalaCluster
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfFS, SkipIfHive2, SkipIfCatalogV2
from tests.common.test_vector import HS2
from tests.metadata.test_event_processing_base import TestEventProcessingBase
from tests.util.event_processor_utils import EventProcessorUtils

PROCESSING_TIMEOUT_S = 10
LOG = logging.getLogger(__name__)
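
# Note: PROCESSING_TIMEOUT_S doubles as the value of the sync_hms_events_wait_time_s
# query option in the tests below. Tests that want statements to block until pending
# HMS events are applied set the options roughly like this (a sketch; the real calls
# appear in the test bodies):
#
#   client.set_configuration({
#     "sync_hms_events_wait_time_s": PROCESSING_TIMEOUT_S,
#     "sync_hms_events_strict_mode": True,
#   })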


@SkipIfFS.hive
class TestEventProcessing(TestEventProcessingBase):
  """This class contains tests that exercise the event processing mechanism in the
  catalog."""

  @classmethod
  def setup_class(cls):
    super(TestEventProcessing, cls).setup_class()

  @classmethod
  def default_test_protocol(cls):
    return HS2

  @SkipIfHive2.acid
  def test_transactional_insert_events(self, unique_database):
    """Executes 'run_test_insert_events' for transactional tables.
    """
    TestEventProcessingBase._run_test_insert_events_impl(self,
        unique_database, is_transactional=True)

  def test_insert_events(self, unique_database):
    """Executes 'run_test_insert_events' for non-transactional tables.
    """
    TestEventProcessingBase._run_test_insert_events_impl(self, unique_database)

  def test_iceberg_inserts(self):
    """IMPALA-10735: INSERT INTO Iceberg table fails during INSERT event generation
    This test doesn't test event processing. It tests that Iceberg INSERTs still work
    when HMS event polling is enabled.
    IMPALA-10736 tracks adding proper support for Hive Replication."""
    db_name = ImpalaTestSuite.get_random_name("iceberg_insert_event_db_")
    tbl_name = "ice_test"
    try:
      self.execute_query("create database if not exists {0}".format(db_name))
      self.execute_query("""
          create table {0}.{1} (i int)
          partitioned by spec (bucket(5, i))
          stored as iceberg;""".format(db_name, tbl_name))
      self.execute_query("insert into {0}.{1} values (1)".format(db_name, tbl_name))
      data = self.execute_scalar("select * from {0}.{1}".format(db_name, tbl_name))
      assert data == '1'
    finally:
      self.execute_query("drop database if exists {0} cascade".format(db_name))

  @pytest.mark.execute_serially
  def test_hive_impala_iceberg_reloads(self, unique_database):
    def get_refresh_count():
      return EventProcessorUtils.get_int_metric('tables-refreshed', 0)

    def run_hive_check_refresh(stmt):
      refresh_before = get_refresh_count()
      self.run_stmt_in_hive(stmt)
      EventProcessorUtils.wait_for_event_processing(self)
      refresh_after = get_refresh_count()
      assert refresh_after - refresh_before == 1

    test_tbl = unique_database + ".test_events"
    self.run_stmt_in_hive("create table {} (value string) \
        partitioned by (year int) stored by iceberg".format(test_tbl))
    EventProcessorUtils.wait_for_event_processing(self)
    self.execute_query("describe {}".format(test_tbl))

    run_hive_check_refresh("insert into {} values ('1', 2025)".format(test_tbl))

    res = self.execute_query("select * from {}".format(test_tbl))
    assert ["1\t2025"] == res.data
    res = self.execute_query("refresh {}".format(test_tbl))
    assert "Iceberg table reload skipped as no change detected" in res.runtime_profile

    run_hive_check_refresh("alter table {} add columns (s string)".format(test_tbl))

    res = self.execute_query("select * from {}".format(test_tbl))
    assert ["1\t2025\tNULL"] == res.data
    res = self.execute_query("refresh {}".format(test_tbl))
    assert "Iceberg table reload skipped as no change detected" in res.runtime_profile
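
  # The "Iceberg table reload skipped as no change detected" marker asserted above shows
  # that the explicit REFRESH found nothing new to load, i.e. the earlier event-triggered
  # refresh (counted via the 'tables-refreshed' metric) had already picked up the
  # Hive-side change.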

  @SkipIfHive2.acid
  def test_empty_partition_events_transactional(self, unique_database):
    self._run_test_empty_partition_events(unique_database, True)

  def test_empty_partition_events(self, unique_database):
    self._run_test_empty_partition_events(unique_database, False)

  def test_event_based_replication(self):
    self._run_event_based_replication_tests_impl(self,
        self.filesystem_client)

  def _run_test_empty_partition_events(self, unique_database, is_transactional):
    test_tbl = unique_database + ".test_events"
    TBLPROPERTIES = self._get_transactional_tblproperties(is_transactional)
    self.run_stmt_in_hive("create table {0} (key string, value string) \
        partitioned by (year int) stored as parquet {1}".format(test_tbl, TBLPROPERTIES))
    self.client.set_configuration({
        "sync_hms_events_wait_time_s": PROCESSING_TIMEOUT_S,
        "sync_hms_events_strict_mode": True
    })
    self.client.execute("describe {0}".format(test_tbl))

    self.run_stmt_in_hive(
        "alter table {0} add partition (year=2019)".format(test_tbl))
    assert [('2019',)] == self.get_impala_partition_info(test_tbl, 'year')

    self.run_stmt_in_hive(
        "alter table {0} add if not exists partition (year=2019)".format(test_tbl))
    assert [('2019',)] == self.get_impala_partition_info(test_tbl, 'year')
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"

    self.run_stmt_in_hive(
        "alter table {0} drop partition (year=2019)".format(test_tbl))
    assert ('2019') not in self.get_impala_partition_info(test_tbl, 'year')
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"

    self.run_stmt_in_hive(
        "alter table {0} drop if exists partition (year=2019)".format(test_tbl))
    assert ('2019') not in self.get_impala_partition_info(test_tbl, 'year')
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
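
  # Note: _run_test_empty_partition_events sets sync_hms_events_wait_time_s and
  # sync_hms_events_strict_mode on self.client, so statements issued through that client
  # wait for pending HMS events; this is presumably why no explicit
  # wait_for_event_processing() call is needed between the Hive DDLs and the partition
  # checks above.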

  @pytest.mark.execute_serially
  def test_load_data_from_impala(self, unique_database):
    tbl_nopart = "tbl_nopart"
    tbl_part = "tbl_part"
    staging_dir = "/tmp/{0}".format(unique_database)
    check_call(["hdfs", "dfs", "-mkdir", staging_dir])
    try:
      self.execute_query(
          "drop table if exists {0}.{1} purge".format(unique_database, tbl_nopart))
      self.execute_query(
          "drop table if exists {0}.{1} purge".format(unique_database, tbl_part))

      self.execute_query(
          "create table {0}.{1} like functional_parquet.tinytable stored as parquet"
          .format(unique_database, tbl_nopart))
      self.execute_query(
          "create table {0}.{1} like functional_parquet.alltypessmall stored as \
          parquet".format(unique_database, tbl_part))
      EventProcessorUtils.wait_for_event_processing(self)

      check_call([
          "hdfs", "dfs", "-cp", "/test-warehouse/tinytable_parquet", staging_dir])
      last_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client)
      self.execute_query("load data inpath '{0}/tinytable_parquet' \
          into table {1}.{2}".format(staging_dir, unique_database, tbl_nopart))
      # Check that an INSERT event is fired after the LOAD DATA statement.
      events = EventProcessorUtils.get_next_notification(self.hive_client, last_event_id)
      assert len(events) == 1
      last_event = events[0]
      assert last_event.dbName == unique_database
      assert last_event.tableName == tbl_nopart
      assert last_event.eventType == "INSERT"

      check_call(["hdfs", "dfs", "-cp", "/test-warehouse/alltypessmall_parquet",
          staging_dir])
      self.execute_query(
          "alter table {0}.{1} add partition (year=2009,month=1)".format(
              unique_database, tbl_part))
      last_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client)
      self.execute_query(
          "load data inpath '{0}/alltypessmall_parquet/year=2009/month=1' \
          into table {1}.{2} partition (year=2009,month=1)".format(
              staging_dir, unique_database, tbl_part))
      # Check that an INSERT event is fired after the LOAD DATA statement.
      events = EventProcessorUtils.get_next_notification(self.hive_client, last_event_id)
      assert len(events) == 1
      last_event = events[0]
      assert last_event.dbName == unique_database
      assert last_event.tableName == tbl_part
      assert last_event.eventType == "INSERT"
    finally:
      check_call(["hdfs", "dfs", "-rm", "-r", "-skipTrash", staging_dir])

  def test_transact_partition_location_change_from_hive(self, unique_database):
    """IMPALA-12356: Verify alter partition from Hive on a transactional table"""
    self.run_test_partition_location_change_from_hive(unique_database,
        "transact_alter_part_hive", True)

  def test_partition_location_change_from_hive(self, unique_database):
    """IMPALA-12356: Verify alter partition from Hive on a non-transactional table"""
    self.run_test_partition_location_change_from_hive(unique_database, "alter_part_hive")

  def run_test_partition_location_change_from_hive(self, unique_database, tbl_name,
      is_transactional=False):
    fq_tbl_name = unique_database + "." + tbl_name
    TBLPROPERTIES = TestEventProcessingBase._get_transactional_tblproperties(
        is_transactional)
    # Create the table
    self.client.execute(
        "create table %s (i int) partitioned by(j int) stored as parquet %s"
        % (fq_tbl_name, TBLPROPERTIES))
    # Insert some data into a partition
    p1 = "j=1"
    self.client.execute("insert into table %s partition(%s) values (0),(1),(2)"
        % (fq_tbl_name, p1))
    tbl_location = self._get_table_property("Location:", fq_tbl_name)
    partitions = self.get_impala_partition_info(fq_tbl_name, 'Location')
    assert [("{0}/{1}".format(tbl_location, p1),)] == partitions
    # Alter the partition location from Hive
    new_part_location = tbl_location + "/j=2"
    self.run_stmt_in_hive("alter table %s partition(%s) set location '%s'"
        % (fq_tbl_name, p1, new_part_location))
    EventProcessorUtils.wait_for_event_processing(self)
    # Verify that the location is updated
    partitions = self.get_impala_partition_info(fq_tbl_name, 'Location')
    assert [(new_part_location,)] == partitions

  def _get_table_property(self, property_name, table_name):
    """Extract the table property value from output of DESCRIBE FORMATTED."""
    result = self.client.execute("describe formatted {0}".format(table_name))
    for row in result.data:
      if property_name in row:
        row = row.split('\t')
        if row[1] == 'NULL':
          break
        return row[1].rstrip()
    return None

  def _exec_and_check_ep_cmd(self, cmd, expected_status):
    cmd_output = self.execute_query(cmd).get_data()
    match = re.search(
        r"EventProcessor status: %s. LastSyncedEventId: \d+. LatestEventId: \d+." %
        expected_status,
        cmd_output)
    assert match, cmd_output
    assert EventProcessorUtils.get_event_processor_status() == expected_status
    return cmd_output
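
  # For reference, the catalogd event-processor debug commands exercised below behave
  # roughly as follows (a sketch based on the assertions in this test; see
  # _exec_and_check_ep_cmd for the exact output format that is matched):
  #   :event_processor('pause')     -> pause event processing, status becomes PAUSED
  #   :event_processor('status')    -> report the current status without changing it
  #   :event_processor('start')     -> resume processing, status becomes ACTIVE
  #   :event_processor('start', N)  -> restart from event id N; -1 means the latest id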

  @pytest.mark.execute_serially
  def test_event_processor_cmds(self, unique_database):
    ###########################################################################
    # 1. Test normal PAUSE and RESUME. Also check the STATUS command.
    self._exec_and_check_ep_cmd(":event_processor('pause')", "PAUSED")
    self._exec_and_check_ep_cmd(":event_processor('status')", "PAUSED")
    self._exec_and_check_ep_cmd(":event_processor('start')", "ACTIVE")
    self._exec_and_check_ep_cmd(":event_processor('status')", "ACTIVE")

    # Make sure the CREATE_DATABASE event for 'unique_database' is processed
    EventProcessorUtils.wait_for_event_processing(self)

    ###########################################################################
    # 2. Test failure of restarting at an older event id when status is ACTIVE
    last_synced_event_id = EventProcessorUtils.get_last_synced_event_id()
    e = self.execute_query_expect_failure(
        self.client, ":event_processor('start', %d)" % (last_synced_event_id / 2))
    assert "EventProcessor is active. Failed to set last synced event id from " +\
        str(last_synced_event_id) + " back to " + str(int(last_synced_event_id / 2)) +\
        ". Please pause EventProcessor first." in str(e)

    ###########################################################################
    # 3. Test restarting to the latest event id
    self._exec_and_check_ep_cmd(":event_processor('pause')", "PAUSED")
    # Create some HMS events
    for i in range(3):
      self.run_stmt_in_hive("create table %s.tbl_%d(i int)" % (unique_database, i))
    latest_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client)
    # Wait some time for EP to update its latest event id
    time.sleep(2)
    # Restart to the latest event id
    self._exec_and_check_ep_cmd(":event_processor('start', -1)", "ACTIVE")
    assert EventProcessorUtils.get_last_synced_event_id() == latest_event_id
    # Verify the new events are skipped so Impala queries should fail
    for i in range(3):
      self.execute_query_expect_failure(
          self.client, "describe %s.tbl_%d" % (unique_database, i))

    ###########################################################################
    # 4. Test setting back the last synced event id after pausing EP
    self._exec_and_check_ep_cmd(":event_processor('pause')", "PAUSED")
    # Restart to the previous last synced event id to process the missing HMS events
    self._exec_and_check_ep_cmd(
        ":event_processor('start', %d)" % last_synced_event_id, "ACTIVE")
    EventProcessorUtils.wait_for_event_processing(self)
    # Tables should be visible now
    for i in range(3):
      self.execute_query_expect_success(
          self.client, "describe %s.tbl_%d" % (unique_database, i))

    ###########################################################################
    # 5. Test unknown commands
    e = self.execute_query_expect_failure(self.client, ":event_processor('bad_cmd')")
    assert "Unknown command: BAD_CMD. Supported commands: PAUSE, START, STATUS" in str(e)

    ###########################################################################
    # 6. Test illegal event id
    e = self.execute_query_expect_failure(self.client, ":event_processor('start', -2)")
    assert "Illegal event id -2. Should be >= -1" in str(e)

    ###########################################################################
    # 7. Test restarting on a future event id
    cmd_output = self._exec_and_check_ep_cmd(
        ":event_processor('start', %d)" % (latest_event_id + 2), "ACTIVE")
    warning = ("Target event id %d is larger than the latest event id %d. Some future "
               "events will be skipped.") % (latest_event_id + 2, latest_event_id)
    assert warning in cmd_output
    # The cleanup method will drop 'unique_database' and tables in it, which generates
    # more than 2 self-events. It's OK for EP to skip them.


@SkipIfFS.hive
class TestEventSyncWaiting(ImpalaTestSuite):
  """Verify that the query option sync_hms_events_wait_time_s protects a query by
  waiting until Impala has synced the HMS changes."""

  @classmethod
  def get_workload(cls):
    return 'functional-planner'

  @classmethod
  def add_test_dimensions(cls):
    super(TestEventSyncWaiting, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
    add_mandatory_exec_option(cls, 'sync_hms_events_wait_time_s', PROCESSING_TIMEOUT_S)
    add_mandatory_exec_option(cls, 'sync_hms_events_strict_mode', True)
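
  # Note: the add_mandatory_exec_option() calls above mean every test vector in this
  # class runs with sync_hms_events_wait_time_s and sync_hms_events_strict_mode set, so
  # clients created via create_impala_client_from_vector(vector) below should see
  # Hive-side changes without explicit wait_for_event_processing() calls (an assumption
  # based on how the helper is used throughout this suite).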

  @pytest.mark.execute_serially
  def test_event_processor_pauses(self, vector, unique_database):
    client = self.create_impala_client_from_vector(vector)
    tbl = unique_database + ".foo"

    # Create a table in Hive and submit a query on it when EP is paused.
    client.execute(":event_processor('pause')")
    self.run_stmt_in_hive("create table {} as select 1".format(tbl))

    # execute_async() is not truly asynchronous: it only returns after query planning
    # finishes. So we use execute_query_expect_success here and resume EP in a
    # background thread.
    def resume_event_processor():
      time.sleep(2)
      client = self.create_impala_client_from_vector(vector)
      client.execute(":event_processor('start')")
    resume_ep_thread = threading.Thread(target=resume_event_processor)
    resume_ep_thread.start()
    res = self.execute_query_expect_success(client, "select * from " + tbl)
    assert res.data == ['1']
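
  # Note: per the class docstring, the SELECT above is expected to succeed only because
  # the background thread resumes the event processor; with sync_hms_events_wait_time_s
  # set, planning waits for the CREATE_TABLE event from Hive to be applied first.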

  def test_describe(self, vector, unique_database):
    client = self.create_impala_client_from_vector(vector)
    tbl_name = unique_database + ".tbl"
    # Test DESCRIBE on new table created in Hive
    self.run_stmt_in_hive(
        "create table {0} (i int) partitioned by (p int)".format(tbl_name))
    res = self.execute_query_expect_success(client, "describe " + tbl_name)
    assert res.data == ["i\tint\t", 'p\tint\t']
    assert res.log == ''
    self.__verify_profile_timeline(res.runtime_profile)

  def test_show_tables(self, vector, unique_database):
    # Test SHOW TABLES gets new tables created in Hive
    client = self.create_impala_client_from_vector(vector)
    tbl_name = unique_database + ".tbl"
    self.run_stmt_in_hive("create table {0} (i int)".format(tbl_name))
    res = self.execute_query_expect_success(client, "show tables in " + unique_database)
    assert res.data == ["tbl"]
    assert res.log == ''
    self.__verify_profile_timeline(res.runtime_profile)

    # Test SHOW VIEWS gets new views created in Hive
    self.run_stmt_in_hive(
        "create view {0}.v as select * from {1}".format(unique_database, tbl_name))
    res = self.execute_query_expect_success(client, "show views in " + unique_database)
    assert res.data == ["v"]
    assert res.log == ''
    self.__verify_profile_timeline(res.runtime_profile)

  def test_drop_created_table(self, vector, unique_database):
    client = self.create_impala_client_from_vector(vector)
    self.run_stmt_in_hive("create table {0}.tbl(i int)".format(unique_database))
    self.execute_query_expect_success(
        client, "drop table {0}.tbl".format(unique_database))

  def test_insert_under_missing_db(self, vector, unique_name):
    client = self.create_impala_client_from_vector(vector)
    db = unique_name + "_db"
    try:
      # Create the table in Hive immediately after creating the db. So it's more likely
      # that when the INSERT is submitted, the db is still missing in Impala.
      self.run_stmt_in_hive("""create database {0};
          create table {0}.tbl(i int)""".format(db))
      self.execute_query_expect_success(
          client, "insert into {0}.tbl values (0)".format(db))
    finally:
      self.run_stmt_in_hive(
          "drop database if exists {0} cascade".format(db))

  def test_show_databases(self, vector, unique_database):
    client = self.create_impala_client_from_vector(vector)
    res = self.execute_query_expect_success(client, "show databases")
    assert unique_database + "\t" in res.data
    assert unique_database + "_2\t" not in res.data
    self.__verify_profile_timeline(res.runtime_profile)

    # Create a new db in Hive
    self.run_stmt_in_hive("create database {0}_2".format(unique_database))
    res = self.execute_query_expect_success(client, "show databases")
    assert unique_database + "\t" in res.data
    assert unique_database + "_2\t" in res.data
    self.__verify_profile_timeline(res.runtime_profile)

    # Drop the new db in Hive
    self.run_stmt_in_hive("drop database {0}_2".format(unique_database))
    res = self.execute_query_expect_success(client, "show databases")
    assert unique_database + "\t" in res.data
    assert unique_database + "_2\t" not in res.data
    self.__verify_profile_timeline(res.runtime_profile)

  def test_drop_db(self, vector, unique_name):
    client = self.create_impala_client_from_vector(vector)
    db = unique_name + "_db"
    try:
      self.run_stmt_in_hive("create database {0}".format(db))
      self.execute_query_expect_success(client, "drop database {0}".format(db))
    finally:
      self.run_stmt_in_hive(
          "drop database if exists {0} cascade".format(db))

  def test_describe_db(self, vector, unique_name):
    client = self.create_impala_client_from_vector(vector)
    db = unique_name + "_db"
    try:
      self.run_stmt_in_hive("create database {0}".format(db))
      self.execute_query_expect_success(
          client, "describe database {0}".format(db))
    finally:
      self.run_stmt_in_hive("drop database if exists {0}".format(db))

  def test_show_functions(self, vector, unique_name):
    client = self.create_impala_client_from_vector(vector)
    db = unique_name + "_db"
    try:
      self.run_stmt_in_hive("create database {0}".format(db))
      self.execute_query_expect_success(
          client, "show functions in {0}".format(db))
    finally:
      self.run_stmt_in_hive("drop database if exists {0}".format(db))

  def test_hive_insert(self, vector, unique_database):
    client = self.create_impala_client_from_vector(vector)
    # Create a partitioned table and DESCRIBE it to make it loaded.
    tbl_name = unique_database + ".tbl"
    self.execute_query("create table {0}(i int) partitioned by(p int)".format(tbl_name))
    self.execute_query("describe " + tbl_name)
    # Test SELECT gets new values inserted by Hive
    self.run_stmt_in_hive(
        "insert into table {0} partition (p=0) select 0".format(tbl_name))
    res = self.execute_query_expect_success(client, "select * from " + tbl_name)
    assert res.data == ["0\t0"]
    assert res.log == ''
    self.__verify_profile_timeline(res.runtime_profile)
    # Same case but using INSERT OVERWRITE in Hive
    self.run_stmt_in_hive(
        "insert overwrite table {0} partition (p=0) select 1".format(tbl_name))
    res = self.execute_query_expect_success(client, "select * from " + tbl_name)
    assert res.data == ["1\t0"]
    assert res.log == ''
    self.__verify_profile_timeline(res.runtime_profile)

    # Test SHOW PARTITIONS gets new partitions created by Hive
    self.run_stmt_in_hive(
        "insert into table {0} partition (p=2) select 2".format(tbl_name))
    res = self.execute_query_expect_success(client, "show partitions " + tbl_name)
    assert self.has_value('p=0', res.data)
    assert self.has_value('p=2', res.data)
    # 3 result lines: 2 for partitions, 1 for total info
    assert len(res.data) == 3
    assert res.log == ''
    self.__verify_profile_timeline(res.runtime_profile)

  def test_create_dropped_db(self, vector, unique_name):
    """Test CREATE DATABASE on a db dropped by Hive.
    Use unique_name instead of unique_database to avoid a cleanup failure overwriting
    the real test failure, i.e. when the test fails, the db probably doesn't exist,
    so cleanup of unique_database would also fail."""
    client = self.create_impala_client_from_vector(vector)
    db = unique_name + "_db"
    self.execute_query("create database " + db)
    try:
      self.run_stmt_in_hive("drop database " + db)
      res = self.execute_query_expect_success(client, "create database " + db)
      self.__verify_profile_timeline(res.runtime_profile)
    finally:
      self.execute_query("drop database if exists {} cascade".format(db))

  def test_create_dropped_table(self, vector, unique_database):
    """Test CREATE TABLE on table dropped by Hive"""
    client = self.create_impala_client_from_vector(vector)
    # Create a table and DESCRIBE it to make it loaded
    tbl_name = unique_database + ".tbl"
    self.execute_query_expect_success(client, "create table {0} (i int)".format(tbl_name))
    res = self.execute_query_expect_success(client, "describe " + tbl_name)
    assert res.data == ["i\tint\t"]
    # Drop it in Hive and re-create it in Impala using a new schema
    self.run_stmt_in_hive("drop table " + tbl_name)
    self.execute_query_expect_success(client, "create table {0} (j int)".format(tbl_name))
    res = self.execute_query_expect_success(client, "describe " + tbl_name)
    assert res.data == ["j\tint\t"]
    assert res.log == ''
    self.__verify_profile_timeline(res.runtime_profile)

  def __verify_profile_timeline(self, profile):
    self.verify_timeline_item(
        "Query Compilation", "Synced events from Metastore", profile)
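
  # Tests in this class that inspect the profile call __verify_profile_timeline() to
  # confirm the wait actually happened, i.e. that the "Query Compilation" timeline of
  # the runtime profile contains a "Synced events from Metastore" entry.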

  def test_multiple_tables(self, vector, unique_database):
    client = self.create_impala_client_from_vector(vector)
    for i in range(3):
      self.execute_query("create table {0}.tbl{1} (i int)".format(unique_database, i))
    res = self.execute_query_expect_success(client, """
        select t1.i from {0}.tbl0 t0, {0}.tbl1 t1, {0}.tbl2 t2
        where t0.i = t1.i and t1.i = t2.i""".format(unique_database))
    assert len(res.data) == 0

    for i in range(3):
      self.run_stmt_in_hive("insert into table {0}.tbl{1} select 1".format(
          unique_database, i))
    res = self.execute_scalar_expect_success(client, """
        select t1.i from {0}.tbl0 t0, {0}.tbl1 t1, {0}.tbl2 t2
        where t0.i = t1.i and t1.i = t2.i""".format(unique_database))
    assert res == "1"

  def test_view(self, vector, unique_database):
    client = self.create_impala_client_from_vector(vector)
    tbl = unique_database + ".foo"
    view = unique_database + ".foo_view"
    count_stmt = "select count(*) from {}".format(view)
    self.execute_query("create table {}(i int)".format(tbl))
    self.execute_query("create view {} as select * from {}".format(view, tbl))
    # Run a query to load the metadata so it can become stale later.
    res = self.execute_scalar(count_stmt)
    assert res == '0'

    # Modify the table in Hive and read the view in Impala
    self.run_stmt_in_hive("insert into {} select 1".format(tbl))
    res = self.execute_query_expect_success(client, count_stmt)
    assert res.data[0] == '1'

    # Modify the view in Hive and read it in Impala
    self.run_stmt_in_hive(
        "alter view {} as select * from {} where i > 1".format(view, tbl))
    res = self.execute_query_expect_success(client, count_stmt)
    assert res.data[0] == '0'

  def test_view_partitioned(self, vector, unique_database):
    client = self.create_impala_client_from_vector(vector)
    tbl = unique_database + ".foo"
    view = unique_database + ".foo_view"
    select_stmt = "select * from " + view
    self.execute_query("create table {}(i int) partitioned by(p int)".format(tbl))
    self.execute_query("create view {} as select * from {} where p>0".format(view, tbl))
    res = self.execute_query_expect_success(client, select_stmt)
    assert len(res.data) == 0

    # Ingest data in Hive and read the view in Impala
    # Add a new partition that will be filtered out by the view
    self.run_stmt_in_hive("insert into {} select 0, 0".format(tbl))
    res = self.execute_query_expect_success(client, select_stmt)
    assert len(res.data) == 0
    # Add a new partition that will show up in the view
    self.run_stmt_in_hive("insert into {} select 1, 1".format(tbl))
    res = self.execute_scalar_expect_success(client, select_stmt)
    assert res == '1\t1'
    # Add a new partition and alter the view to only show it
    self.run_stmt_in_hive("insert into {} select 2, 2".format(tbl))
    self.run_stmt_in_hive("alter view {} as select * from {} where p>1".format(view, tbl))
    res = self.execute_scalar_expect_success(client, select_stmt)
    assert res == '2\t2'

  def test_compute_stats(self, vector, unique_database):
    client = self.create_impala_client_from_vector(vector)
    tbl = unique_database + ".foo"
    self.execute_query("create table {}(i int) partitioned by(p int)".format(tbl))
    # Add one partition in Hive and compute incremental stats on that partition in Impala
    self.run_stmt_in_hive("insert into {} select 0,0".format(tbl))
    res = self.execute_query_expect_success(
        client, "compute incremental stats {} partition(p=0)".format(tbl))
    assert res.data == ['Updated 1 partition(s) and 1 column(s).']
    # Add one partition in Hive and compute incremental stats on that table in Impala
    self.run_stmt_in_hive("insert into {} select 1,1 union all select 2,2".format(tbl))
    res = self.execute_query_expect_success(
        client, "compute incremental stats {}".format(tbl))
    assert res.data == ['Updated 2 partition(s) and 1 column(s).']
    # Drop two partitions in Hive and compute stats on that table in Impala. The
    # incremental stats will be replaced with non-incremental stats so the remaining
    # partition is updated.
    self.run_stmt_in_hive("alter table {} drop partition(p<2)".format(tbl))
    res = self.execute_query_expect_success(
        client, "compute stats {}".format(tbl))
    assert res.data == ['Updated 1 partition(s) and 1 column(s).']

  def test_ctas(self, vector, unique_database):
    client = self.create_impala_client_from_vector(vector)
    tbl = unique_database + ".foo"
    tmp_tbl = unique_database + ".tmp"
    self.execute_query("create table {}(i int) partitioned by(p int)".format(tbl))
    # Add one partition in Hive and use the table in Impala
    self.run_stmt_in_hive("insert into {} select 0,0".format(tbl))
    res = self.execute_query_expect_success(
        client, "create table {} as select * from {}".format(tmp_tbl, tbl))
    assert res.data == ['Inserted 1 row(s)']
    # Insert one row into the same partition in Hive and use the table in Impala
    self.run_stmt_in_hive("insert into {} select 1,0".format(tbl))
    res = self.execute_query_expect_success(
        client, "create table {}_2 as select * from {}".format(tmp_tbl, tbl))
    assert res.data == ['Inserted 2 row(s)']
    # Truncate the table in Hive and use it in Impala
    self.run_stmt_in_hive("truncate table {}".format(tbl))
    res = self.execute_query_expect_success(
        client, "create table {}_3 as select * from {}".format(tmp_tbl, tbl))
    assert res.data == ['Inserted 0 row(s)']

    # Create a table in Hive before CTAS of it in Impala
    self.run_stmt_in_hive("create table {}_4(i int) partitioned by(p int)".format(tbl))
    exception = self.execute_query_expect_failure(
        client, "create table {}_4 as select 1,1".format(tbl))
    assert 'Table already exists: {}_4'.format(tbl) in str(exception)

  def test_impala_insert(self, vector, unique_database):
    client = self.create_impala_client_from_vector(vector)
    tbl = unique_database + ".foo"
    tmp_tbl = unique_database + ".tmp"
    self.execute_query("create table {}(i int) partitioned by(p int)".format(tbl))
    self.execute_query("create table {}(i int) partitioned by(p int)".format(tmp_tbl))
    insert_stmt = "insert into {} partition (p) select * from {}".format(tmp_tbl, tbl)
    # Add one partition in Hive and use the table in INSERT in Impala
    self.run_stmt_in_hive("insert into {} select 0,0".format(tbl))
    res = self.execute_query_expect_success(client, insert_stmt)
    # Result rows are "partition_name: num_rows_inserted" for each modified partition
    assert 'Partition: p=0\nNumModifiedRows: 1\n' in res.runtime_profile
    # Insert one row into the same partition in Hive and use the table in INSERT in Impala
    self.run_stmt_in_hive("insert into {} select 1,0".format(tbl))
    res = self.execute_query_expect_success(client, insert_stmt)
    assert 'Partition: p=0\nNumModifiedRows: 2\n' in res.runtime_profile
    # Add another new partition in Hive and use the table in INSERT in Impala
    self.run_stmt_in_hive("insert into {} select 2,2".format(tbl))
    res = self.execute_query_expect_success(client, insert_stmt)
    assert 'Partition: p=0\nNumModifiedRows: 2\n' in res.runtime_profile
    assert 'Partition: p=2\nNumModifiedRows: 1\n' in res.runtime_profile
    # Drop one partition in Hive and use the table in INSERT in Impala
    self.run_stmt_in_hive("alter table {} drop partition(p=0)".format(tbl))
    res = self.execute_query_expect_success(client, insert_stmt)
    assert 'Partition: p=2\nNumModifiedRows: 1\n' in res.runtime_profile
    # Truncate the table in Hive and use it in INSERT in Impala
    self.run_stmt_in_hive("truncate table {}".format(tbl))
    res = self.execute_query_expect_success(client, insert_stmt)
    assert 'NumModifiedRows:' not in res.runtime_profile

  def test_txn(self, vector, unique_database):
    client = self.create_impala_client_from_vector(vector)
    tbl = unique_database + ".foo"
    self.run_stmt_in_hive(
        "create transactional table {}(i int) partitioned by(p int)".format(tbl))
    # Load the table in Impala
    self.execute_query_expect_success(client, "describe " + tbl)
    # Insert into the table in Hive and check it in Impala immediately
    self.run_stmt_in_hive("insert into {} select 0,0".format(tbl))
    res = self.execute_query_expect_success(client, "select * from " + tbl)
    assert res.data == ['0\t0']
    # Insert into the table in Hive again and check the number of rows in Impala
    self.run_stmt_in_hive("insert into {} select 1,0".format(tbl))
    res = self.execute_query_expect_success(client, "select count(*) from " + tbl)
    assert res.data == ['2']
    res = self.execute_query_expect_success(client, "show files in " + tbl)
    assert len(res.data) == 2
    # Trigger compaction in Hive
    self.run_stmt_in_hive(
        "alter table {} partition(p=0)compact 'minor' and wait".format(tbl))
    res = self.execute_query_expect_success(client, "show files in " + tbl)
    assert len(res.data) == 1

  def test_hms_event_sync_on_deletion(self, vector, unique_name):
    """Regression test for IMPALA-13829: TWaitForHmsEventResponse is not able to collect
    removed objects because their items in the deleteLog have been GCed."""
    client = self.create_impala_client_from_vector(vector)
    # Set a sleep time so catalogd has time to GC the deleteLog.
    client.execute("set debug_action='collect_catalog_results_delay:SLEEP@1000'")
    db = unique_name + "_db"
    tbl = db + ".foo"
    self.execute_query("drop database if exists {} cascade".format(db))
    self.execute_query("drop database if exists {}_2 cascade".format(db))
    self.execute_query("create database {}".format(db))
    self.execute_query("create database {}_2".format(db))
    # Create HMS Thrift clients to drop db/tables in the fastest way
    hive_clients = []
    hive_transports = []
    for _ in range(2):
      c, t = ImpalaTestSuite.create_hive_client()
      hive_clients.append(c)
      hive_transports.append(t)

    try:
      # Drop 2 dbs concurrently. So their DROP_DATABASE events are processed together (in
      # the same event batch). We need more than one db to be dropped so one of them in
      # catalogd's deleteLog can be GCed since its version < latest catalog version.
      # Note that this is no longer the way catalogd GCs the deleteLog after IMPALA-13829,
      # but it can be used to trigger the issue before this fix.
      def drop_db_in_hive(i, db_name):
        hive_clients[i].drop_database(db_name, deleteData=True, cascade=True)
        LOG.info("Dropped database {} in Hive".format(db_name))
      ts = [threading.Thread(target=drop_db_in_hive, args=params)
            for params in [[0, db], [1, db + "_2"]]]
      for t in ts:
        t.start()
      for t in ts:
        t.join()
      client.execute("create database " + db)

      self.execute_query("create table {}(i int)".format(tbl))
      self.execute_query("create table {}_2(i int)".format(tbl))

      # Drop 2 tables concurrently. So their DROP_TABLE events are processed together (in
      # the same event batch). We need more than one table to be dropped so one of them in
      # catalogd's deleteLog can be GCed since its version < latest catalog version.
      # Note that this is no longer the way catalogd GCs the deleteLog after IMPALA-13829,
      # but it can be used to trigger the issue before this fix.
      def drop_table_in_hive(i, tbl_name):
        hive_clients[i].drop_table(db, tbl_name, deleteData=True)
        LOG.info("Dropped table {}.{} in Hive".format(db, tbl_name))
      ts = [threading.Thread(target=drop_table_in_hive, args=params)
            for params in [[0, "foo"], [1, "foo_2"]]]
      for t in ts:
        t.start()
      for t in ts:
        t.join()
      client.execute("create table {}(i int)".format(tbl))
    finally:
      for t in hive_transports:
        t.close()
      self.execute_query("drop database if exists {} cascade".format(db))
      self.execute_query("drop database if exists {}_2 cascade".format(db))


class TestSelfRenameEvent(ImpalaTestSuite):
  @pytest.mark.execute_serially
  def test_self_rename_events(self, unique_database):
    """Regression test for IMPALA-14307"""
    try:
      catalogd = ImpalaCluster.get_e2e_test_cluster().catalogd
      self.execute_query("create table {}.tbl_a(i int)".format(unique_database))
      # Wait until the CREATE_DATABASE and CREATE_TABLE events are skipped.
      EventProcessorUtils.wait_for_event_processing(self)
      self.execute_query(
          "alter table {0}.tbl_a rename to {0}.tbl_b".format(unique_database))
      self.execute_query(":event_processor('pause')")

      with self.create_impala_client() as alter_client:
        version_after_create = catalogd.service.get_catalog_version()
        alter_client.set_configuration(
            {"debug_action": "catalogd_table_rename_delay:SLEEP@6000"})
        alter_handle = alter_client.execute_async(
            "alter table {0}.tbl_b rename to {0}.tbl_a".format(unique_database))
        alter_client.wait_for_admission_control(alter_handle, timeout_s=10)
        # Wait at most 10 seconds until catalogd increases the version for the rename
        # operation. This indicates the rename has started.
        start_time = time.time()
        while (time.time() - start_time < 10.0
               and catalogd.service.get_catalog_version() <= version_after_create):
          time.sleep(0.05)
        # Sleep to let catalogd send the alter_table HMS RPC
        time.sleep(1)
        # Invalidate tbl_b to remove it from the catalog
        self.execute_query("invalidate metadata {}.tbl_b".format(unique_database))
        alter_client.wait_for_finished_timeout(alter_handle, timeout=10)
        alter_client.close_query(alter_handle)

      # Resume event processing. The ALTER_TABLE RENAME events should be skipped.
      events_skipped_before = EventProcessorUtils.get_int_metric('events-skipped', 0)
      self.execute_query(":event_processor('start')")
      EventProcessorUtils.wait_for_event_processing(self)
      events_skipped_after = EventProcessorUtils.get_int_metric('events-skipped', 0)
      assert events_skipped_after == events_skipped_before + 2
    finally:
      # Recover event processing to avoid impacting other tests
      self.execute_query(":event_processor('start')")