TestEventProcessing.test_event_based_replication is flaky when replication of a
database lags because it has too many events to replicate. Case III of the test
is flaky because the event processor has to process so many ALTER_PARTITIONS
events that the valid writeId list can be inaccurate while replication is
incomplete. A 20 sec timeout is therefore introduced in case III, after the
replication commands, so that the event processor finishes processing events
only once replication is completely done.

Testing:
- Looped the test 100 times to verify it is no longer flaky

Change-Id: I89fcd951f6a65ab7fe97c4f23554d93d9ba12f4e
Reviewed-on: http://gerrit.cloudera.org:8080/22131
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Riza Suminto <riza.suminto@cloudera.com>
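The fix, as case III of _run_event_based_replication_tests_impl below shows, is a
bounded wait on the event processor after the REPL LOAD and before validating row
counts:

    # Block (up to 20 sec) until the event processor has caught up with any HMS
    # operations generated by the replication commands.
    EventProcessorUtils.wait_for_event_processing(suite, timeout=20)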
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function

from tests.common.impala_test_suite import ImpalaTestSuite
from tests.util.event_processor_utils import EventProcessorUtils
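
# Options set on the Impala clients below so that each query waits (up to 10s) for
# pending HMS events to be synced before running; with strict mode, the query fails
# rather than silently proceeding if the events cannot be synced in time.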
EVENT_SYNC_QUERY_OPTIONS = {
  "sync_hms_events_wait_time_s": 10,
  "sync_hms_events_strict_mode": True
}


class TestEventProcessingBase(ImpalaTestSuite):

  @classmethod
  def setup_class(cls):
    super(TestEventProcessingBase, cls).setup_class()

  @classmethod
  def _run_test_insert_events_impl(cls, suite, unique_database, is_transactional=False):
    """Test for insert event processing. Events are created in Hive and processed in
    Impala. The following cases are tested:
      Insert into table --> for partitioned and non-partitioned table
      Insert overwrite table --> for partitioned and non-partitioned table
      Insert into partition --> for partitioned table
    """
    # TODO: change into an instance method and remove argument "suite" (IMPALA-14174)
    with suite.create_impala_client() as impala_client:
      # Test table with no partitions.
      tbl_insert_nopart = 'tbl_insert_nopart'
      cls.run_stmt_in_hive(
          "drop table if exists %s.%s" % (unique_database, tbl_insert_nopart))
      tblproperties = ""
      if is_transactional:
        tblproperties = "tblproperties ('transactional'='true'," \
            "'transactional_properties'='insert_only')"
      cls.run_stmt_in_hive("create table %s.%s (id int, val int) %s"
          % (unique_database, tbl_insert_nopart, tblproperties))
      impala_client.set_configuration(EVENT_SYNC_QUERY_OPTIONS)
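      # With EVENT_SYNC_QUERY_OPTIONS set, each query below implicitly waits for the
      # HMS events generated by the preceding Hive statements, so the assertions need
      # no explicit event-processor waits.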
      # Test CTAS and insert by Impala with empty results (IMPALA-10765).
      cls.execute_query_expect_success(impala_client,
          "create table {db}.ctas_tbl {prop} as select * from {db}.{tbl}"
          .format(db=unique_database, tbl=tbl_insert_nopart, prop=tblproperties))
      cls.execute_query_expect_success(impala_client,
          "insert into {db}.ctas_tbl select * from {db}.{tbl}"
          .format(db=unique_database, tbl=tbl_insert_nopart))
      # Test insert into the table; this will fire an insert event.
      cls.run_stmt_in_hive("insert into %s.%s values(101, 200)"
          % (unique_database, tbl_insert_nopart))
      # With MetastoreEventProcessor running, the insert event will be processed.
      # Query the table from Impala and verify that the data is present.
      data = cls.execute_scalar_expect_success(impala_client, "select * from %s.%s" %
          (unique_database, tbl_insert_nopart))
      assert data.split('\t') == ['101', '200']

      # Test insert overwrite. Overwrite the existing value.
      cls.run_stmt_in_hive("insert overwrite table %s.%s values(101, 201)"
          % (unique_database, tbl_insert_nopart))
      # Make sure the event has been processed using sync_hms_events_wait_time_s.
      # Verify that the data is present in Impala.
      data = cls.execute_scalar_expect_success(impala_client, "select * from %s.%s" %
          (unique_database, tbl_insert_nopart))
      assert data.split('\t') == ['101', '201']
      # Test insert overwrite by Impala with empty results (IMPALA-10765).
      cls.execute_query_expect_success(impala_client,
          "insert overwrite {db}.{tbl} select * from {db}.ctas_tbl"
          .format(db=unique_database, tbl=tbl_insert_nopart))
      result = cls.execute_query_expect_success(impala_client,
          "select * from {db}.{tbl}".format(db=unique_database, tbl=tbl_insert_nopart))
      assert len(result.data) == 0

      # Test partitioned table.
      tbl_insert_part = 'tbl_insert_part'
      cls.run_stmt_in_hive("drop table if exists %s.%s"
          % (unique_database, tbl_insert_part))
      cls.run_stmt_in_hive("create table %s.%s (id int, name string) "
          "partitioned by(day int, month int, year int) %s"
          % (unique_database, tbl_insert_part, tblproperties))
      # Test CTAS and insert into a partition by Impala with empty results
      # (IMPALA-10765).
      cls.execute_query_expect_success(impala_client,
          "create table {db}.ctas_part partitioned by (day, month, year) {prop} as "
          "select * from {db}.{tbl}".format(db=unique_database, tbl=tbl_insert_part,
              prop=tblproperties))
      cls.execute_query_expect_success(impala_client,
          "insert into {db}.ctas_part partition(day=0, month=0, year=0) select id, "
          "name from {db}.{tbl}".format(db=unique_database, tbl=tbl_insert_part))
      # Insert data into partitions.
      cls.run_stmt_in_hive(
          "insert into %s.%s partition(day=28, month=03, year=2019)"
          " values(101, 'x')" % (unique_database, tbl_insert_part))
      # Make sure the event has been processed using sync_hms_events_wait_time_s.
      # Verify that the data is present in Impala.
      data = cls.execute_scalar_expect_success(impala_client,
          "select * from %s.%s" % (unique_database, tbl_insert_part))
      assert data.split('\t') == ['101', 'x', '28', '3', '2019']

      # Test inserting into existing partitions.
      cls.run_stmt_in_hive(
          "insert into %s.%s partition(day=28, month=03, year=2019)"
          " values(102, 'y')" % (unique_database, tbl_insert_part))
      # Verify that the data is present in Impala.
      data = cls.execute_scalar_expect_success(impala_client,
          "select count(*) from %s.%s where day=28 and month=3 "
          "and year=2019" % (unique_database, tbl_insert_part))
      assert data.split('\t') == ['2']
      # Test inserting into existing partitions by Impala with empty results
      # (IMPALA-10765).
      cls.execute_query_expect_success(impala_client,
          "insert into {db}.{tbl} partition(day=28, month=03, year=2019) "
          "select id, name from {db}.ctas_part"
          .format(db=unique_database, tbl=tbl_insert_part))

      # Test insert overwrite into existing partitions.
      cls.run_stmt_in_hive(
          "insert overwrite table %s.%s partition(day=28, month=03, "
          "year=2019) values(101, 'z')" % (unique_database, tbl_insert_part))
      # Verify that the data is present in Impala.
      data = cls.execute_scalar_expect_success(impala_client,
          "select * from %s.%s where day=28 and month=3 and"
          " year=2019 and id=101" % (unique_database, tbl_insert_part))
      assert data.split('\t') == ['101', 'z', '28', '3', '2019']
      impala_client.clear_configuration()
      # Test insert overwrite into existing partitions by Impala with empty results
      # (IMPALA-10765).
      cls.execute_query_expect_success(impala_client, "insert overwrite {db}.{tbl} "
          "partition(day=28, month=03, year=2019) "
          "select id, name from {db}.ctas_part"
          .format(db=unique_database, tbl=tbl_insert_part))
      result = cls.execute_query_expect_success(impala_client,
          "select * from {db}.{tbl} where day=28 and month=3 and year=2019"
          .format(db=unique_database, tbl=tbl_insert_part))
      assert len(result.data) == 0
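
  # A minimal sketch of how a concrete suite might drive the helper above (the class
  # name and test method are hypothetical; "suite" is the calling test instance until
  # IMPALA-14174 turns these into instance methods):
  #
  #   class TestEventProcessing(TestEventProcessingBase):
  #     def test_insert_events_transactional(self, unique_database):
  #       TestEventProcessingBase._run_test_insert_events_impl(
  #           self, unique_database, is_transactional=True)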

  @classmethod
  def _run_event_based_replication_tests_impl(cls, suite,
                                              filesystem_client, transactional=True):
    """Hive replication relies on the insert events generated on the tables.
    This test issues some basic replication commands from Hive and makes sure
    that the replicated table has correct data."""
    # TODO: change into an instance method and remove argument "suite" (IMPALA-14174)
    TBLPROPERTIES = cls._get_transactional_tblproperties(transactional)
    source_db = ImpalaTestSuite.get_random_name("repl_source_")
    target_db = ImpalaTestSuite.get_random_name("repl_target_")
    unpartitioned_tbl = "unpart_tbl"
    partitioned_tbl = "part_tbl"
    impala_client = suite.create_impala_client()
    try:
      cls.run_stmt_in_hive("create database {0}".format(source_db))
      cls.run_stmt_in_hive(
          "alter database {0} set dbproperties ('repl.source.for'='xyz')"
          .format(source_db))
      impala_client.set_configuration(EVENT_SYNC_QUERY_OPTIONS)
      # Explicit create table command since "create table like" doesn't allow
      # tblproperties.
      impala_client.execute("create table {0}.{1} (a string, b string) stored as"
          " parquet {2}".format(source_db, unpartitioned_tbl, TBLPROPERTIES))
      impala_client.execute(
          "create table {0}.{1} (id int, bool_col boolean, tinyint_col tinyint, "
          "smallint_col smallint, int_col int, bigint_col bigint, float_col float, "
          "double_col double, date_string string, string_col string, "
          "timestamp_col timestamp) partitioned by (year int, month int) stored as "
          "parquet {2}".format(source_db, partitioned_tbl, TBLPROPERTIES))
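
      # Replication flow used in every case below: "repl dump <db>" on the source
      # produces a dump of the database, and "repl load <db> into <target>" applies
      # it to the target. The first dump/load pair bootstraps the target database;
      # subsequent pairs replicate incrementally from the events.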
      # Case I: insert.
      # Load the tables with some data from Impala; this also creates new partitions.
      impala_client.execute("insert into {0}.{1}"
          " select * from functional.tinytable".format(source_db,
              unpartitioned_tbl))
      impala_client.execute("insert into {0}.{1} partition(year,month)"
          " select * from functional_parquet.alltypessmall".format(
              source_db, partitioned_tbl))
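      # execute_scalar_expect_success returns the first row as a tab-separated
      # string; the first field is the count.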
      rows_in_unpart_tbl = int(cls.execute_scalar_expect_success(impala_client,
          "select count(*) from {0}.{1}".format(source_db, unpartitioned_tbl))
          .split('\t')[0])
      rows_in_part_tbl = int(cls.execute_scalar_expect_success(impala_client,
          "select count(*) from {0}.{1}".format(source_db, partitioned_tbl))
          .split('\t')[0])
      assert rows_in_unpart_tbl > 0
      assert rows_in_part_tbl > 0
      # Bootstrap the replication.
      cls.run_stmt_in_hive("repl dump {0}".format(source_db))
      # Create a target database where the tables will be replicated.
      impala_client.execute("create database {0}".format(target_db))
      # Replicate the tables from source to target.
      cls.run_stmt_in_hive("repl load {0} into {1}".format(source_db,
          target_db))
      assert unpartitioned_tbl in impala_client.execute(
          "show tables in {0}".format(target_db)).get_data()
      assert partitioned_tbl in impala_client.execute(
          "show tables in {0}".format(target_db)).get_data()
      # Confirm the number of rows in the target tables match the source tables.
      rows_in_unpart_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
          "select count(*) from {0}.{1}".format(target_db, unpartitioned_tbl))
          .split('\t')[0])
      rows_in_part_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
          "select count(*) from {0}.{1}".format(target_db, partitioned_tbl))
          .split('\t')[0])
      assert rows_in_unpart_tbl == rows_in_unpart_tbl_target
      assert rows_in_part_tbl == rows_in_part_tbl_target

      # Case II: insert into existing partitions.
      impala_client.execute("insert into {0}.{1}"
          " select * from functional.tinytable".format(
              source_db, unpartitioned_tbl))
      impala_client.execute("insert into {0}.{1} partition(year,month)"
          " select * from functional_parquet.alltypessmall".format(
              source_db, partitioned_tbl))
      cls.run_stmt_in_hive("repl dump {0}".format(source_db))
      # Replicate the tables from source to target.
      cls.run_stmt_in_hive("repl load {0} into {1}".format(source_db,
          target_db))
      # Confirm the number of rows in the target tables match the source tables.
      rows_in_unpart_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
          "select count(*) from {0}.{1}".format(target_db, unpartitioned_tbl))
          .split('\t')[0])
      rows_in_part_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
          "select count(*) from {0}.{1}".format(target_db, partitioned_tbl))
          .split('\t')[0])
      assert 2 * rows_in_unpart_tbl == rows_in_unpart_tbl_target
      assert 2 * rows_in_part_tbl == rows_in_part_tbl_target

      # Case III: insert overwrite.
      # Impala does an insert overwrite of the tables.
      impala_client.execute("insert overwrite table {0}.{1}"
          " select * from functional.tinytable".format(
              source_db, unpartitioned_tbl))
      impala_client.execute("insert overwrite table {0}.{1} partition(year,month)"
          " select * from functional_parquet.alltypessmall".format(
              source_db, partitioned_tbl))
      cls.run_stmt_in_hive("repl dump {0}".format(source_db))
      # Replicate the tables from source to target.
      cls.run_stmt_in_hive("repl load {0} into {1}".format(source_db,
          target_db))
      # Wait (up to 20 sec) until the event processor catches up, since the repl
      # commands above may have performed HMS operations (e.g. many ALTER_PARTITIONS
      # events) that leave the valid writeId list inaccurate until fully processed.
      EventProcessorUtils.wait_for_event_processing(suite, timeout=20)
      # Confirm the number of rows in the target tables match the source tables.
      rows_in_unpart_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
          "select count(*) from {0}.{1}".format(target_db, unpartitioned_tbl))
          .split('\t')[0])
      rows_in_part_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
          "select count(*) from {0}.{1}".format(target_db, partitioned_tbl))
          .split('\t')[0])
      assert rows_in_unpart_tbl == rows_in_unpart_tbl_target
      assert rows_in_part_tbl == rows_in_part_tbl_target

      # Case IV: CTAS which creates a transactional table.
      impala_client.execute(
          "create table {0}.insertonly_nopart_ctas {1} as "
          "select * from {0}.{2}".format(source_db, TBLPROPERTIES, unpartitioned_tbl))
      impala_client.execute(
          "create table {0}.insertonly_part_ctas partitioned by (year, month) {1}"
          " as select * from {0}.{2}".format(source_db, TBLPROPERTIES, partitioned_tbl))
      cls.run_stmt_in_hive("repl dump {0}".format(source_db))
      # Replicate the tables from source to target.
      cls.run_stmt_in_hive("repl load {0} into {1}".format(source_db,
          target_db))
      # Confirm the number of rows in the target tables match the source tables.
      rows_in_nopart_ctas_source = int(cls.execute_scalar_expect_success(impala_client,
          "select count(*) from "
          "{0}.insertonly_nopart_ctas".format(source_db)).split('\t')[0])
      rows_in_nopart_ctas_target = int(cls.execute_scalar_expect_success(impala_client,
          "select count(*) from "
          "{0}.insertonly_nopart_ctas".format(target_db)).split('\t')[0])
      assert rows_in_nopart_ctas_source == rows_in_nopart_ctas_target
      rows_in_part_ctas_source = int(cls.execute_scalar_expect_success(impala_client,
          "select count(*) from "
          "{0}.insertonly_part_ctas".format(source_db)).split('\t')[0])
      rows_in_part_ctas_target = int(cls.execute_scalar_expect_success(impala_client,
          "select count(*) from "
          "{0}.insertonly_part_ctas".format(target_db)).split('\t')[0])
      assert rows_in_part_ctas_source == rows_in_part_ctas_target

      # Case V: truncate table.
      # Impala truncates both tables. Make sure replication sees that.
      impala_client.execute("truncate table {0}.{1}".format(source_db,
          unpartitioned_tbl))
      impala_client.execute("truncate table {0}.{1}".format(source_db, partitioned_tbl))
      cls.run_stmt_in_hive("repl dump {0}".format(source_db))
      # Replicate the tables from source to target.
      cls.run_stmt_in_hive("repl load {0} into {1}".format(source_db,
          target_db))
      # Confirm that both target tables are now empty, matching the truncated source.
      rows_in_unpart_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
          "select count(*) from {0}.{1}".format(target_db, unpartitioned_tbl))
          .split('\t')[0])
      rows_in_part_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
          "select count(*) from {0}.{1}".format(target_db, partitioned_tbl))
          .split('\t')[0])
      assert rows_in_unpart_tbl_target == 0
      assert rows_in_part_tbl_target == 0
    finally:
      src_db = cls.__get_db_nothrow(source_db)
      target_db_obj = cls.__get_db_nothrow(target_db)
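      # Unset 'repl.source.for' before dropping the source database: a database that
      # is still marked as a replication source is not fully cleaned up by a plain
      # drop.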
      if src_db is not None:
        cls.run_stmt_in_hive(
            "alter database {0} set dbproperties ('repl.source.for'='')"
            .format(source_db))
        cls.run_stmt_in_hive("drop database if exists {0} cascade"
            .format(source_db))
      if target_db_obj is not None:
        cls.run_stmt_in_hive("drop database if exists {0} cascade"
            .format(target_db))
      # Workaround for HIVE-24135: the managed db location doesn't get cleaned up.
      if src_db is not None and src_db.managedLocationUri is not None:
        filesystem_client.delete_file_dir(src_db.managedLocationUri,
            True)
      if target_db_obj is not None and target_db_obj.managedLocationUri is not None:
        filesystem_client.delete_file_dir(
            target_db_obj.managedLocationUri, True)
      impala_client.close()

  @classmethod
  def __get_db_nothrow(cls, name):
    """Return the HMS database object for 'name', or None if it does not exist."""
    try:
      return cls.hive_client.get_database(name)
    except Exception:
      return None

  @classmethod
  def _get_transactional_tblproperties(cls, is_transactional):
    """Util method to generate the tblproperties for transactional tables."""
    tblproperties = ""
    if is_transactional:
      tblproperties = "tblproperties ('transactional'='true'," \
          "'transactional_properties'='insert_only')"
    return tblproperties