# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
from builtins import range
import logging
import os
import pytest
import re
from os import getenv
from time import sleep

from impala_thrift_gen.hive_metastore.ttypes import FireEventRequest
from impala_thrift_gen.hive_metastore.ttypes import FireEventRequestData
from impala_thrift_gen.hive_metastore.ttypes import InsertEventRequestData
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.impala_connection import ERROR, FINISHED
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.parametrize import UniqueDatabase
from tests.common.skip import SkipIf, SkipIfFS, SkipIfCdpHive
from tests.common.test_dimensions import add_exec_option_dimension
from tests.util.acid_txn import AcidTxn
from tests.util.hive_utils import HiveDbWrapper
from tests.util.event_processor_utils import EventProcessorUtils
from tests.util.filesystem_utils import WAREHOUSE
from tests.util.iceberg_util import IcebergCatalogs

HIVE_SITE_HOUSEKEEPING_ON =\
  getenv('IMPALA_HOME') + '/fe/src/test/resources/hive-site-housekeeping-on'
HIVE_SITE_ALTER_PARTITIONS_EVENT =\
  getenv('IMPALA_HOME') + '/fe/src/test/resources/hive-site-events-config'
TRUNCATE_TBL_STMT = 'truncate table'
# The statestore heartbeat and topic update frequency (ms). Set low for testing.
STATESTORE_RPC_FREQUENCY_MS = 100
STATESTORED_ARGS = (
  "-statestore_heartbeat_frequency_ms={freq_ms} "
  "-statestore_priority_update_frequency_ms={freq_ms}").format(
    freq_ms=STATESTORE_RPC_FREQUENCY_MS)
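# With STATESTORE_RPC_FREQUENCY_MS = 100, STATESTORED_ARGS resolves to:
#   "-statestore_heartbeat_frequency_ms=100 -statestore_priority_update_frequency_ms=100"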
EVENT_SYNC_QUERY_OPTIONS = {
  "sync_hms_events_wait_time_s": 10,
  "sync_hms_events_strict_mode": True
}
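# Illustrative usage (these are passed as the query-options dict, i.e. the 'options'
# argument of execute_query() used throughout this file), e.g.:
#   self.execute_query("describe functional.alltypes", EVENT_SYNC_QUERY_OPTIONS)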


def wait_statestore_heartbeat(num_heartbeat=1):
  """Wait for state sync across impalads."""
  assert num_heartbeat > 0
  sleep(STATESTORE_RPC_FREQUENCY_MS / 1000.0 * num_heartbeat)
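# For example, wait_statestore_heartbeat(2) sleeps 100 / 1000.0 * 2 = 0.2s, i.e.
# roughly two statestore heartbeat periods.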


class TestEventProcessingCustomConfigsBase(CustomClusterTestSuite):
  """This is the base class for the other test classes in this file."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestEventProcessingCustomConfigsBase, cls).add_test_dimensions()

  def _run_self_events_test(self, db_name, options, use_impala):
    """Runs multiple queries which generate events and makes sure that tables and
    partitions are not refreshed when the queries are run from Impala. If the queries
    are run from Hive, we make sure that the tables and partitions are refreshed."""
    recover_tbl_name = ImpalaTestSuite.get_random_name("tbl_")
    # create a table similar to alltypes so that we can recover the partitions on it
    # later in one of the test queries
    alltypes_tab_location = self._get_tbl_location("functional", "alltypes")
    self.execute_query(
      "create external table {0}.{1} like functional.alltypes location '{2}'"
      .format(db_name, recover_tbl_name, alltypes_tab_location), options)
    self.execute_query("refresh {0}.{1}".format(db_name, recover_tbl_name), options)
    if use_impala:
      queries = self._get_impala_test_queries(db_name, options, recover_tbl_name)
      # Some queries do not trigger self-event evaluation (creates and drops); however,
      # it's still good to confirm that we don't do unnecessary refreshes in such
      # cases. For such queries we use a different metric, events-skipped, to confirm
      # that these events are skipped.
      for stmt in queries[False]:
        self._exec_sql_and_check_selfevent_counter(stmt, options, use_impala, False)
      # All the queries with the True key should confirm that the events-skipped
      # counter is also incremented
      for stmt in queries[True]:
        self._exec_sql_and_check_selfevent_counter(stmt, options, use_impala)
    else:
      queries = self._get_hive_test_queries(db_name, options, recover_tbl_name)
      for stmt in queries:
        self._exec_sql_and_check_selfevent_counter(stmt, options, use_impala)

  def _get_impala_test_queries(self, db_name, options, recover_tbl_name):
    tbl_name = ImpalaTestSuite.get_random_name("tbl_")
    acid_tbl_name = ImpalaTestSuite.get_random_name("acid_tbl_")
    acid_no_part_tbl_name = ImpalaTestSuite.get_random_name("acid_no_part_tbl_")
    tbl2 = ImpalaTestSuite.get_random_name("tbl_")
    view_name = ImpalaTestSuite.get_random_name("view_")
    view2 = ImpalaTestSuite.get_random_name("view_")
    # create an empty table for both the partitioned and unpartitioned cases for
    # testing insert events
    empty_unpartitioned_tbl = ImpalaTestSuite.get_random_name("empty_unpart_tbl_")
    empty_partitioned_tbl = ImpalaTestSuite.get_random_name("empty_parttbl_")
    self.execute_query(
      "create table {0}.{1} (c1 int)".format(db_name, empty_unpartitioned_tbl),
      options)
    self.execute_query(
      "create table {0}.{1} (c1 int) partitioned by (part int)"
      .format(db_name, empty_partitioned_tbl), options)
    acid_props = self._get_transactional_tblproperties(True)
    self_event_test_queries = {
      # Queries which will increment the events-skipped counter
      True: [
        # ALTER_DATABASE case
        "comment on database {0} is 'self-event test database'".format(db_name),
        "alter database {0} set owner user `test-user`".format(db_name),
        "create function {0}.f() returns int location '{1}/libTestUdfs.so' "
        "symbol='NoArgs'".format(db_name, WAREHOUSE),
        "drop function {0}.f()".format(db_name),
        # ALTER_TABLE case
        "alter table {0}.{1} set TBLPROPERTIES ('k'='v')".format(db_name, tbl_name),
        "alter table {0}.{1} ADD COLUMN c1 int".format(db_name, tbl_name),
        "alter table {0}.{1} ALTER COLUMN C1 set comment 'c1 comment'".format(db_name,
          tbl_name),
        "comment on table {0}.{1} IS 'table level comment'".format(db_name, tbl_name),
        "comment on column {0}.{1}.C1 IS 'column level comment'".format(db_name,
          tbl_name),
        "alter table {0}.{1} ADD COLUMNS (c2 int, c3 string)".format(db_name, tbl_name),
        "alter table {0}.{1} DROP COLUMN c1".format(db_name, tbl_name),
        "alter table {0}.{1} DROP COLUMN c2".format(db_name, tbl_name),
        "alter table {0}.{1} DROP COLUMN c3".format(db_name, tbl_name),
        "alter table {0}.{1} set owner user `test-user`".format(db_name, tbl_name),
        "alter table {0}.{1} set owner role `test-role`".format(db_name, tbl_name),
        "alter view {0}.{1} set owner user `test-view-user`".format(db_name, view_name),
        "alter view {0}.{1} set owner role `test-view-role`".format(db_name, view_name),
        # compute stats will generate ALTER_PARTITION events
        "compute stats {0}.{1}".format(db_name, tbl_name),
        "compute incremental stats {0}.{1}".format(db_name, tbl_name),
        "drop stats {0}.{1}".format(db_name, tbl_name),
        # insert into an existing partition; generates an INSERT self-event
        "insert into table {0}.{1} partition "
        "(year, month) select * from functional.alltypessmall where year=2009 "
        "and month=1".format(db_name, tbl_name),
        # an insert overwrite query from Impala also generates an INSERT self-event
        "insert overwrite table {0}.{1} partition "
        "(year, month) select * from functional.alltypessmall where year=2009 "
        "and month=1".format(db_name, tbl_name)],
      False: [
        "create table {0}.{1} like functional.alltypessmall "
        "stored as parquet".format(db_name, tbl_name),
        "create view {0}.{1} as select * from functional.alltypessmall "
        "where year=2009".format(db_name, view_name),
        # in case of a rename we process it as drop+create and hence
        # the events-skipped counter is not updated. Instead, if this event is
        # processed, it will increment the tables-added and tables-removed counters.
        "alter table {0}.{1} rename to {0}.{2}".format(db_name, tbl_name, tbl2),
        "alter table {0}.{1} rename to {0}.{2}".format(db_name, tbl2, tbl_name),
        "alter view {0}.{1} rename to {0}.{2}".format(db_name, view_name, view2),
        "alter view {0}.{1} rename to {0}.{2}".format(db_name, view2, view_name),
        # ADD_PARTITION cases
        # dynamic partition insert (creates new partitions)
        "insert into table {0}.{1} partition (year,month) "
        "select * from functional.alltypessmall where month % 2 = 0".format(db_name,
          tbl_name),
        "insert overwrite table {0}.{1} partition (year,month) "
        "select * from functional.alltypessmall where month % 2 = 1".format(db_name,
          tbl_name),
        # we add this statement below just to make sure that the subsequent statement
        # is a no-op
        "alter table {0}.{1} add if not exists partition (year=2100, month=1)".format(
          db_name, tbl_name),
        "alter table {0}.{1} add if not exists partition (year=2100, month=1)".format(
          db_name, tbl_name),
        # DROP_PARTITION cases
        "alter table {0}.{1} drop if exists partition (year=2100, month=1)".format(
          db_name, tbl_name),
        # drop a non-existing partition; essentially this is a no-op
        "alter table {0}.{1} drop if exists partition (year=2100, month=1)".format(
          db_name, tbl_name),
        # empty table case where no insert events are generated
        "insert overwrite {0}.{1} select * from {0}.{1}".format(
          db_name, empty_unpartitioned_tbl),
        "insert overwrite {0}.{1} partition(part) select * from {0}.{1}".format(
          db_name, empty_partitioned_tbl),
        # in case of ACID tables no INSERT event is generated as the COMMIT event
        # contains the related data
        "create table {0}.{1} (c1 int) {2}".format(db_name,
          acid_no_part_tbl_name, acid_props),
        # TODO: IMPALA-14305 will fix the transactional self-events detection
        # "insert into table {0}.{1} values (1) ".format(db_name, acid_no_part_tbl_name),
        # "insert overwrite table {0}.{1} select * from {0}.{1}".format(
        #   db_name, acid_no_part_tbl_name),
        "{0} {1}.{2}".format(TRUNCATE_TBL_STMT, db_name, acid_no_part_tbl_name),
        # the table is empty so the following insert adds 0 rows
        # "insert overwrite table {0}.{1} select * from {0}.{1}".format(
        #   db_name, acid_no_part_tbl_name),
        "create table {0}.{1} (c1 int) partitioned by (part int) {2}".format(db_name,
          acid_tbl_name, acid_props),
        # "insert into table {0}.{1} partition (part=1) "
        # "values (1) ".format(db_name, acid_tbl_name),
        # "insert into table {0}.{1} partition (part) select id, int_col "
        # "from functional.alltypestiny".format(db_name, acid_tbl_name),
        # repeat the same insert, now it writes to existing partitions
        # "insert into table {0}.{1} partition (part) select id, int_col "
        # "from functional.alltypestiny".format(db_name, acid_tbl_name),
        # the following insert overwrite is used instead of truncate, because truncate
        # leads to a non-self event that reloads the table
        # "insert overwrite table {0}.{1} partition (part) select id, int_col "
        # "from functional.alltypestiny where id=-1".format(db_name, acid_tbl_name),
        # "insert overwrite table {0}.{1} partition (part) select id, int_col "
        # "from functional.alltypestiny".format(db_name, acid_tbl_name),
        # "insert overwrite {0}.{1} partition(part) select * from {0}.{1}".format(
        #   db_name, acid_tbl_name),
        # recover partitions will generate ADD_PARTITION events
        "alter table {0}.{1} recover partitions".format(db_name, recover_tbl_name),
        # the events processor doesn't process delete column stats events currently;
        # however, in case of incremental stats, there could be alter table and
        # alter partition events which should be ignored. Hence we run compute stats
        # before to make sure that the alter events generated by the truncate table
        # command are ignored.
        "compute incremental stats {0}.{1}".format(db_name, tbl_name),
        "{0} {1}.{2}".format(TRUNCATE_TBL_STMT, db_name, tbl_name)
      ]
    }
    return self_event_test_queries

  def _get_hive_test_queries(self, db_name, options, recover_tbl_name):
    tbl_name = ImpalaTestSuite.get_random_name("hive_test_tbl_")
    tbl2 = ImpalaTestSuite.get_random_name("hive_renamed_tbl_")
    view_name = ImpalaTestSuite.get_random_name("hive_view_")
    # we use a custom table schema to make it easier to change columns later in the
    # test queries
    self.execute_query("create table {0}.{1} (key int) partitioned by "
                       "(part int) stored as parquet".format(db_name, tbl_name),
                       options)
    self.execute_query(
      "create view {0}.{1} as select * from functional.alltypessmall where year=2009"
      .format(db_name, view_name), options)
    # the events processor only refreshes loaded tables, hence it's important to issue
    # a refresh here so that the table is in a loaded state
    self.execute_query("refresh {0}.{1}".format(db_name, tbl_name), options)
    self_event_test_queries = [
      # ALTER_DATABASE cases
      "alter database {0} set dbproperties ('comment'='self-event test "
      "database')".format(db_name),
      "alter database {0} set owner user `test-user`".format(db_name),
      # ALTER_TABLE case
      "alter table {0}.{1} set tblproperties ('k'='v')".format(db_name, tbl_name),
      "alter table {0}.{1} add columns (value string)".format(db_name, tbl_name),
      "alter table {0}.{1} set owner user `test-user`".format(db_name, tbl_name),
      "alter table {0}.{1} set owner role `test-role`".format(db_name, tbl_name),
      # need to set this config to make sure the dynamic partition insert below works
      "set hive.exec.dynamic.partition.mode=nonstrict",
      # ADD_PARTITION cases
      "insert into table {0}.{1} partition (part=2009) "
      "select id as key, string_col as value from functional.alltypessmall".format(
        db_name, tbl_name),
      # add partition
      "alter table {0}.{1} add if not exists partition (part=1111)".format(
        db_name, tbl_name),
      # add an existing partition; essentially this is a no-op
      "alter table {0}.{1} add if not exists partition (part=1111)".format(
        db_name, tbl_name),
      # DROP_PARTITION cases
      "alter table {0}.{1} drop if exists partition (part=1111)".format(
        db_name, tbl_name),
      # drop a non-existing partition; essentially this is a no-op
      "alter table {0}.{1} drop if exists partition (part=1111)".format(
        db_name, tbl_name),
      # compute stats will generate ALTER_PARTITION events
      "analyze table {0}.{1} compute statistics for columns".format(db_name, tbl_name),
      "msck repair table {0}.{1}".format(db_name, recover_tbl_name),
      # we rename at the end since impala will have the new table in an unloaded
      # state after the rename and hence any later events will be ignored anyway.
      "alter table {0}.{1} rename to {0}.{2}".format(db_name, tbl_name, tbl2),
      "alter view {0}.{1} rename to {0}.{2}".format(db_name, view_name,
        ImpalaTestSuite.get_random_name("view_")),
    ]
    return self_event_test_queries

  @staticmethod
  def _get_self_event_metrics():
    """
    Gets the tables-refreshed, partitions-refreshed and events-skipped metric values
    from the Metastore EventsProcessor.
    """
    tbls_refreshed_count = EventProcessorUtils.get_int_metric('tables-refreshed', 0)
    partitions_refreshed_count = EventProcessorUtils.get_int_metric(
      'partitions-refreshed', 0)
    events_skipped_count = EventProcessorUtils.get_int_metric('events-skipped', 0)
    return int(tbls_refreshed_count), int(partitions_refreshed_count), \
      int(events_skipped_count)

  def _exec_sql_and_check_selfevent_counter(self, stmt, options, use_impala_client,
                                            check_events_skipped_counter=True):
    """
    Runs a given query statement using an Impala client or a Hive client based on the
    use_impala_client argument and confirms that the self-event related counters are
    as expected based on whether we expect a self-event or not. If
    check_events_skipped_counter is False it skips checking the events-skipped metric.
    """
    EventProcessorUtils.wait_for_event_processing(self)
    tbls_refreshed, partitions_refreshed, \
      events_skipped = self._get_self_event_metrics()
    last_synced_event = EventProcessorUtils.get_last_synced_event_id()
    logging.info("Running statement in {1}: {0}".format(stmt,
      "impala" if use_impala_client else "hive"))
    if not use_impala_client:
      self.run_stmt_in_hive(stmt)
    else:
      self.execute_query(stmt, options)

    EventProcessorUtils.wait_for_event_processing(self)
    tbls_refreshed_after, partitions_refreshed_after, \
      events_skipped_after = self._get_self_event_metrics()
    last_synced_event_after = EventProcessorUtils.get_last_synced_event_id()
    # we assume that any events which come due to stmts run from the impala client
    # are self-events
    logging.info(
      "Event id before {0} event id after {1}".format(last_synced_event,
        last_synced_event_after))
    if use_impala_client:
      # the events-skipped counter must increase for a self-event if
      # check_events_skipped_counter is set
      # some of the test queries generate no events at all. If that is the case,
      # skip the comparison below
      if last_synced_event_after > last_synced_event:
        if check_events_skipped_counter:
          assert events_skipped_after > events_skipped, \
            "Failing query(impala={}): {}".format(use_impala_client, stmt)
        # if this is a self-event, no tables or partitions should be refreshed,
        # except for "truncate table"
        if (TRUNCATE_TBL_STMT not in stmt):
          assert tbls_refreshed == tbls_refreshed_after, \
            "Failing query(impala={}): {}".format(use_impala_client, stmt)
        # TRUNCATE refreshes partitions
        if (TRUNCATE_TBL_STMT not in stmt):
          assert partitions_refreshed == partitions_refreshed_after, \
            "Failing query(impala={}): {}".format(use_impala_client, stmt)
    else:
      # hive was used to run the stmts; any events generated should not have been
      # deemed self-events unless there are empty partition add/drop events
      assert events_skipped <= events_skipped_after

  def _get_tbl_location(self, db_name, tbl_name):
    assert self.hive_client is not None
    return self.hive_client.get_table(db_name, tbl_name).sd.location

  def _get_transactional_tblproperties(self, is_transactional):
    """
    Util method to generate the tblproperties for transactional tables
    """
    tblproperties = ""
    if is_transactional:
      tblproperties = "tblproperties ('transactional'='true'," \
        "'transactional_properties'='insert_only')"
    return tblproperties
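  # For example, _get_transactional_tblproperties(True) returns
  # "tblproperties ('transactional'='true','transactional_properties'='insert_only')"
  # while _get_transactional_tblproperties(False) returns an empty string.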


@SkipIfFS.hive
class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
  """This class contains tests that exercise the event processing mechanism in the
  catalog for non-default configurations."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestEventProcessingCustomConfigs, cls).add_test_dimensions()

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    impalad_args="--blacklisted_dbs=testBlackListedDb "
                 "--blacklisted_tables=functional_parquet.testBlackListedTbl",
    catalogd_args="--blacklisted_dbs=testBlackListedDb "
                  "--blacklisted_tables=functional_parquet.testBlackListedTbl "
                  "--hms_event_polling_interval_s=1")
  def test_events_on_blacklisted_objects(self):
    """Executes hive queries on blacklisted databases and tables and makes sure that
    the event processor does not error out.
    """
    try:
      event_id_before = EventProcessorUtils.get_last_synced_event_id()
      # create a blacklisted database from hive and make sure the event is ignored
      self.run_stmt_in_hive("create database TESTblackListedDb")
      # wait until all the events generated above are processed
      EventProcessorUtils.wait_for_event_processing(self)
      assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
      assert EventProcessorUtils.get_last_synced_event_id() > event_id_before
      # make sure that the blacklisted db is ignored
      assert "TESTblackListedDb".lower() not in self.all_db_names()

      event_id_before = EventProcessorUtils.get_last_synced_event_id()
      self.run_stmt_in_hive("create table testBlackListedDb.testtbl (id int)")
      # create a table in the blacklisted database with a different case
      self.run_stmt_in_hive("create table TESTBLACKlISTEDDb.t2 (id int)")
      self.run_stmt_in_hive(
        "create table functional_parquet.testBlackListedTbl (id int, val string)"
        " partitioned by (part int) stored as parquet")
      self.run_stmt_in_hive(
        "alter table functional_parquet.testBlackListedTbl add partition (part=1)")
      # wait until all the events generated above are processed
      EventProcessorUtils.wait_for_event_processing(self)
      assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
      assert EventProcessorUtils.get_last_synced_event_id() > event_id_before
      # make sure that the blacklisted table is not created
      table_names = self.client.execute("show tables in functional_parquet").get_data()
      assert "testBlackListedTbl".lower() not in table_names

      event_id_before = EventProcessorUtils.get_last_synced_event_id()
      # generate a table level event with a different case
      self.run_stmt_in_hive("drop table functional_parquet.TESTBlackListedTbl")
      # wait until all the events generated above are processed
      EventProcessorUtils.wait_for_event_processing(self)
      assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
      assert EventProcessorUtils.get_last_synced_event_id() > event_id_before
    finally:
      self.run_stmt_in_hive("drop database testBlackListedDb cascade")
      self.run_stmt_in_hive("drop table functional_parquet.testBlackListedTbl")

  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=10")
  def test_drop_table_events(self):
    """IMPALA-10187: Event processing fails on multiple events + DROP TABLE.
    This test issues ALTER TABLE + DROP in quick succession and checks whether event
    processing still works.
    """
    event_proc_timeout = 15
    db_name = ImpalaTestSuite.get_random_name("drop_event_db_")
    with HiveDbWrapper(self, db_name):
      tbl_name = "foo"
      self.run_stmt_in_hive("""
        drop table if exists {db}.{tbl};
        create table {db}.{tbl} (id int);
        insert into {db}.{tbl} values(1);""".format(db=db_name, tbl=tbl_name))
      # With MetastoreEventProcessor running, the insert event will be processed. Query
      # the table from Impala. Verify that the data is present in Impala.
      result = self.execute_query_with_hms_sync(
        "select * from %s.%s" % (db_name, tbl_name), event_proc_timeout)
      assert len(result.data) == 1
      assert result.data[0] == '1'
      # Execute ALTER TABLE + DROP in quick succession so they will be processed in the
      # same event batch.
      self.run_stmt_in_hive("""
        alter table {db}.{tbl} set tblproperties ('foo'='bar');
        drop table {db}.{tbl};""".format(db=db_name, tbl=tbl_name))
      EventProcessorUtils.wait_for_event_processing(self, event_proc_timeout)
      # Check that the event processor status is still ACTIVE.
      assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"

  @CustomClusterTestSuite.with_args(
    cluster_size=1,
    catalogd_args="--hms_event_polling_interval_s=1")
  def test_create_drop_events(self, unique_database):
    """Regression test for IMPALA-10502. The test runs very slowly with the default
    statestored update frequency and hence this is changed to a custom cluster
    test."""
    self.__run_create_drop_test(unique_database, "database")
    self.__run_create_drop_test(unique_database, "table")
    self.__run_create_drop_test(unique_database, "table", True)
    self.__run_create_drop_test(unique_database, "table", True, True)
    self.__run_create_drop_test(unique_database, "partition")

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=1",
    cluster_size=1)
  def test_local_catalog_create_drop_events(self, unique_database):
    """
    Similar to test_create_drop_events except this runs in local catalog mode.
    """
    self.__run_create_drop_test(unique_database, "database")
    self.__run_create_drop_test(unique_database, "table")
    self.__run_create_drop_test(unique_database, "table", True)
    self.__run_create_drop_test(unique_database, "table", True, True)
    self.__run_create_drop_test(unique_database, "partition")

  def __run_create_drop_test(self, db, type, rename=False, rename_db=False):
    if type == "table":
      if not rename:
        queries = [
          "create table {0}.test_{1} (i int)".format(db, 1),
          "drop table {0}.test_{1}".format(db, 1)
        ]
      else:
        db_1 = "{}_1".format(db)
        if rename_db:
          self.execute_query_expect_success(self.create_impala_client(),
            "drop database if exists {0} cascade".format(db_1))
          self.execute_query_expect_success(self.create_impala_client(),
            "create database {0}".format(db_1))
        self.execute_query_expect_success(self.create_impala_client(),
          "create table if not exists {0}.rename_test_1 (i int)".format(db))
        if rename_db:
          queries = [
            "alter table {0}.rename_test_1 rename to {1}.rename_test_1".format(db,
              db_1),
            "alter table {0}.rename_test_1 rename to {1}.rename_test_1".format(db_1, db)
          ]
        else:
          queries = [
            "alter table {0}.rename_test_1 rename to {0}.rename_test_2".format(db),
            "alter table {0}.rename_test_2 rename to {0}.rename_test_1".format(db)
          ]
      create_metric_name = "tables-added"
      removed_metric_name = "tables-removed"
    elif type == "database":
      self.execute_query_expect_success(self.create_impala_client(),
        "drop database if exists {0}".format("test_create_drop_db"))
      queries = [
        "create database {db}".format(db="test_create_drop_db"),
        "drop database {db}".format(db="test_create_drop_db")
      ]
      create_metric_name = "databases-added"
      removed_metric_name = "databases-removed"
    else:
      tbl_name = "test_create_drop_partition"
      self.execute_query_expect_success(self.create_impala_client(),
        "create table {db}.{tbl} (c int) partitioned by (p int)".format(
          db=db, tbl=tbl_name))
      queries = [
        "alter table {db}.{tbl} add partition (p=1)".format(db=db, tbl=tbl_name),
        "alter table {db}.{tbl} drop partition (p=1)".format(db=db, tbl=tbl_name)
      ]
      create_metric_name = "partitions-added"
      removed_metric_name = "partitions-removed"

    # get the metric values from before the test
    EventProcessorUtils.wait_for_event_processing(self)
    create_metric_val_before = EventProcessorUtils.get_int_metric(create_metric_name, 0)
    removed_metric_val_before = EventProcessorUtils.get_int_metric(removed_metric_name, 0)
    events_skipped_before = EventProcessorUtils.get_int_metric('events-skipped', 0)
    num_iters = 100
    for iter in range(num_iters):
      for q in queries:
        try:
          self.execute_query_expect_success(self.create_impala_client(), q)
        except Exception as e:
          print("Failed in {} iterations. Error {}".format(iter, str(e)))
          raise
    EventProcessorUtils.wait_for_event_processing(self)
    create_metric_val_after = EventProcessorUtils.get_int_metric(create_metric_name, 0)
    removed_metric_val_after = EventProcessorUtils.get_int_metric(removed_metric_name, 0)
    events_skipped_after = EventProcessorUtils.get_int_metric('events-skipped', 0)
    num_delete_event_entries = EventProcessorUtils.\
      get_int_metric('delete-event-log-size', 0)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    # None of the queries above should actually trigger an add/remove of objects from
    # events
    assert create_metric_val_after == create_metric_val_before
    assert removed_metric_val_after == removed_metric_val_before
    # each query set generates 2 events and both of them should be skipped
    assert events_skipped_after == num_iters * 2 + events_skipped_before
    # make sure that there are no more entries in the delete event log
    assert num_delete_event_entries == 0

  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
  def test_self_events_with_hive(self, unique_database):
    """Runs multiple queries which generate events, using hive as the client"""
    self._run_self_events_test(unique_database, {}, use_impala=False)

  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=0.2"
                                    " --enable_hierarchical_event_processing=true")
  def test_self_events_with_hive_hierarchical(self, unique_database):
    self._run_self_events_test(unique_database, {}, use_impala=False)

  @CustomClusterTestSuite.with_args(
    catalogd_args="--hms_event_polling_interval_s=5"
                  " --enable_reload_events=true"
                  " --enable_skipping_older_events=true")
  def test_refresh_invalidate_events(self, unique_database):
    """The reload events generated by this test will be batched by
    MetaStoreEvents#BatchPartitionEvent, and BatchPartitionEvent#isOlderEvent()
    requires 'enable_skipping_older_events=true' to skip older reload events."""
    self.run_test_refresh_invalidate_events(unique_database, "reload_table")

  @CustomClusterTestSuite.with_args(
    catalogd_args="--hms_event_polling_interval_s=5"
                  " --enable_reload_events=true"
                  " --enable_sync_to_latest_event_on_ddls=true"
                  " --enable_skipping_older_events=true")
  def test_refresh_invalidate_events_enable_sync_to_latest_events(self, unique_database):
    self.run_test_refresh_invalidate_events(unique_database, "reload_table_sync", True)

  def run_test_refresh_invalidate_events(self, unique_database, test_reload_table,
                                         fire_reload_events_from_hive=False):
    """Verifies IMPALA-11808: refresh/invalidate commands should generate a RELOAD
    event in HMS, and catalogd's event processor should process this event.
    """
    self.client.execute(
      "create table {}.{} (i int) partitioned by (year int) "
      .format(unique_database, test_reload_table))
    self.client.execute(
      "insert into {}.{} partition (year=2022) values (1),(2),(3)"
      .format(unique_database, test_reload_table))
    self.client.execute(
      "insert into {}.{} partition (year=2023) values (1),(2),(3)"
      .format(unique_database, test_reload_table))
    EventProcessorUtils.wait_for_event_processing(self)

    def check_self_events(query, num_events=1):
      tbls_refreshed_before, partitions_refreshed_before, \
        events_skipped_before = self._get_self_event_metrics()
      last_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client)
      self.client.execute(query)
      # Check that a RELOAD event is fired after the refresh query.
      events = EventProcessorUtils.get_next_notification(self.hive_client, last_event_id)
      for event in events:
        assert event.dbName == unique_database
        assert event.tableName == test_reload_table
        assert event.eventType == "RELOAD"
      assert len(events) == num_events
      EventProcessorUtils.wait_for_event_processing(self)
      tbls_refreshed_after, partitions_refreshed_after, \
        events_skipped_after = self._get_self_event_metrics()
      assert events_skipped_after > events_skipped_before

    check_self_events("refresh {}.{} partition(year=2022)"
      .format(unique_database, test_reload_table))
    check_self_events("refresh {}.{}".format(unique_database, test_reload_table))
    # Refresh multiple partitions. The last two are the same. Verify only two RELOAD
    # events are generated.
    check_self_events(
      "refresh {}.{} partition(year=2022) partition(year=2023) partition(year=2023)"
      .format(unique_database, test_reload_table), 2)
    EventProcessorUtils.wait_for_event_processing(self)

    if fire_reload_events_from_hive:
      # Verify that older events from hive are skipped in the event processor.
      # Fire 10 consecutive RELOAD events; the first one is processed and updates the
      # lastRefreshEventId, causing the remaining 9 to be ignored.
      data = FireEventRequestData()
      data.refreshEvent = True
      req = FireEventRequest(True, data)
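      # Note: in the HMS thrift API, FireEventRequest(successful, data) takes the
      # 'successful' flag as its first field and the event payload as its second;
      # dbName/tableName/partitionVals are set separately below.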
      req.dbName = unique_database
      req.tableName = test_reload_table
      # table level reload events
      tbl_events_skipped_before = EventProcessorUtils.get_num_skipped_events()
      for _ in range(10):
        self.hive_client.fire_listener_event(req)
      EventProcessorUtils.wait_for_event_processing(self)
      tbl_events_skipped_after = EventProcessorUtils.get_num_skipped_events()
      assert tbl_events_skipped_after > tbl_events_skipped_before
      # partition level reload events
      EventProcessorUtils.wait_for_event_processing(self)
      part_events_skipped_before = EventProcessorUtils.get_num_skipped_events()
      self.client.execute(":event_processor('pause')")
      req.partitionVals = ["2022"]
      for _ in range(10):
        self.hive_client.fire_listener_event(req)
      self.client.execute("refresh {}.{} partition(year=2022)"
        .format(unique_database, test_reload_table))
      self.client.execute(":event_processor('start')")
      EventProcessorUtils.wait_for_event_processing(self)
      part_events_skipped_after = EventProcessorUtils.get_num_skipped_events()
      assert part_events_skipped_after > part_events_skipped_before

    # Test to verify IMPALA-12213
    table = self.hive_client.get_table(unique_database, test_reload_table)
    table.dbName = unique_database
    table.tableName = "test_sequence_table"
    self.hive_client.create_table(table)
    data = FireEventRequestData()
    data.refreshEvent = True
    req = FireEventRequest(True, data)
    req.dbName = unique_database
    req.tableName = "test_sequence_table"
    self.hive_client.fire_listener_event(req)
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"

  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=false",
    catalogd_args="--catalog_topic_mode=full --enable_reload_events=true")
  def test_reload_events_with_transient_partitions_legacy_catalog(self, unique_database):
    self.run_test_reload_events_with_transient_partitions(unique_database)

  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal --enable_reload_events=true")
  def test_reload_events_with_transient_partitions_local_catalog(self, unique_database):
    self.run_test_reload_events_with_transient_partitions(unique_database)

  def run_test_reload_events_with_transient_partitions(self, unique_database):
    tbl = unique_database + ".tbl"
    create_stmt = "create table {} (i int) partitioned by(p int)".format(tbl)
    add_part_stmt = "alter table {} add if not exists partition(p=0)".format(tbl)
    drop_part_stmt = "alter table {} drop if exists partition(p=0)".format(tbl)
    refresh_stmt = "refresh {} partition(p=0)".format(tbl)
    end_states = [FINISHED, ERROR]

    self.execute_query(create_stmt)
    self.execute_query(add_part_stmt)
    # Run REFRESH partition in the background so we can drop the partition concurrently.
    refresh_handle = self.client.execute_async(refresh_stmt)
    # Before IMPALA-12855, REFRESH usually fails in 2-3 rounds.
    for i in range(100):
      self.execute_query(drop_part_stmt)
      refresh_state = self.client.wait_for_any_impala_state(
        refresh_handle, end_states, 10)
      is_finished = (refresh_state == FINISHED)
      error_log = None if is_finished else self.client.get_log(refresh_handle)
      self.client.close_query(refresh_handle)
      assert is_finished, "REFRESH ended in error. Error log: {}".format(error_log)
      self.execute_query(add_part_stmt)
      refresh_handle = self.client.execute_async(refresh_stmt)

  @CustomClusterTestSuite.with_args(
    catalogd_args="--hms_event_polling_interval_s=10"
                  " --enable_skipping_older_events=true"
                  " --enable_sync_to_latest_event_on_ddls=true")
  def test_skipping_older_events(self, unique_database):
    """Verifies IMPALA-11535: the event processor should ignore older events if the
    current event id is older than the lastRefreshEventId on the table/partition.
    """
    test_old_table = "test_old_table"

    def verify_skipping_older_events(table_name, is_transactional, is_partitioned):
      query = " ".join(["create", "transactional" if is_transactional else '',
        "table {}.{} (i int)", "partitioned by (year int)" if is_partitioned else ''])
      self.run_stmt_in_hive(query.format(unique_database, table_name))
      values = "values (10),(20),(30)"
      EventProcessorUtils.wait_for_event_processing(self)

      def verify_skipping_hive_stmt_events(stmt, new_table_name):
        tbl_events_skipped_before = EventProcessorUtils.get_num_skipped_events()
        self.run_stmt_in_hive(stmt)
        self.client.execute(
          "refresh {}.{}".format(unique_database, new_table_name))
        tables_refreshed_before = EventProcessorUtils.get_int_metric("tables-refreshed")
        partitions_refreshed_before = \
          EventProcessorUtils.get_int_metric("partitions-refreshed")
        EventProcessorUtils.wait_for_event_processing(self)
        tbl_events_skipped_after = EventProcessorUtils.get_num_skipped_events()
        assert tbl_events_skipped_after > tbl_events_skipped_before
        tables_refreshed_after = EventProcessorUtils.get_int_metric("tables-refreshed")
        partitions_refreshed_after = \
          EventProcessorUtils.get_int_metric("partitions-refreshed")
        if is_partitioned:
          assert partitions_refreshed_after == partitions_refreshed_before
        else:
          assert tables_refreshed_after == tables_refreshed_before

      # test a single insert event
      query = " ".join(["insert into `{}`.`{}`", "partition (year=2023)"
        if is_partitioned else '', values])
      verify_skipping_hive_stmt_events(
        query.format(unique_database, table_name), table_name)
      # test batched insert events
      query = " ".join(["insert into `{}`.`{}`", "partition (year=2023)"
        if is_partitioned else '', values, ";"])
      complete_query = ""
      for _ in range(3):
        complete_query += query.format(unique_database, table_name)
      verify_skipping_hive_stmt_events(complete_query, table_name)
      # Dynamic partitions test
      query = " ".join(["create", "table `{}`.`{}` (i int)",
        " partitioned by (year int) " if is_partitioned else '',
        self._get_transactional_tblproperties(is_transactional)])
      self.client.execute(query.format(unique_database, "new_table"))
      complete_query = "insert overwrite table `{db}`.`{tbl1}` " \
        "select * from `{db}`.`{tbl2}`"\
        .format(db=unique_database, tbl1="new_table", tbl2=table_name)
      verify_skipping_hive_stmt_events(complete_query, "new_table")
      # Drop the tables before running another test
      self.client.execute("drop table {}.{}".format(unique_database, table_name))
      self.client.execute("drop table {}.{}".format(unique_database, "new_table"))
    verify_skipping_older_events(test_old_table, False, False)
    verify_skipping_older_events(test_old_table, True, False)
    verify_skipping_older_events(test_old_table, False, True)
    verify_skipping_older_events(test_old_table, True, True)

  @CustomClusterTestSuite.with_args(
    catalogd_args="--enable_sync_to_latest_event_on_ddls=true "
                  "--debug_actions=catalogd_get_filtered_events_delay:SLEEP@3000 ")
  def test_skipping_batching_events(self, unique_database):
    """Verifies IMPALA-10949, improving the batching logic for partition events.
    Before batching the events, each event is checked: if its event id is greater than
    the table's lastSyncEventId the event can be batched, otherwise it is skipped."""
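    # Illustrative sketch of the decision described in the docstring above
    # (pseudocode, not the actual catalogd implementation):
    #   if event_id > table.lastSyncEventId: add the event to the current batch
    #   else: skip the event, since the table has already synced past it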
    # Print trace logs from DebugUtils.
    self.cluster.catalogd.set_jvm_log_level("org.apache.impala.util.DebugUtils", "trace")
    test_batch_table = "test_batch_table"
    self.client.execute(
      "create table {}.{} like functional.alltypes"
      .format(unique_database, test_batch_table))
    self.client.execute(
      "insert into {}.{} partition (year,month) select * from functional.alltypes"
      .format(unique_database, test_batch_table))
    # Generate batch ALTER_PARTITION events
    self.run_stmt_in_hive(
      "analyze table {}.{} compute statistics".format(unique_database, test_batch_table))
    EventProcessorUtils.wait_for_event_processing(self)
    batch_events_metric = "batch-events-created"
    batch_events_1 = EventProcessorUtils.get_int_metric(batch_events_metric)
    prev_skipped_events = EventProcessorUtils.get_int_metric("events-skipped")
    self.run_stmt_in_hive(
      "analyze table {}.{} compute statistics".format(unique_database, test_batch_table))
    self.client.execute("refresh {0}.{1}".format(unique_database, test_batch_table))
    EventProcessorUtils.wait_for_event_processing(self)
    batch_events_2 = EventProcessorUtils.get_int_metric(batch_events_metric)
    current_skipped_events = EventProcessorUtils.get_int_metric("events-skipped")
    # Make sure no new batch events are created
    assert batch_events_2 == batch_events_1
    assert current_skipped_events - prev_skipped_events >= 24

  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
  def test_batch_reload_events(self, unique_database):
    """Verifies IMPALA-14082, adding batching logic for partitioned refresh events.
    Before batching the events, each event is checked: if its event id is greater than
    the table's lastSyncEventId the event can be batched, otherwise it is skipped."""
    tbl = unique_database + ".batch_refresh_tbl"
    self.client.execute(
      "create table {} (i int) partitioned by(p int) stored as textfile".format(tbl))
    self.client.execute("insert into {} partition(p) values (0,0),(1,1),(2,2)"
      .format(tbl))
    EventProcessorUtils.wait_for_event_processing(self)

    def __get_fs_location(table_name):
      return '%s/%s.db/%s/' % (WAREHOUSE, unique_database, table_name)
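    # For example, __get_fs_location("batch_refresh_tbl") returns a path like
    # "<WAREHOUSE>/<unique_database>.db/batch_refresh_tbl/".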

    batch_events_1 = EventProcessorUtils.get_int_metric("batch-events-created")
    tbl_path = __get_fs_location("batch_refresh_tbl")
    # new data for p=0, overwrite p=2 with p=1, delete data under p=1
    self.filesystem_client.create_file(os.path.join(tbl_path + "p=0/",
      "new_file.txt"), "4")
    self.filesystem_client.delete_file_dir(tbl_path + "p=2/", recursive=True)
    self.filesystem_client.make_dir(tbl_path + "p=2/")
    self.filesystem_client.copy(tbl_path + "p=1/", tbl_path + "p=2/")
    self.filesystem_client.delete_file_dir(tbl_path + "p=1/", recursive=True)
    data = FireEventRequestData()
    data.refreshEvent = True
    req = FireEventRequest(True, data)
    req.dbName = unique_database
    req.tableName = "batch_refresh_tbl"
    req.partitionVals = ["0"]
    self.hive_client.fire_listener_event(req)
    req.partitionVals = ["1"]
    self.hive_client.fire_listener_event(req)
    req.partitionVals = ["2"]
    self.hive_client.fire_listener_event(req)
    EventProcessorUtils.wait_for_event_processing(self)
    batch_events_2 = EventProcessorUtils.get_int_metric("batch-events-created")
    assert batch_events_2 == batch_events_1 + 1
    # p=0 now has the two values 0 and 4, and p=2 has the value 1
    result = self.execute_query(
      "select * from {} order by i".format(tbl))
    parsed_data = []
    for line in result.get_data().strip().split('\n'):
      row = [int(val) for val in line.strip().split('\t')]
      parsed_data.append(row)
    expected_data = [[0, 0], [1, 2], [4, 0]]
    assert expected_data == parsed_data

  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
  def test_commit_compaction_events(self, unique_database):
    """Verifies IMPALA-11626: commit compaction events triggered in HMS should be
    consumed by catalogd's event processor.
    """

    # Test scenario 1: partitioned table
    test_cc_part_table = "test_cc_partitioned_table"
    self.run_stmt_in_hive(
      "create transactional table {}.{} (i int) partitioned by (year int)"
      .format(unique_database, test_cc_part_table))
    for i in range(2):
      self.run_stmt_in_hive(
        "insert into {}.{} partition (year=2022) values (1),(2),(3)"
        .format(unique_database, test_cc_part_table))
    EventProcessorUtils.wait_for_event_processing(self)
    parts_refreshed_before_compaction = EventProcessorUtils.get_int_metric(
      "partitions-refreshed")
    self.client.execute(
      "select * from {}.{} limit 2"
      .format(unique_database, test_cc_part_table))
    self.run_stmt_in_hive(
      "alter table {}.{} partition(year=2022) compact 'minor' and wait"
      .format(unique_database, test_cc_part_table))
    EventProcessorUtils.wait_for_event_processing(self)
    parts_refreshed_after_compaction = EventProcessorUtils.get_int_metric(
      "partitions-refreshed")
    assert parts_refreshed_after_compaction > parts_refreshed_before_compaction

    # Test scenario 2: unpartitioned table
    test_cc_unpart_tab = "test_cc_unpart_table"
    self.run_stmt_in_hive(
      "create transactional table {}.{} (i int)"
      .format(unique_database, test_cc_unpart_tab))
    for i in range(2):
      self.run_stmt_in_hive(
        "insert into {}.{} values (1),(2),(3)"
        .format(unique_database, test_cc_unpart_tab))
    EventProcessorUtils.wait_for_event_processing(self)
    tables_refreshed_before_compaction = EventProcessorUtils.get_int_metric(
      "tables-refreshed")
    self.client.execute(
      "select * from {}.{} limit 2"
      .format(unique_database, test_cc_unpart_tab))
    self.run_stmt_in_hive("alter table {}.{} compact 'minor' and wait"
      .format(unique_database, test_cc_unpart_tab))
    EventProcessorUtils.wait_for_event_processing(self)
    tables_refreshed_after_compaction = EventProcessorUtils.get_int_metric(
      "tables-refreshed")
    assert tables_refreshed_after_compaction > tables_refreshed_before_compaction

    # Test scenario 3: partitioned table has a partition deleted
    test_cc_part_table = "test_cc_partitioned_table_error"
    self.run_stmt_in_hive(
      "create transactional table {}.{} (i int) partitioned by (year int)"
      .format(unique_database, test_cc_part_table))
    for i in range(2):
      self.run_stmt_in_hive(
        "insert into {}.{} partition (year=2022) values (1),(2),(3)"
        .format(unique_database, test_cc_part_table))
    EventProcessorUtils.wait_for_event_processing(self)
    self.client.execute(
      "select * from {}.{} limit 2"
      .format(unique_database, test_cc_part_table))
    self.run_stmt_in_hive(
      "alter table {}.{} partition(year=2022) compact 'minor' and wait"
      .format(unique_database, test_cc_part_table))
    self.run_stmt_in_hive("alter table {}.{} drop if exists partition(year=2022)"
      .format(unique_database, test_cc_part_table))
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"

    # Test scenario 4: process a commit compaction event for an unloaded table
    test_cc_part_table = "test_cc_table_unloaded"
    self.run_stmt_in_hive(
      "create transactional table {}.{} (i int) partitioned by (year int)"
      .format(unique_database, test_cc_part_table))
    for i in range(2):
      self.run_stmt_in_hive(
        "insert into {}.{} partition (year=2022) values (1),(2),(3)"
        .format(unique_database, test_cc_part_table))
    EventProcessorUtils.wait_for_event_processing(self)
    self.run_stmt_in_hive(
      "alter table {}.{} partition(year=2022) compact 'minor' and wait"
      .format(unique_database, test_cc_part_table))
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"

  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
  def test_event_batching(self, unique_database):
    """Runs queries which generate multiple ALTER_PARTITION events which must be
    batched by the events processor. Runs as a custom cluster test to isolate the
    metric values from other tests."""
    testtbl = "test_event_batching"
    test_acid_tbl = "test_event_batching_acid"
    acid_props = self._get_transactional_tblproperties(True)
    # create test tables
    self.client.execute(
      "create table {}.{} like functional.alltypes".format(unique_database, testtbl))
    self.client.execute(
      "insert into {}.{} partition (year,month) select * from functional.alltypes".format(
        unique_database, testtbl))
    self.client.execute(
      "create table {}.{} (id int) partitioned by (year int, month int) {}".format(
        unique_database, test_acid_tbl, acid_props))
    self.client.execute(
      "insert into {}.{} partition (year, month) "
      "select id, year, month from functional.alltypes".format(unique_database,
        test_acid_tbl))
    # run compute stats from impala; this should generate 24 ALTER_PARTITION events
    # which should be batched together into one or more batch events.
    EventProcessorUtils.wait_for_event_processing(self)
    batch_events_metric = "batch-events-created"
    batch_events_1 = EventProcessorUtils.get_int_metric(batch_events_metric)
    self.client.execute("compute stats {}.{}".format(unique_database, testtbl))
    EventProcessorUtils.wait_for_event_processing(self)
    batch_events_2 = EventProcessorUtils.get_int_metric(batch_events_metric)
    assert batch_events_2 > batch_events_1
    # run an analyze statement from hive which generates an ALTER_PARTITION event on
    # each partition of the table
    self.run_stmt_in_hive(
      "analyze table {}.{} compute statistics".format(unique_database, testtbl))
    EventProcessorUtils.wait_for_event_processing(self)
    batch_events_3 = EventProcessorUtils.get_int_metric(batch_events_metric)
    assert batch_events_3 > batch_events_2
    # in case of a transactional table, since we batch the events together, the number
    # of tables refreshed must be far lower than the number of events generated
    num_table_refreshes_1 = EventProcessorUtils.get_int_metric(
      "tables-refreshed")
    self.client.execute("compute stats {}.{}".format(unique_database, test_acid_tbl))
    EventProcessorUtils.wait_for_event_processing(self)
    batch_events_4 = EventProcessorUtils.get_int_metric(batch_events_metric)
    num_table_refreshes_2 = EventProcessorUtils.get_int_metric(
      "tables-refreshed")
    # we should generate at least 1 batch event, if not more, due to the 24 consecutive
    # ALTER_PARTITION events
    assert batch_events_4 > batch_events_3
    # the table should not be refreshed since this is a self-event
    assert num_table_refreshes_2 == num_table_refreshes_1
    self.run_stmt_in_hive(
      "analyze table {}.{} compute statistics".format(unique_database, test_acid_tbl))
    EventProcessorUtils.wait_for_event_processing(self)
    batch_events_5 = EventProcessorUtils.get_int_metric(batch_events_metric)
    assert batch_events_5 > batch_events_4
    num_table_refreshes_2 = EventProcessorUtils.get_int_metric("tables-refreshed")
    # the analyze table from hive generates 24 ALTER_PARTITION events which should be
    # batched into 1-2 batches (depending on the timing of the event poll thread).
    assert num_table_refreshes_2 > num_table_refreshes_1
    assert int(num_table_refreshes_2) - int(num_table_refreshes_1) < 24
    EventProcessorUtils.wait_for_event_processing(self)
    # test the batching of insert events
    batch_events_insert = EventProcessorUtils.get_int_metric(batch_events_metric)
    tables_refreshed_insert = EventProcessorUtils.get_int_metric("tables-refreshed")
    partitions_refreshed_insert = EventProcessorUtils.get_int_metric(
      "partitions-refreshed")
    self.client.execute(
      "insert into {}.{} partition (year,month) select * from functional.alltypes".format(
        unique_database, testtbl))
    EventProcessorUtils.wait_for_event_processing(self)
    batch_events_after_insert = EventProcessorUtils.get_int_metric(batch_events_metric)
    tables_refreshed_after_insert = EventProcessorUtils.get_int_metric("tables-refreshed")
    partitions_refreshed_after_insert = EventProcessorUtils.get_int_metric(
      "partitions-refreshed")
    # this is a self-event; tables or partitions should not be refreshed
    assert batch_events_after_insert > batch_events_insert
    assert tables_refreshed_after_insert == tables_refreshed_insert
    assert partitions_refreshed_after_insert == partitions_refreshed_insert
    # run the insert from hive to make sure that the batch event refreshes all the
    # partitions
    self.run_stmt_in_hive(
      "SET hive.exec.dynamic.partition.mode=nonstrict; insert into {}.{} partition"
      " (year,month) select * from functional.alltypes".format(
        unique_database, testtbl))
    EventProcessorUtils.wait_for_event_processing(self)
    batch_events_after_hive = EventProcessorUtils.get_int_metric(batch_events_metric)
    partitions_refreshed_after_hive = EventProcessorUtils.get_int_metric(
      "partitions-refreshed")
    assert batch_events_after_hive > batch_events_insert
    # 24 partitions were inserted into and hence we must refresh 24 partitions once.
    assert int(partitions_refreshed_after_hive) == int(partitions_refreshed_insert) + 24

  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=5")
  def test_event_processor_failure_extra_space(self, unique_database):
    """This test verifies that the impala event processor is in the active state after
    processing a couple of previously erroneous events"""
    test_table = "extra_space_table"
    # IMPALA-11939 -- the create table event in HMS contains extra spaces in the
    # db/table names
    self.run_stmt_in_hive("create table ` {}`.`{} ` (i1 int) partitioned by (year int)"
      .format(unique_database, test_table))
    self.run_stmt_in_hive("alter table ` {}`.`{} ` add columns (i2 int)"
      .format(unique_database, test_table))
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"

  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=5")
  def test_disable_hms_sync(self, unique_database):
    """This test verifies that the impala event processor is in the active state after
    processing an alter table event that re-enables hms sync"""
    # test 1: re-enable the disableHmsSync config at the table level
    test_table = "disable_hms_sync_table"
    self.client.execute(
      """create table {}.{} (i int) TBLPROPERTIES ('impala.disableHmsSync'='true')"""
      .format(unique_database, test_table))
    EventProcessorUtils.wait_for_event_processing(self)
    prev_events_skipped = EventProcessorUtils.get_int_metric('events-skipped', 0)
    self.run_stmt_in_hive(
      """ALTER TABLE {}.{} SET TBLPROPERTIES('somekey'='somevalue')"""
      .format(unique_database, test_table))
    self.client.execute(
      """ALTER TABLE {}.{} SET TBLPROPERTIES ('impala.disableHmsSync'='false')"""
      .format(unique_database, test_table))
    EventProcessorUtils.wait_for_event_processing(self)
    current_events_skipped = EventProcessorUtils.get_int_metric('events-skipped', 0)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    assert current_events_skipped >= prev_events_skipped + 1

    # test 2: re-enabling the disableHmsSync config on a table shouldn't put the event
    # processor in an error state if the database is not loaded.
    try:
      test_db = "unloaded_db_sync"
      self.run_stmt_in_hive("""create database {}""".format(test_db))
      self.run_stmt_in_hive("""create table {}.{} (id int)
        TBLPROPERTIES ('impala.disableHmsSync'='true')""".format(test_db, test_table))
      self.run_stmt_in_hive(
        """ALTER TABLE {}.{} SET TBLPROPERTIES ('impala.disableHmsSync'='false')"""
        .format(test_db, test_table))
      EventProcessorUtils.wait_for_event_processing(self)
      assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    finally:
      self.run_stmt_in_hive("""drop database {} cascade""".format(test_db))

  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=10")
  def test_event_processor_dropped_partition(self, unique_database):
    """This test verifies that the Impala event processor is in the ACTIVE state after
    processing partitioned insert events of a dropped table."""
    # IMPALA-11768 -- insert partition events should be ignored
    # if the table is dropped.
    test_table = "partitioned_table"

    def is_event_processor_active(is_insert):
      self.run_stmt_in_hive("create table {}.{} (i1 int) partitioned by (year int)"
                            .format(unique_database, test_table))
      EventProcessorUtils.wait_for_event_processing(self)
      self.client.execute("refresh {}.{}".format(unique_database, test_table))
      self.run_stmt_in_hive(
        "insert into {}.{} partition(year=2023) values (4),(5),(6)"
        .format(unique_database, test_table))
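      # Fire a synthetic HMS listener event on the partition through the thrift API:
      # either an INSERT event (with a dummy file list) or a RELOAD (refresh) event,
      # depending on is_insert.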
      data = FireEventRequestData()
      if is_insert:
        insert_data = InsertEventRequestData()
        insert_data.filesAdded = "/warehouse/mytable/b1"
        insert_data.replace = False
        data.insertData = insert_data
      else:
        data.refreshEvent = True
      req = FireEventRequest(True, data)
      req.dbName = unique_database
      req.tableName = test_table
      req.partitionVals = ["2023"]
      self.hive_client.fire_listener_event(req)
      self.run_stmt_in_hive(
        "drop table {}.{}".format(unique_database, test_table))
      EventProcessorUtils.wait_for_event_processing(self)
      assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"

    is_event_processor_active(True)
    is_event_processor_active(False)

  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
  def test_iceberg_self_events(self, unique_database):
    """This test checks that Impala doesn't refresh Iceberg tables on self events."""
    tbl_name = unique_database + ".test_iceberg_events"
    iceberg_catalogs = IcebergCatalogs(unique_database)
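
    # check_self_events runs a statement from Impala and asserts that processing the
    # resulting HMS events did not refresh any table or partition; when skips_events
    # is True it also asserts that at least one event was skipped as a self-event.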
    def check_self_events(query, skips_events=True):
      tbls_refreshed_before, partitions_refreshed_before, \
        events_skipped_before = self._get_self_event_metrics()
      self.client.execute(query)
      EventProcessorUtils.wait_for_event_processing(self)
      tbls_refreshed_after, partitions_refreshed_after, \
        events_skipped_after = self._get_self_event_metrics()
      assert tbls_refreshed_before == tbls_refreshed_after
      assert partitions_refreshed_before == partitions_refreshed_after
      if skips_events:
        assert events_skipped_after > events_skipped_before

    for catalog in iceberg_catalogs.get_iceberg_catalog_properties():
      is_hive_catalog = iceberg_catalogs.is_a_hive_catalog(catalog)
      self.client.execute("""
        CREATE TABLE {0} (i int) STORED AS ICEBERG
        TBLPROPERTIES ({1})""".format(tbl_name, catalog))

      check_self_events("INSERT OVERWRITE {0} VALUES (1)".format(tbl_name),
                        skips_events=is_hive_catalog)
      check_self_events("ALTER TABLE {0} ADD COLUMN j INT".format(tbl_name))
      check_self_events("ALTER TABLE {0} DROP COLUMN i".format(tbl_name))
      check_self_events("ALTER TABLE {0} CHANGE COLUMN j j BIGINT".format(tbl_name))
      # SET PARTITION SPEC only updates HMS in the case of HiveCatalog (which sets
      # the table property 'metadata_location').
      check_self_events(
        "ALTER TABLE {0} SET PARTITION SPEC (truncate(2, j))".format(tbl_name),
        skips_events=is_hive_catalog)
      check_self_events(
        "ALTER TABLE {0} SET TBLPROPERTIES('key'='value')".format(tbl_name))
      check_self_events("ALTER TABLE {0} UNSET TBLPROPERTIES('key')".format(tbl_name))
      check_self_events("INSERT INTO {0} VALUES (2), (3), (4)".format(tbl_name),
                        skips_events=is_hive_catalog)
      ctas_tbl = unique_database + ".ice_ctas"
      check_self_events("""CREATE TABLE {0} STORED AS ICEBERG
        TBLPROPERTIES ({1}) AS SELECT * FROM {2}""".format(ctas_tbl, catalog, tbl_name))
      check_self_events("DROP TABLE {0}".format(ctas_tbl))
      check_self_events("TRUNCATE TABLE {0}".format(tbl_name),
                        skips_events=is_hive_catalog)

      self.client.execute("DROP TABLE {0}".format(tbl_name))

  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=5")
  def test_stale_drop_partition_events(self, unique_database):
    """Regression test for IMPALA-12256. Verifies that stale DROP_PARTITION events are
    skipped even if they are processed late, after some other DDLs. Uses a higher
    polling interval to ensure late processing of the events."""
    self.client.execute(
      "create table %s.part(i int) partitioned by (p int) stored as textfile"
      % unique_database)
    self.client.execute(
      "insert into %s.part partition (p=0) values (0)" % unique_database)
    # These DDLs will reload the partition metadata. We will verify they don't lose
    # the create event ids after the reload.
    partition_ddls = [
      "compute stats %s.part" % unique_database,
      "compute incremental stats %s.part" % unique_database,
      "compute incremental stats %s.part partition(p=0)" % unique_database,
      "alter table %s.part partition(p=0) set row format"
      " delimited fields terminated by ','" % unique_database,
      "alter table %s.part partition(p=0) set fileformat parquet" % unique_database,
      "alter table %s.part partition(p=0) set location '/tmp'" % unique_database,
      "alter table %s.part partition(p=0) set tblproperties('k'='v')" % unique_database,
      "refresh %s.part partition(p=0)" % unique_database,
      "refresh %s.part" % unique_database,
    ]
    # Wait until the events generated in preparing the table are consumed.
    EventProcessorUtils.wait_for_event_processing(self)
    parts_added_before = EventProcessorUtils.get_int_metric("partitions-added")
    parts_refreshed_before = EventProcessorUtils.get_int_metric("partitions-refreshed")
    parts_removed_before = EventProcessorUtils.get_int_metric("partitions-removed")
    for ddl in partition_ddls:
      events_skipped_before = EventProcessorUtils.get_int_metric("events-skipped")
      # Drop-create the partition and then run a DDL on it. A DROP_PARTITION and an
      # ADD_PARTITION event will be generated and should be skipped. The 3rd DDL might
      # generate an ALTER_PARTITION event but it should be skipped as a self-event.
      # Note that we don't perform self-event detection on ADD/DROP_PARTITION events.
      # They are skipped based on the partition-level create event ids. So we should
      # see that no partitions are added/removed/refreshed if we correctly track the
      # create event id (saved by the 2nd DDL that creates the partition).
      # For the DROP_PARTITION event, there are 3 cases:
      # 1) The DROP_PARTITION event is processed before the INSERT statement.
      #    It's skipped since the partition doesn't exist.
      # 2) The DROP_PARTITION event is processed after the INSERT statement
      #    and before the 3rd DDL. The INSERT statement creates the partition so saves
      #    the create event id which is higher than the id of the DROP_PARTITION event.
      #    Thus the DROP_PARTITION event is skipped.
      # 3) The DROP_PARTITION event is processed after the 3rd DDL. The reload
      #    triggered by the DDL should keep track of the create event id so the
      #    DROP_PARTITION event can be skipped.
      # This test sets hms_event_polling_interval_s to 5 which is long enough for the
      # 3 DDLs to finish. So it's more likely the 3rd case will happen, which is the
      # case of IMPALA-12256.
      self.client.execute(
        "alter table %s.part drop partition (p=0)" % unique_database)
      self.client.execute(
        "insert into %s.part partition(p=0) values (1),(2)" % unique_database)
      self.client.execute(ddl)
      EventProcessorUtils.wait_for_event_processing(self)
      events_skipped_after = EventProcessorUtils.get_int_metric("events-skipped")
      parts_added_after = EventProcessorUtils.get_int_metric("partitions-added")
      parts_refreshed_after = EventProcessorUtils.get_int_metric("partitions-refreshed")
      parts_removed_after = EventProcessorUtils.get_int_metric("partitions-removed")
      # The event-processor should not update any partitions since all events should
      # be skipped.
      assert parts_removed_before == parts_removed_after
      assert parts_added_before == parts_added_after
      assert parts_refreshed_before == parts_refreshed_after
      assert events_skipped_after > events_skipped_before

  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=5")
  def test_truncate_table_from_hive(self, unique_database):
    """IMPALA-12636: verify that truncate table from Hive reloads file metadata in
    Impala."""
    hive_tbl = "tbl_in_hive"
    values = "values (10),(20),(30)"

    def verify_truncate_op_in_hive(tbl_name, is_transactional, is_partitioned,
                                   is_batched):
      create_query = " ".join(["create", "table `{}`.`{}` (i int)",
        " partitioned by (year int) " if is_partitioned else '',
        self._get_transactional_tblproperties(is_transactional)])
      self.execute_query(create_query.format(unique_database, tbl_name))
      insert_query = " ".join(["insert into `{}`.`{}`", "partition (year=2024)"
        if is_partitioned else '', values])
      self.run_stmt_in_hive(insert_query.format(unique_database, tbl_name))
      EventProcessorUtils.wait_for_event_processing(self)
      self.client.execute("refresh {}.{}".format(unique_database, tbl_name))
      truncate_query = " ".join(["truncate table `{}`.`{}`", "partition (year=2024)"
        if is_partitioned else ''])
      self.run_stmt_in_hive(truncate_query.format(unique_database, tbl_name))
      if is_batched:
        self.run_stmt_in_hive(
          "insert into {}.{} partition (year=2024) values (1),(2)"
          .format(unique_database, tbl_name))
      EventProcessorUtils.wait_for_event_processing(self)
      data = int(self.execute_scalar("select count(*) from {0}.{1}".format(
        unique_database, tbl_name)))
      assert (data == 2) if is_batched else (data == 0)
      self.client.execute("drop table {}.{}".format(unique_database, tbl_name))
    # Case-I: truncate a single partition
    verify_truncate_op_in_hive(hive_tbl, False, False, False)
    verify_truncate_op_in_hive(hive_tbl, True, False, False)
    verify_truncate_op_in_hive(hive_tbl, False, True, False)
    verify_truncate_op_in_hive(hive_tbl, False, True, True)
    verify_truncate_op_in_hive(hive_tbl, True, True, False)
    verify_truncate_op_in_hive(hive_tbl, True, True, True)

    # Case-II: truncate a partition in a multi-partition table
    hive_tbl = "multi_part_tbl"
    self.client.execute("create table {}.{} (i int) partitioned by "
                        "(p int, q int)".format(unique_database, hive_tbl))
    self.client.execute("insert into {}.{} partition(p, q) values "
                        "(0,0,0), (0,0,1), (0,0,2)".format(unique_database, hive_tbl))
    self.client.execute("insert into {}.{} partition(p, q) values "
                        "(0,1,0), (0,1,1)".format(unique_database, hive_tbl))
    self.run_stmt_in_hive("truncate table {}.{} partition(p=0)"
                          .format(unique_database, hive_tbl))
    EventProcessorUtils.wait_for_event_processing(self)
    data = int(self.execute_scalar("select count(*) from {0}.{1}".format(
      unique_database, hive_tbl)))
    assert data == 2
    self.run_stmt_in_hive("truncate table {}.{}"
                          .format(unique_database, hive_tbl))
    EventProcessorUtils.wait_for_event_processing(self)
    data = int(self.execute_scalar("select count(*) from {0}.{1}".format(
      unique_database, hive_tbl)))
    assert data == 0

  @SkipIf.is_test_jdk
  @CustomClusterTestSuite.with_args(
    catalogd_args="--hms_event_polling_interval_s=100",
    hive_conf_dir=HIVE_SITE_HOUSEKEEPING_ON)
  def test_commit_compaction_with_abort_txn(self, unique_database):
    """Use a long enough polling interval to allow Hive statements to finish before
    the ABORT_TXN event is processed. In local tests, the Hive statements usually
    finish in 60s.
    TODO: improve this by adding commands to pause and resume the event-processor."""
    tbl = "part_table"
    fq_tbl = unique_database + '.' + tbl
    acid = AcidTxn(self.hive_client)
    self.run_stmt_in_hive(
      "create transactional table {} (i int) partitioned by (p int)".format(fq_tbl))

    # Allocate a write id on this table and abort the txn
    txn_id = acid.open_txns()
    acid.allocate_table_write_ids(txn_id, unique_database, tbl)
    acid.abort_txn(txn_id)
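    # The aborted transaction leaves an ABORTED write id on the table, which the
    # later ABORT_TXN event processing has to handle gracefully (see the commented-out
    # log assertion below).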

    # Insert some rows and trigger compaction
    for i in range(2):
      self.run_stmt_in_hive(
        "insert into {} partition(p=0) values (1),(2),(3)".format(fq_tbl))
    self.run_stmt_in_hive(
      "alter table {} partition(p=0) compact 'major' and wait".format(fq_tbl))

    # The CREATE_TABLE event hasn't been processed yet so we have to explicitly
    # invalidate the table first.
    self.client.execute("invalidate metadata " + fq_tbl)
    # Reload the table so the latest valid writeIdList is loaded
    self.client.execute("refresh " + fq_tbl)
    # Process the ABORT_TXN event
    EventProcessorUtils.wait_for_event_processing(self, timeout=100)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    # Uncomment this once we can stop and resume the event-processor using commands.
    # Currently the test is flaky with it since the Hive statements could take longer
    # to finish than 100s (e.g. I saw a run of 5 mins).
    # self.assert_catalogd_log_contains("INFO", "Not added ABORTED write id 1 since it's "
    #   + "not opened and might already be cleaned up")

  @SkipIfCdpHive.deprecated_feature
  @CustomClusterTestSuite.with_args(
    catalogd_args="--hms_event_incremental_refresh_transactional_table=false",
    statestored_args=STATESTORED_ARGS)
  def test_no_hms_event_incremental_refresh_transactional_table(self, unique_database):
    """IMPALA-12835: Test that Impala notices inserts into ACID tables when
    hms_event_incremental_refresh_transactional_table is false.
    """
    for partitioned in [False, True]:
      tbl = "part_tbl" if partitioned else "tbl"
      fq_tbl = unique_database + '.' + tbl
      part_create = " partitioned by (p int)" if partitioned else ""
      part_insert = " partition (p = 1)" if partitioned else ""
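
      # 'transactional_properties'='default' makes this a full ACID table rather than
      # an insert-only one.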
      create_stmt = ("create table {} (i int){} stored as ORC "
                     "tblproperties ('transactional'='true',"
                     "'transactional_properties'='default')").format(fq_tbl, part_create)
      self.run_stmt_in_hive(create_stmt)
      EventProcessorUtils.wait_for_event_processing(self)
      # Wait for StatestoreD to propagate the update.
      wait_statestore_heartbeat(num_heartbeat=4)

      # Load the table in Impala before the INSERT
      self.client.execute("refresh " + fq_tbl)
      self.run_stmt_in_hive(
        "insert into {}{} values (1),(2),(3)".format(fq_tbl, part_insert))
      EventProcessorUtils.wait_for_event_processing(self)
      # Wait for StatestoreD to propagate the update.
      wait_statestore_heartbeat(num_heartbeat=4)

      results = self.client.execute("select i from " + fq_tbl)
      assert results.data == ["1", "2", "3"], (
        "ACID table is not updated. Create statement: {}").format(create_stmt)

  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=5")
  def test_invalidate_better_create_event_id(self, unique_database):
    """This test verifies that 'invalidate metadata' sets a better create event id on
    the invalidated table."""
    test_tbl = "test_invalidate_table"
    self.client.execute("create table {}.{} (i int)".format(unique_database, test_tbl))
    EventProcessorUtils.wait_for_event_processing(self)
    tables_removed_before = EventProcessorUtils.get_int_metric("tables-removed")
    self.client.execute("drop table {}.{}".format(unique_database, test_tbl))
    self.run_stmt_in_hive(
      "create table {}.{} (i int, j int)".format(unique_database, test_tbl))
    self.client.execute("invalidate metadata {}.{}".format(unique_database, test_tbl))
    EventProcessorUtils.wait_for_event_processing(self)
    tables_removed_after = EventProcessorUtils.get_int_metric("tables-removed")
    assert tables_removed_after == tables_removed_before

  @CustomClusterTestSuite.with_args(
    catalogd_args="--hms_event_polling_interval_s=0")
  def test_ep_delay_metadata_reload_for_insert(self, unique_database):
    """IMPALA-12277: This test verifies that an insert operation on a partitioned table
    succeeds if the partition was previously dropped externally while the event
    processor is lagging or not active. Impala should create a new partition in this
    case."""

    def verify_partition(is_transactional, test_table="insert_part_reload"):
      query = " ".join(["create table {}.{} (i int)", " partitioned by (year int) ",
        self._get_transactional_tblproperties(is_transactional)])
      self.client.execute(query.format(unique_database, test_table))
      self.client.execute("insert into {}.{} partition(year) values (0,2024), (0,2022)"
                          .format(unique_database, test_table))
      self.run_stmt_in_hive("alter table {}.{} drop partition(year=2024)"
                            .format(unique_database, test_table))
      self.run_stmt_in_hive("alter table {}.{} add partition(year=2023)"
                            .format(unique_database, test_table))
      self.run_stmt_in_hive("insert into {}.{} partition(year=2021) values (1)"
                            .format(unique_database, test_table))
      self.client.execute(
        "insert into {}.{} partition(year) values (0,2023), (0,2024), (0,2022)"
        .format(unique_database, test_table))
      EventProcessorUtils.wait_for_event_processing(self)
      results = self.client.execute(
        "select DISTINCT(year) from {}.{} order by year desc"
        .format(unique_database, test_table))
      assert results.data == ["2024", "2023", "2022", "2021"]
      self.client.execute("drop table {}.{}".format(unique_database, test_table))

    verify_partition(True)
    verify_partition(False)

  def __search_table_version(self, profile, full_table_name):
    """Search for the version of the first table mentioned in the runtime profile."""
    version_regex = r"Original Table Versions: {}, (\d+),".format(
      full_table_name)
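    # Illustrative sketch of a matching profile line:
    #   Original Table Versions: db.tbl, 12345, ...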
    m = re.search(version_regex, profile)
    assert m is not None, 'Searching for "{}" in profile but found none\n{}'.format(
      version_regex, profile
    )
    return int(m.group(1))

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=1",
    statestored_args=STATESTORED_ARGS,
    disable_log_buffering=True, cluster_size=1)
  def test_invalidate_stale_partition_on_reload(self, unique_database):
    """Test that a metadata change from an external system will eventually propagate
    to the Coordinator in local catalog mode."""
    # Create a table with 3 partitions.
    test_tbl = unique_database + ".test_invalidate_table"
    self.client.execute("create table {} (id int) partitioned by (p int)"
                        .format(test_tbl))
    self.client.execute("alter table {} add partition (p=0)".format(test_tbl))
    self.client.execute("alter table {} add partition (p=1)".format(test_tbl))
    self.client.execute("alter table {} add partition (p=2)".format(test_tbl))

    # Run an Impala query to make sure all partition metadata is loaded.
    result = self.client.execute("select * from {}".format(test_tbl))
    assert len(result.data) == 0
    first_version = self.__search_table_version(result.runtime_profile, test_tbl)

    # Run a Hive query that triggers a metadata change.
    self.run_stmt_in_hive("SET hive.exec.dynamic.partition.mode=nonstrict; "
      "insert into {} partition(p) values (0,0),(1,1),(2,2)".format(test_tbl))

    # Let CatalogD hear about the new event.
    EventProcessorUtils.wait_for_event_processing(self)

    # Wait for StatestoreD to propagate the update.
    wait_statestore_heartbeat()

    # Validate that the relevant log lines are printed in the Coordinator.
    # This should be instantaneous after the statestore update is received, but set an
    # explicit long timeout as a precaution.
    log_regex = r"Invalidated objects in cache: \[partition {}:p={}"\
      .format(test_tbl, '%d')
    self.assert_impalad_log_contains('INFO', log_regex % 0, expected_count=1,
                                     timeout_s=20)
    self.assert_impalad_log_contains('INFO', log_regex % 1, expected_count=1,
                                     timeout_s=20)
    self.assert_impalad_log_contains('INFO', log_regex % 2, expected_count=1,
                                     timeout_s=20)

    # Rerun the Impala query and confirm that the Coordinator uses a newer table
    # version.
    result = self.client.execute("select * from {}".format(test_tbl))
    assert len(result.data) == 3
    second_version = self.__search_table_version(result.runtime_profile, test_tbl)
    assert first_version < second_version

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--hms_event_polling_interval_s=1 "
                  "--invalidate_metadata_on_event_processing_failure=false "
                  "--debug_actions=mock_write_lock_failure:true",
    disable_log_buffering=True, cluster_size=1)
  def test_write_lock_on_partitioned_events(self, unique_database):
    """IMPALA-12277: This test verifies that processing a CommitCompactionEvent on a
    partitioned table succeeds even when the event processor fails to acquire the
    table's write lock while processing the event. 'mock_write_lock_failure' mocks a
    failure while acquiring the write lock for the CommitCompactionEvent."""
    test_tbl = unique_database + ".test_invalidate_table"
    acid_props = self._get_transactional_tblproperties(True)
    self.client.execute("create table {} (id int) partitioned by (p int) {}"
                        .format(test_tbl, acid_props))
    for _ in range(10):
      self.client.execute(
        "insert into {} partition(p=0) values (1),(2),(3)".format(test_tbl))
    self.run_stmt_in_hive(
      "alter table {} partition(p=0) compact 'major' and wait".format(test_tbl))
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    catalogd_args="--enable_reload_events=true "
                  "--invalidate_metadata_on_event_processing_failure=false")
  def test_reload_events_modified_concurrently(self):
    """IMPALA-13126: This test verifies that the event processor successfully consumes
    or ignores the RELOAD events triggered by refresh operations on a partitioned table
    when the partitions of the table are modified concurrently."""
    tbl = "scale_db.num_partitions_1234_blocks_per_partition_1"
    refresh_stmt = "refresh {} partition(j=0)".format(tbl)
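    # Kick off many concurrent async refreshes that race with the add/drop partition
    # DDLs below.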
    for _ in range(32):
      self.client.execute_async(refresh_stmt)
    for _ in range(100):
      self.client.execute(
        "alter table {} add if not exists partition(j=-1)".format(tbl))
      self.client.execute(
        "alter table {} drop partition(j=-1)".format(tbl))

    try:
      EventProcessorUtils.wait_for_event_processing(self, 1000)  # bigger timeout required
      assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    finally:
      # Make sure the table doesn't change after this test
      self.execute_query("alter table {} drop if exists partition(j=-1)".format(tbl))

  @CustomClusterTestSuite.with_args(
    catalogd_args="--invalidate_metadata_on_event_processing_failure=false")
  def test_encoded_partition_values(self, unique_database):
    # Create a table with URL-encoded partition values
    tbl = unique_database + ".tbl"
    self.execute_query("create table {}(i int) partitioned by(s string)".format(tbl))
    part_values = [
      "00:00:00",
      "00%3A00%3A00",
      "00%253A00%253A00",
    ]
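    # '00%3A00%3A00' is the URL-encoded form of '00:00:00', and '00%253A00%253A00'
    # additionally encodes the '%' characters ('%25' decodes to '%').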
    for p in part_values:
      self.execute_query("alter table {} add partition (s='{}')".format(tbl, p))

    # Generate an INSERT event on a URL-encoded partition which could be incorrectly
    # decoded to another partition, e.g. '00%3A00%3A00' -> '00:00:00'
    self.run_stmt_in_hive(
      "insert into table {} partition(s='00%3A00%3A00') values (0)".format(tbl))
    EventProcessorUtils.wait_for_event_processing(self)
    res = self.execute_query("select * from " + tbl)
    assert len(res.data) == 1
    assert res.data[0] == '0\t00%3A00%3A00'

    # Test processing a batch of such partition events
    self.run_stmt_in_hive(
      "set hive.exec.dynamic.partition.mode=nonstrict; "
      "insert into table {} values (1, '00%3A00%3A00'), (2, '00%253A00%253A00')"
      .format(tbl))
    EventProcessorUtils.wait_for_event_processing(self)
    res = self.execute_query("select * from " + tbl)
    assert len(res.data) == 3
    assert '0\t00%3A00%3A00' in res.data
    assert '1\t00%3A00%3A00' in res.data
    assert '2\t00%253A00%253A00' in res.data

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal",
    disable_log_buffering=True, cluster_size=1)
  def test_bulk_alter_partitions(self, unique_database):
    test_tbl = unique_database + ".bulk_alter_partitions"
    self.client.execute("create table {} (id int) partitioned by (p int)"
                        .format(test_tbl))
    self.run_stmt_in_hive("SET hive.exec.dynamic.partition.mode=nonstrict; "
      "insert into {} partition(p) values (0,0),(1,1),(2,2)".format(test_tbl))
    self.client.execute("select * from {}".format(test_tbl))
    self.client.execute("compute incremental stats {}".format(test_tbl))
    self.client.execute("drop incremental stats {} partition(p>=0)".format(test_tbl))
    EventProcessorUtils.wait_for_event_processing(self)
    log_regex = r"HMS alterPartitions done on {}/{} partitions of table {}" \
      .format(3, 3, test_tbl)
    # We see the above twice: once for compute stats and a second time for drop stats.
    self.assert_catalogd_log_contains('INFO', log_regex, expected_count=2, timeout_s=20)
    self.client.execute("alter table {} partition(p>=0) set cached in 'testPool'"
                        .format(test_tbl))
    self.assert_catalogd_log_contains('INFO', log_regex, expected_count=3, timeout_s=20)

  @CustomClusterTestSuite.with_args(
    catalogd_args="--hms_event_polling_interval_s=0.2"
                  " --enable_hierarchical_event_processing=true"
                  " --num_db_event_executors=2"
                  " --num_table_event_executors_per_db_event_executor=2")
  def test_create_table_after_rename(self, unique_database):
    # Test creating a table with a name that another table was just renamed to.
    t1 = "t1"
    t2 = "t2"
    # Create tables t1 and t2 and load them on catalogd
    self.run_stmt_in_hive("create table {}.{} (i int)".format(unique_database, t1))
    self.run_stmt_in_hive("create table {}.{} (i int)".format(unique_database, t2))
    EventProcessorUtils.wait_for_event_processing(self)
    self.client.execute("describe formatted {}.{}".format(unique_database, t1))
    self.client.execute("describe formatted {}.{}".format(unique_database, t2))
    # Drop table t2, rename t1 to t2, load some data into t2 and create t1 from Hive
    # again
    self.client.execute(":event_processor('pause')")
    self.run_stmt_in_hive("drop table {}.{}".format(unique_database, t2))
    self.run_stmt_in_hive("alter table {}.{} rename to {}.{}".format(
      unique_database, t1, unique_database, t2))
    self.run_stmt_in_hive("insert into {}.{} values (1),(2)".format(unique_database, t2))
    self.run_stmt_in_hive("create table {}.{} as select * from {}.{}"
                          .format(unique_database, t1, unique_database, t2))
    self.client.execute(":event_processor('start')")
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    res = self.execute_query_expect_success(self.client,
                                            "select count(*) from {}.{}"
                                            .format(unique_database, t1))
    assert int(res.get_data()) == 2
    res = self.execute_query_expect_success(self.client,
                                            "select count(*) from {}.{}"
                                            .format(unique_database, t2))
    assert int(res.get_data()) == 2

  @CustomClusterTestSuite.with_args(
    catalogd_args="--hms_event_polling_interval_s=0.2"
                  " --enable_hierarchical_event_processing=true"
                  " --num_db_event_executors=2"
                  " --num_table_event_executors_per_db_event_executor=1")
  @UniqueDatabase.parametrize(num_dbs=2, sync_ddl=True)
  def test_rename_table_hierarchical_two_db_and_table_threads(self, unique_database):
    # Test renaming a table across DBs on different db executors (implicitly different
    # table executors).
    # Test for events generated through Impala
    src_db = unique_database
    target_db = unique_database + '2'
    self.rename_table_with_hierarchical_event_process(src_db, target_db, "t1", "t2", True)
    # Test for events generated through Hive
    self.rename_table_with_hierarchical_event_process(src_db, target_db, "t3", "t4",
                                                      False)

  @CustomClusterTestSuite.with_args(
    catalogd_args="--hms_event_polling_interval_s=0.2"
                  " --enable_hierarchical_event_processing=true"
                  " --num_db_event_executors=1"
                  " --num_table_event_executors_per_db_event_executor=2")
  @UniqueDatabase.parametrize(num_dbs=2, sync_ddl=True)
  def test_rename_table_hierarchical_one_db_and_two_table_threads(self, unique_database):
    # Test for events generated through Impala
    # Test renaming a table across DBs on the same db executor and 2 different table
    # executors
    src_db = unique_database
    target_db = unique_database + '2'
    self.rename_table_with_hierarchical_event_process(src_db, target_db, "t1", "t2", True)
    # Test renaming a table within a DB (same db executor) and 2 different table
    # executors
    self.rename_table_with_hierarchical_event_process(src_db, src_db, "t3", "t4", True)
    # Test for events generated through Hive
    # Test renaming a table across DBs on the same db executor and 2 different table
    # executors
    self.rename_table_with_hierarchical_event_process(src_db, target_db, "t5", "t6",
                                                      False)
    # Test renaming a table within a DB (same db executor) and 2 different table
    # executors
    self.rename_table_with_hierarchical_event_process(src_db, src_db, "t7", "t8", False)

  @CustomClusterTestSuite.with_args(
    catalogd_args="--hms_event_polling_interval_s=0.2"
                  " --enable_hierarchical_event_processing=true"
                  " --num_db_event_executors=1"
                  " --num_table_event_executors_per_db_event_executor=1")
  def test_rename_table_hierarchical_one_db_and_one_table_threads(self, unique_database):
    # Test for events generated through Impala
    # Test renaming a table within a DB (same db executor) and the same table executor
    self.rename_table_with_hierarchical_event_process(unique_database, unique_database,
                                                      "t1", "t2", True)
    # Test for events generated through Hive
    # Test renaming a table within a DB (same db executor) and the same table executor
    self.rename_table_with_hierarchical_event_process(unique_database, unique_database,
                                                      "t3", "t4", False)

  def rename_table_with_hierarchical_event_process(self, src_db_name, target_db_name,
                                                   src_table_name, target_table_name,
                                                   is_exec_from_impala):
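    """Creates a table in src_db_name and renames it to target_db_name, then checks
    the event-processor metrics: Impala-initiated DDLs should be skipped as
    self-events, while Hive-initiated DDLs should bump the tables-added and
    tables-removed counters."""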
    EventProcessorUtils.wait_for_event_processing(self)
    if is_exec_from_impala:
      execute = self.client.execute
      prev_events_skipped = EventProcessorUtils.get_int_metric('events-skipped', 0)
    else:
      execute = self.run_stmt_in_hive
      prev_create_table_metric = EventProcessorUtils.get_int_metric('tables-added', 0)
    execute("create table {}.{} (i int)".format(src_db_name, src_table_name))
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    if is_exec_from_impala:
      curr_events_skipped = EventProcessorUtils.get_int_metric('events-skipped', 0)
      assert curr_events_skipped == prev_events_skipped + 1
      prev_events_skipped = curr_events_skipped
    else:
      curr_create_table_metric = EventProcessorUtils.get_int_metric('tables-added', 0)
      assert curr_create_table_metric == prev_create_table_metric + 1
      prev_create_table_metric = curr_create_table_metric
    prev_drop_table_metric = EventProcessorUtils.get_int_metric('tables-removed', 0)
    execute("alter table {}.{} rename to {}.{}".format(src_db_name, src_table_name,
                                                       target_db_name,
                                                       target_table_name))
    EventProcessorUtils.wait_for_event_processing(self)
    if is_exec_from_impala:
      curr_events_skipped = EventProcessorUtils.get_int_metric('events-skipped', 0)
      assert curr_events_skipped == prev_events_skipped + 1
    else:
      curr_create_table_metric = EventProcessorUtils.get_int_metric('tables-added', 0)
      curr_drop_table_metric = EventProcessorUtils.get_int_metric('tables-removed', 0)
      assert curr_drop_table_metric == prev_drop_table_metric + 1
      assert curr_create_table_metric == prev_create_table_metric + 1

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    catalogd_args="--hms_event_polling_interval_s=1")
  def test_hms_sync_disabled_txn_events(self, unique_database):
    """Verify IMPALA-12829: skip txn events if the db/table has disabled applying HMS
    events."""
    # Iterate through the different levels at which HMS sync can be disabled
    for disable_level in ['table', 'database']:
      for partitioned in [False, True]:
        event_id_before = EventProcessorUtils.get_last_synced_event_id()
        txn_table = "part_tbl" if partitioned else "tbl"
        full_tbl_name = unique_database + '.' + txn_table
        part_create = " partitioned by (year int)" if partitioned else ""
        part_insert = " partition (year=2025)" if partitioned else ""

        create_stmt = "create transactional table {} (i int){}".format(full_tbl_name,
                                                                       part_create)
        self.run_stmt_in_hive(create_stmt)
        EventProcessorUtils.wait_for_event_processing(self)
        if disable_level == 'database':
          self.run_stmt_in_hive(
            """ALTER DATABASE {} SET DBPROPERTIES ('impala.disableHmsSync'='true')"""
            .format(unique_database))
        else:
          self.run_stmt_in_hive(
            """ALTER TABLE {} SET TBLPROPERTIES ('impala.disableHmsSync'='true')"""
            .format(full_tbl_name))
        self.client.execute("select * from {}".format(full_tbl_name))
        EventProcessorUtils.wait_for_event_processing(self)
        tbls_refreshed_before = EventProcessorUtils.get_int_metric('tables-refreshed', 0)
        partitions_refreshed_before = EventProcessorUtils.get_int_metric(
          'partitions-refreshed', 0)
        self.client.execute(":event_processor('pause')")
        # Generate a commit txn event from an insert operation
        if partitioned:
          self.client.execute("ALTER TABLE {} ADD PARTITION(year=2025)".format(
            full_tbl_name))
        self.run_stmt_in_hive("insert into {} {} values (1),(2),(3)"
                              .format(full_tbl_name, part_insert))
        self.client.execute(":event_processor('start')")
        EventProcessorUtils.wait_for_event_processing(self)
        tbls_refreshed_after = EventProcessorUtils.get_int_metric('tables-refreshed', 0)
        partitions_refreshed_after = EventProcessorUtils.get_int_metric(
          'partitions-refreshed', 0)
        assert tbls_refreshed_after == tbls_refreshed_before
        assert partitions_refreshed_after == partitions_refreshed_before
        assert EventProcessorUtils.get_last_synced_event_id() > event_id_before
        self.client.execute("""DROP DATABASE {} CASCADE""".format(unique_database))
        self.client.execute("""CREATE DATABASE {}""".format(unique_database))

  @SkipIf.is_test_jdk
  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=1 "
                  "--debug_actions=catalogd_event_processing_delay:SLEEP@1000",
    hive_conf_dir=HIVE_SITE_ALTER_PARTITIONS_EVENT)
  def test_alter_partitions_event_from_metastore(self, unique_database):
    tbl = unique_database + ".test_alter_partitions"
    self.client.execute("create table {} (id int) partitioned by (year int)"
                        .format(tbl))
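    # Note: the hive_conf_dir used above (HIVE_SITE_ALTER_PARTITIONS_EVENT) points at
    # a configuration that, presumably, lets HMS emit batched ALTER_PARTITIONS events,
    # which this test relies on.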

    def _verify_alter_partitions_event(events):
      event_found = False
      for event in events:
        if event.eventType == "ALTER_PARTITIONS":
          event_found = True
        else:
          logging.debug("Found " + str(event))
      return event_found

    # Verify that the test always generates a single ALTER_PARTITIONS event
    self.client.execute(
      "insert into {} partition(year) values (0,2024), (1,2023), (2,2022)"
      .format(tbl))
    EventProcessorUtils.wait_for_event_processing(self, 10)

    # Case-I: compute stats from Hive
    parts_refreshed_before = EventProcessorUtils.get_int_metric("partitions-refreshed")
    batch_events_before = EventProcessorUtils.get_int_metric("batch-events-created")
    self.run_stmt_in_hive("analyze table {} compute statistics".format(tbl))
    EventProcessorUtils.wait_for_event_processing(self, 10)
    batch_events_after = EventProcessorUtils.get_int_metric("batch-events-created")
    parts_refreshed_after = EventProcessorUtils.get_int_metric("partitions-refreshed")
    assert batch_events_after == batch_events_before  # verify there are no new batches
    assert parts_refreshed_after == parts_refreshed_before + 3

    # Case-II: compute stats from Impala
    last_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client)
    self.client.execute("compute stats {}".format(tbl))
    events_skipped_before = EventProcessorUtils.get_int_metric('events-skipped', 0)
    EventProcessorUtils.wait_for_event_processing(self, 10)
    events = EventProcessorUtils.get_next_notification(self.hive_client, last_event_id)
    # There will be COMMIT_TXN, ALLOC_WRITE_ID_EVENT, ALTER_PARTITIONS in any order
    events_skipped_after = EventProcessorUtils.get_int_metric('events-skipped', 0)
    assert _verify_alter_partitions_event(events)
    assert events_skipped_after > events_skipped_before

    # Case-III: truncate table from Impala
    last_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client)
    self.client.execute("truncate table {}".format(tbl))
    events_skipped_before = EventProcessorUtils.get_int_metric('events-skipped', 0)
    EventProcessorUtils.wait_for_event_processing(self, 10)
    events = EventProcessorUtils.get_next_notification(self.hive_client, last_event_id)
    events_skipped_after = EventProcessorUtils.get_int_metric('events-skipped', 0)
    assert _verify_alter_partitions_event(events)
    assert events_skipped_after > events_skipped_before

    # Case-IV: truncate table from Hive currently generates single ALTER_PARTITION
    # events. HIVE-28668 will address it.

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    catalogd_args="--hms_event_polling_interval_s=1 "
                  "--disable_hms_sync_by_default=true")
  def test_disable_hms_sync_globally(self, unique_database):
    """Verify IMPALA-14131: HMS events are synced/skipped based on the global flag
    --disable_hms_sync_by_default and the db/table property 'impala.disableHmsSync'."""
    tbl1 = unique_database + ".test_disable_hms_sync_1"
    tbl2 = unique_database + ".test_disable_hms_sync_2"
    EventProcessorUtils.wait_for_event_processing(self)

    # Case 1: verify the global config
    events_skipped_before = EventProcessorUtils.get_int_metric('events-skipped', 0)
    self.run_stmt_in_hive(
      """create table {} (id int) partitioned by (year int);
      create table {} (id int);""".format(tbl1, tbl2))
    EventProcessorUtils.wait_for_event_processing(self)
    events_skipped_after = EventProcessorUtils.get_int_metric('events-skipped', 0)
    assert events_skipped_after > events_skipped_before
    table_names = self.client.execute("show tables in {}".format(unique_database))\
      .get_data()
    assert not table_names

    def _check_insert_events(tbl, expected_val, skip_events=0, part=''):
      EventProcessorUtils.wait_for_event_processing(self)
      events_skipped_before = EventProcessorUtils.get_int_metric('events-skipped', 0)
      # Modify the data externally
      self.run_stmt_in_hive(
        """insert into {tbl} {partition} values(1),(2);"""
        .format(tbl=tbl, partition=part))
      EventProcessorUtils.wait_for_event_processing(self)
      events_skipped_after = EventProcessorUtils.get_int_metric('events-skipped', 0)
      assert events_skipped_after == events_skipped_before + skip_events, \
        "Expected {} events to be skipped, but {} events were skipped.".format(
          skip_events, events_skipped_after - events_skipped_before)
      data = self.client.execute("select * from {}".format(tbl))
      assert len(data.data) == expected_val, \
        "Expected {} rows in table {}, but found {}.".format(expected_val, tbl,
                                                             len(data.data))

    # Case 2: HMS sync enabled at the database level but disabled globally
    def validate_hms_sync(unique_database, tbl, partition=''):
      # Load the tables in the cache
      self.client.execute("invalidate metadata {}".format(tbl))
      self.client.execute("describe {}".format(tbl))
      self.run_stmt_in_hive(
        """ALTER DATABASE {} SET DBPROPERTIES ('impala.disableHmsSync'='false')"""
        .format(unique_database))
      _check_insert_events(tbl, 2, 0, partition)

    validate_hms_sync(unique_database, tbl1, partition='partition(year=2024)')
    validate_hms_sync(unique_database, tbl2)

    # Case 3: disable HMS sync at the database level and enable it at the table level
    self.run_stmt_in_hive(
      """ALTER DATABASE {} SET DBPROPERTIES ('impala.disableHmsSync'='true')"""
      .format(unique_database))
    self.client.execute(
      """alter table {} SET TBLPROPERTIES ('impala.disableHmsSync'='false')"""
      .format(tbl1))
    self.client.execute(
      """alter table {} SET TBLPROPERTIES ('impala.disableHmsSync'='false')"""
      .format(tbl2))
    EventProcessorUtils.wait_for_event_processing(self)
    _check_insert_events(tbl1, 4, skip_events=1, part='partition(year=2024)')
    _check_insert_events(tbl2, 4, skip_events=0)


@SkipIfFS.hive
class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase):
  """This class contains tests that exercise the event-processing mechanism in the
  catalog for non-default configurations."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestEventProcessingWithImpala, cls).add_test_dimensions()

    # Inject a 2s sleep at catalogd to interleave with the 1s poll period of the
    # HMS event processor.
    add_exec_option_dimension(
      cls, 'debug_action', [None, 'catalogd_load_metadata_delay:SLEEP@2000'])
    add_exec_option_dimension(cls, 'sync_ddl', [0, 1])

    cls.ImpalaTestMatrix.add_constraint(lambda v:
      v.get_value('sync_ddl') == 0 or v.get_value('debug_action') is not None)

  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
  def test_self_events_with_impala(self, vector, unique_database):
    """Runs multiple queries which generate events, using Impala as the client."""
    # Print trace logs from DebugUtils.
    self.cluster.catalogd.set_jvm_log_level("org.apache.impala.util.DebugUtils", "trace")
    self._run_self_events_test(unique_database, vector.get_value('exec_option'),
                               use_impala=True)

  @CustomClusterTestSuite.with_args(
    catalogd_args="--hms_event_polling_interval_s=0.2"
                  " --enable_hierarchical_event_processing=true")
  def test_self_events_with_impala_hierarchical(self, vector, unique_database):
    self._run_self_events_test(unique_database, vector.get_value('exec_option'),
                               use_impala=True)


@SkipIfFS.hive
class TestEventSyncFailures(TestEventProcessingCustomConfigsBase):

  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=0")
  def test_hms_event_sync_with_event_processing_disabled(self, vector):
    """Test with HMS event processing disabled. Verify the error message appears
    correctly in non-strict mode. Verify the query fails in strict mode."""
    client = self.default_impala_client(vector.get_value('protocol'))
    query = "select count(*) from functional.alltypes"
    # Timeline label shown in the profile when the wait failed.
    label = "Continuing without syncing Metastore events"
    # Verify error messages in non-strict mode
    client.set_configuration({"sync_hms_events_wait_time_s": 10})
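    # A non-zero sync_hms_events_wait_time_s asks the planner to wait for HMS event
    # sync; with event processing disabled, the wait fails but the query still runs
    # in non-strict mode.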
    handle = client.execute_async(query)
    client.wait_for_finished_timeout(handle, 5)
    results = client.fetch(query, handle)
    assert results.success
    assert len(results.data) == 1
    assert int(results.data[0]) == 7300

    client_log = client.get_log(handle)
    expected_error = "Continuing without syncing Metastore events: " \
        "HMS event processing is disabled"
    assert expected_error in client_log
    profile = client.get_runtime_profile(handle)
    assert "Errors: " + expected_error in profile, profile
    self.verify_timeline_item("Query Compilation", label, profile)
    client.close_query(handle)

    # Verify multi-line error logs
    client.set_configuration({"sync_hms_events_wait_time_s": 10,
                              "debug_action": "0:PREPARE:INJECT_ERROR_LOG"})
    handle = client.execute_async(query)
    client.wait_for_finished_timeout(handle, 5)
    client_log = client.get_log(handle)
    expected_error += "\nDebug Action: INJECT_ERROR_LOG"
    assert expected_error in client_log
    profile = client.get_runtime_profile(handle)
    assert "Errors: " + expected_error in profile, profile
    self.verify_timeline_item("Query Compilation", label, profile)

    # Verify failures in strict mode
    err = self.execute_query_expect_failure(
      client, query, EVENT_SYNC_QUERY_OPTIONS)
    expected_error = "Failed to sync events from Metastore: " \
        "HMS event processing is disabled"
    assert expected_error in str(err)
    client.close_query(handle)

  @CustomClusterTestSuite.with_args(
    catalogd_args="--debug_actions=catalogd_event_processing_delay:SLEEP@4000")
  def test_hms_event_sync_timeout(self, vector, unique_database):
    client = self.default_impala_client(vector.get_value('protocol'))
    # Timeline label shown in the profile when the wait failed.
    label = "Continuing without syncing Metastore events"
    # Prepare a partitioned table and load it into catalogd
    create_stmt = "create table {}.part (i int) partitioned by (p int)".format(
      unique_database)
    self.execute_query_expect_success(client, create_stmt)
    self.execute_query_expect_success(client, "describe {}.part".format(unique_database))

    # Add a partition in Hive, which generates an ADD_PARTITION event
    alter_stmt = "insert into {}.part partition (p=0) values (0)".format(unique_database)
    self.run_stmt_in_hive(alter_stmt)

    # The SELECT gets 0 rows since it times out waiting for HMS events to be synced
    query = "select * from {}.part".format(unique_database)
    client.set_configuration({"sync_hms_events_wait_time_s": 2})
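    # catalogd delays each event by 4s (debug action above), so the 2s wait is
    # guaranteed to time out.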
    handle = client.execute_async(query)
    client.wait_for_finished_timeout(handle, 60)
    results = client.fetch(query, handle)
    assert results.success
    assert len(results.data) == 0

    # Verify the warnings
    client_log = client.get_log(handle)
    expected_error = (
      "Continuing without syncing Metastore events: "
      "Timeout waiting for HMS events to be synced. Event id to wait for:")
    assert expected_error in client_log
    assert ". Last synced event id: " in client_log
    profile = client.get_runtime_profile(handle)
    assert "Errors: " + expected_error in profile, profile
    self.verify_timeline_item("Query Compilation", label, profile)
    # The duration is something like "2s034ms". Just check for "2s" here.
    assert "- Continuing without syncing Metastore events: 2s" in profile, profile
    client.close_query(handle)

    # The SELECT gets the new row if it waits for enough time
    results = self.execute_query_expect_success(client, query, EVENT_SYNC_QUERY_OPTIONS)
    assert len(results.data) == 1


@SkipIfFS.hive
class TestEventSyncWaiting(TestEventProcessingCustomConfigsBase):

  @CustomClusterTestSuite.with_args(
    catalogd_args="--debug_actions=catalogd_event_processing_delay:SLEEP@200")
  def test_hms_event_sync_with_commit_compaction(self, vector, unique_database):
    """Test waiting for a COMMIT_COMPACTION_EVENT. There is always a COMMIT_TXN event
    before it, so use a sleep of 200ms to delay processing it."""
    client = self.default_impala_client(vector.get_value('protocol'))
    client.set_configuration({"sync_hms_events_wait_time_s": 10})
    tbl = unique_database + ".foo"
    self.run_stmt_in_hive("""create transactional table {} partitioned by(p)
      as select 0 as i, 0 as p""".format(tbl))
    self.run_stmt_in_hive("insert into {} select 1,0".format(tbl))
    res = self.execute_query_expect_success(client, "show files in " + tbl)
    assert len(res.data) == 2
    self.run_stmt_in_hive(
      "alter table {} partition(p=0) compact 'minor' and wait".format(tbl))
    res = self.execute_query_expect_success(client, "show files in " + tbl)
    assert len(res.data) == 1