At present, the metastore event processor is single-threaded. Notification
events are processed sequentially, with at most 1000 events fetched and
processed in a single batch. Multiple locks are used to address the
concurrency issues that arise when catalog DDL operation processing and
metastore event processing try to access or update catalog objects
concurrently. Waiting for a lock or loading a table's file metadata can slow
event processing and delay the events that follow, even though those events
may not depend on the one currently being processed. Altogether it can take a
very long time to synchronize all the HMS events.
This change turns the existing metastore event processing into multi-level
event processing behind the enable_hierarchical_event_processing flag, which
is disabled by default. The idea is to segregate events based on their
dependencies, maintain the order of events as they occur within each
dependency, and process them independently as much as possible.
The following three main classes represent the three levels of threaded event
processing:
1. EventExecutorService
   Provides the methods to initialize, start, clear and stop metastore event
   processing in hierarchical mode, and to dispatch events for processing. It
   is instantiated by MetastoreEventsProcessor, which invokes its methods.
   Upon receiving an event to process, EventExecutorService queues the event
   to the appropriate DbEventExecutor.
2. DbEventExecutor
   An instance of this class has an execution thread and manages events of
   multiple databases through DbProcessors. A DbProcessor instance is
   maintained within the DbEventExecutor to store the context of each
   database. On each scheduled execution, the input events of a DbProcessor
   are dispatched to the appropriate TableProcessors, and the database events
   that are eligible for processing are processed.
   Once a DbEventExecutor is assigned to a database, a DbProcessor is
   created, and subsequent events belonging to that database are queued to
   the same DbEventExecutor thread for further processing. Hence,
   linearizability is ensured when dealing with events within a database.
   Each DbEventExecutor instance has a fixed list of TableEventExecutors.
3. TableEventExecutor
   An instance of this class has an execution thread and processes events of
   multiple tables through TableProcessors. A TableProcessor instance is
   maintained within the TableEventExecutor to store the context of each
   table. On each scheduled execution, events from the TableProcessors are
   processed.
   Once a TableEventExecutor is assigned to a table, a TableProcessor is
   created, and subsequent events of that table are processed by the same
   TableEventExecutor thread. Hence, linearizability is guaranteed when
   processing the events of a particular table.
   - All events of a table are processed in the same order in which they
     occurred.
   - Events of different tables are processed in parallel when those tables
     are assigned to different TableEventExecutors (see the routing sketch
     after this list).
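The routing rule can be illustrated with a minimal sketch. Python is used here
only for illustration; the actual executors are Java classes in catalogd, and
the hash-based assignment below is an assumption made for the example, not the
real assignment policy.

# Illustrative sketch only: the real DbEventExecutor/TableEventExecutor are
# Java classes in catalogd; the hash-based assignment here is an assumption.
NUM_DB_EVENT_EXECUTORS = 4      # cf. num_db_event_executors
NUM_TABLE_EXECUTORS_PER_DB = 2  # cf. num_table_event_executors_per_db_event_executor


def route_event(db_name, table_name):
  """Return the (db executor, table executor) pair an event would be queued to.
  Events of the same database always land on the same DbEventExecutor, and
  events of the same table always land on the same TableEventExecutor, so
  per-table ordering is preserved while distinct tables proceed in parallel."""
  db_executor = hash(db_name) % NUM_DB_EVENT_EXECUTORS
  table_executor = hash((db_name, table_name)) % NUM_TABLE_EXECUTORS_PER_DB
  return db_executor, table_executor


# Two events on the same table map to the same executors and stay ordered;
# events on different tables may map to different executors and run in parallel.
assert route_event("db1", "t1") == route_event("db1", "t1")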
The following new events are added:
1. DbBarrierEvent
   This event wraps a database event. It is used to synchronize all the
   TableProcessors belonging to a database before the database event is
   processed. It acts as a barrier that blocks the processing of table events
   that occurred after the database event until the database event has been
   processed on the DbProcessor (a sketch of the barrier idea follows this
   list).
2. RenameTableBarrierEvent
   This event wraps an alter table event for a rename. It is used to
   synchronize the source and target TableProcessors when processing the
   rename. It ensures that the source TableProcessor removes the table first
   and only then allows the target TableProcessor to create the renamed
   table.
3. PseudoCommitTxnEvent and PseudoAbortTxnEvent
   A CommitTxnEvent or AbortTxnEvent can involve multiple tables in a
   transaction, and processing such an event modifies multiple table objects.
   Pseudo events are introduced so that one pseudo event is created for each
   table involved in the transaction, and these pseudo events are processed
   independently at the respective TableProcessors.
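The barrier idea can be sketched as follows. This is a hypothetical Python
illustration, not the Java DbBarrierEvent implementation; the class and method
names are made up for the example.

import threading


class BarrierSketch(object):
  """Sketch of a DbBarrierEvent-style barrier: every TableProcessor of the
  database must reach the barrier before the wrapped database event is
  applied, and table events newer than the database event wait until the
  barrier is released."""

  def __init__(self, num_table_processors, process_db_event):
    self._remaining = num_table_processors
    self._process_db_event = process_db_event  # callback that applies the db event
    self._lock = threading.Lock()
    self._released = threading.Event()

  def table_processor_reached(self):
    # Called by each TableProcessor when it dequeues the barrier event.
    with self._lock:
      self._remaining -= 1
      last_arrival = self._remaining == 0
    if last_arrival:
      self._process_db_event()  # apply the database event exactly once
      self._released.set()      # unblock the waiting TableProcessors

  def wait_until_db_event_processed(self):
    # TableProcessors block here before processing events that occurred after
    # the database event.
    self._released.wait()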
The following new flags are introduced:
1. enable_hierarchical_event_processing
   Enables hierarchical event processing on catalogd.
2. num_db_event_executors
   Sets the number of database-level event executors.
3. num_table_event_executors_per_db_event_executor
   Sets the number of table-level event executors within a database event
   executor.
4. min_event_processor_idle_ms
   Sets the minimum time to retain idle DbProcessors and TableProcessors on
   the database and table event executors, respectively, when they have no
   events to process.
5. max_outstanding_events_on_executors
   Sets the maximum number of outstanding events to process on the event
   executors.
The type of hms_event_polling_interval_s is changed from int to double to
support intervals with millisecond precision. An illustrative combination of
these flags is shown below.
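The snippet below shows how the flags might be combined when starting
catalogd for a test cluster, in the same style as the tests that follow. The
numeric values are placeholders rather than tuning recommendations, and only a
subset of these flags is exercised by the tests below.

# Placeholder values for illustration only.
catalogd_args = ("--enable_hierarchical_event_processing=true "
                 "--num_db_event_executors=4 "
                 "--num_table_event_executors_per_db_event_executor=2 "
                 "--min_event_processor_idle_ms=600000 "
                 "--max_outstanding_events_on_executors=1000000 "
                 "--hms_event_polling_interval_s=0.2")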
TODOs:
1. We need to redefine the lag metric for hierarchical processing mode.
2. We need a mechanism to capture the actual event processing time in
   hierarchical processing mode. Currently, with
   enable_hierarchical_event_processing set to true, lastSyncedEventId_ and
   lastSyncedEventTimeSecs_ are updated when an event is dispatched to
   EventExecutorService for processing on the respective DbEventExecutor
   and/or TableEventExecutor, so lastSyncedEventId_ and
   lastSyncedEventTimeSecs_ do not actually mean the events have been
   processed.
3. Hierarchical processing mode currently has a mechanism to show the total
   number of outstanding events on all the db and table executors at the
   moment. Observability needs to be enhanced further for this mode.
Filed a jira [IMPALA-13801] to fix these.
Testing:
- Executed the existing end-to-end tests.
- Added FE and end-to-end tests with enable_hierarchical_event_processing.
- Added event processing performance tests.
- Executed the existing tests with hierarchical processing mode enabled.
  lastSyncedEventId_ is now also used by the new sync_hms_events_wait_time_s
  feature (IMPALA-12152). Some tests fail when hierarchical processing mode
  is enabled because lastSyncedEventId_ does not actually mean the event has
  been processed in this mode. This needs to be fixed/verified under the jira
  above [IMPALA-13801].
Change-Id: I76d8a739f9db6d40f01028bfd786a85d83f9e5d6
Reviewed-on: http://gerrit.cloudera.org:8080/21031
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function
from builtins import range
import logging
import multiprocessing.pool
import pytest
import re
import threading
import time
import traceback

from tests.common.impala_connection import IMPALA_CONNECTION_EXCEPTION
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIfFS
from tests.common.test_vector import HS2
from tests.util.event_processor_utils import EventProcessorUtils

LOG = logging.getLogger(__name__)
LOG.setLevel(level=logging.INFO)

@SkipIfFS.hive
class TestEventProcessingPerf(CustomClusterTestSuite):
  """This class contains tests to measure the event processing time on catalogd.
  Measures performance for various operations with queries executed from hive and impala
  clients."""
  # Below parameters are set to lower values so that tests are run faster. Need to
  # increase values and run the tests manually to measure the performance.
  db_count = 3
  table_count = 3
  partition_count = 100
  insert_nonpartition_values_count = 100
  insert_nonpartition_repeat_count = 2
  # process_events_together flag indicates whether to process events of all operations
  # together or not. Except create/drop databases and tables, remaining operations are
  # processed together when set to true. If it is set to false, each type of operation
  # (such as add partitions, insert into partitions, insert into table, refresh
  # partitions, refresh tables, compute stats etc) are processed separately to get the
  # time taken to process each type of operation.
  process_events_together = False
  db_prefix = "perf_db"
  table_prefix = "perf_table"
  stage_table = "perf_stage_table"

  @classmethod
  def setup_class(cls):
    if cls.exploration_strategy() != 'exhaustive':
      pytest.skip('runs only in exhaustive')
    super(TestEventProcessingPerf, cls).setup_class()

  def setup_method(self, method):
    super(TestEventProcessingPerf, self).setup_method(method)
    self.__cleanup()

  def teardown_method(self, method):
    self.__cleanup()
    super(TestEventProcessingPerf, self).teardown_method(method)

  def __cleanup(self):
    self.__drop_databases()
    self.client.execute("drop table if exists {}".format(self.stage_table))
    self.__ensure_events_processed()

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true")
  def test_perf_hive_external_non_part_table(self):
    self.__run_event_processing_tests("test_perf_hive_external_non_part_table", True,
        False, False)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true "
                    "--min_event_processor_idle_ms=600000 "
                    "--enable_hierarchical_event_processing=true")
  def test_perf_hive_external_non_part_table_hierarchical(self):
    self.__run_event_processing_tests(
        "test_perf_hive_external_non_part_table_hierarchical", True, False, False)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true")
  def test_perf_hive_external_part_table(self):
    self.__run_event_processing_tests("test_perf_hive_external_part_table", True,
        False, True)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true "
                    "--min_event_processor_idle_ms=600000 "
                    "--enable_hierarchical_event_processing=true")
  def test_perf_hive_external_part_table_hierarchical(self):
    self.__run_event_processing_tests(
        "test_perf_hive_external_part_table_hierarchical", True, False, True)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true")
  def test_perf_hive_transact_non_part_table(self):
    self.__run_event_processing_tests("test_perf_hive_transact_non_part_table", True,
        True, False)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true "
                    "--min_event_processor_idle_ms=600000 "
                    "--enable_hierarchical_event_processing=true")
  def test_perf_hive_transact_non_part_table_hierarchical(self):
    self.__run_event_processing_tests(
        "test_perf_hive_transact_non_part_table_hierarchical", True, True, False)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true")
  def test_perf_hive_transact_part_table(self):
    self.__run_event_processing_tests("test_perf_hive_transact_part_table", True, True,
        True)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true "
                    "--min_event_processor_idle_ms=600000 "
                    "--enable_hierarchical_event_processing=true")
  def test_perf_hive_transact_part_table_hierarchical(self):
    self.__run_event_processing_tests("test_perf_hive_transact_part_table_hierarchical",
        True, True, True)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true")
  def test_perf_impala_external_non_part_table(self):
    self.__run_event_processing_tests("test_perf_impala_external_non_part_table", False,
        False, False)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true "
                    "--enable_hierarchical_event_processing=true")
  def test_perf_impala_external_non_part_table_hierarchical(self):
    self.__run_event_processing_tests(
        "test_perf_impala_external_non_part_table_hierarchical", False, False, False)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true")
  def test_perf_impala_external_part_table(self):
    self.__run_event_processing_tests("test_perf_impala_external_part_table", False,
        False, True)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true "
                    "--enable_hierarchical_event_processing=true")
  def test_perf_impala_external_part_table_hierarchical(self):
    self.__run_event_processing_tests("test_perf_impala_external_part_table_hierarchical",
        False, False, True)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true")
  def test_perf_impala_transact_non_part_table(self):
    self.__run_event_processing_tests("test_perf_impala_transact_non_part_table", False,
        True, False)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true "
                    "--enable_hierarchical_event_processing=true")
  def test_perf_impala_transact_non_part_table_hierarchical(self):
    self.__run_event_processing_tests(
        "test_perf_impala_transact_non_part_table_hierarchical", False, True, False)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true")
  def test_perf_impala_transact_part_table(self):
    self.__run_event_processing_tests("test_perf_impala_transact_part_table", False,
        True, True)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true "
                    "--enable_hierarchical_event_processing=true")
  def test_perf_impala_transact_part_table_hierarchical(self):
    self.__run_event_processing_tests(
        "test_perf_impala_transact_part_table_hierarchical", False, True, True)

  def __run_event_processing_tests(self, case, is_hive, is_transactional, is_partitioned):
    """Method to measure time taken to create databases, tables(external if
    is_transactional is false otherwise transactional), partitions(if is_partitioned is
    true), insert data into tables, refresh partitions and tables, compute table stats
    and drop tables and databases in the end. is_hive is used to control all the queries
    to be executed on hive or impala."""
    LOG.info("Test: %s::%s" % (type(self).__name__, case))
    refresh_table_format = "refresh {}.{}"
    refresh_partition_format = refresh_table_format + " partition (j='{}')"
    stats = "analyze table {}.{} compute statistics for columns"
    dyn_part_cfg = "set hive.exec.dynamic.partition.mode=nonstrict;"
    dyn_part_cfg += "set hive.exec.max.dynamic.partitions={};" \
        .format(self.partition_count)
    dyn_part_cfg += "set hive.exec.max.dynamic.partitions.pernode={};" \
        .format(self.partition_count)
    if not is_hive:
      stats = "compute stats {}.{}"
      dyn_part_cfg = ""

    create_table_query = " ".join(["create", "external" if not is_transactional else '',
        "table `{}`.`{}` (i int)",
        " partitioned by (j string) " if is_partitioned else '',
        self.__get_transactional_tblproperties(is_transactional)])

    test_self = self

    class ThreadLocalClient(threading.local):
      def __init__(self, is_hive):
        # called for main thread and each thread in the pool
        self.is_hive = is_hive
        self.name = threading.currentThread().name
        LOG.info("Initializing for thread %s", self.name)

      def __del__(self):
        # Invoked only for main thread
        LOG.info("Deleting for thread %s", self.name)

      def __enter__(self):
        # Invoked only for main thread
        LOG.info("Entering for thread %s", self.name)
        return self

      def __exit__(self, exc_type, exc_val, exc_tb):
        # Invoked only for main thread
        LOG.info("Exiting for thread %s", self.name)
        if exc_type is not None:
          traceback.print_exception(exc_type, exc_val, exc_tb)

      def run_query(self, query, run_with_impala=None):
        # Creating and closing impala_client for each query. There is no good way to
        # close the impala_client connection for each ThreadLocalClient instance if we
        # maintain it at instance level. Only __init__ is called for each
        # ThreadLocalClient instance. Establishing and closing connections is fast, so
        # it should be ok to create and close a connection for each query.
        is_hive = self.is_hive
        if run_with_impala:
          # Overriding is_hive for some queries like describe formatted to load tables on
          # impala side
          is_hive = False

        if is_hive:
          with test_self.create_impala_client(host_port=pytest.config.option.hive_server2,
              protocol=HS2,
              is_hive=is_hive) as hive_client:
            for query in query.split(';'):
              hive_client.execute(query)
        else:
          with test_self.create_impala_client() as impala_client:
            try:
              handle = impala_client.execute_async(query)
              is_finished = impala_client.wait_for_finished_timeout(handle, timeout=60)
              assert is_finished, "Query timeout(60s): " + query
              impala_client.close_query(handle)
            except IMPALA_CONNECTION_EXCEPTION as e:
              LOG.debug(e)

    pool = multiprocessing.pool.ThreadPool(processes=8)
    with ThreadLocalClient(is_hive) as tls:
      dbs = []
      for iter in range(self.db_count):
        dbs.append(self.db_prefix + str(iter))

      tables = []
      for iter1 in range(self.table_count):
        table_name = self.table_prefix + str(iter1)
        for iter2 in range(self.db_count):
          tables.append((self.db_prefix + str(iter2), table_name))

      def create_database(dbname):
        tls.run_query("create database {}".format(dbname))

      def create_table(table_name_tuple):
        tls.run_query(create_table_query.format(table_name_tuple[0], table_name_tuple[1]))

      start_event_id = self.__pause_event_processing()
      pool.map_async(create_database, dbs).get()
      pool.map_async(create_table, tables).get()
      create_db_table_time = self.__process_events_now(start_event_id)

      def load_table(table_name_tuple):
        tls.run_query("describe formatted {}.{}"
            .format(table_name_tuple[0], table_name_tuple[1]), True)

      pool.map_async(load_table, tables).get()

      add_part_time = None
      insert_into_part_time = None
      insert_time = None
      if is_partitioned:
        # Stage table to create dynamic partitions
        self.client.execute("create table {} (i int, j string)".format(self.stage_table))
        insert_query = "insert into {} values {}"
        values = ",".join([("(" + str(item) + ",'" + str(item) + "')")
            for item in range(self.partition_count)])
        self.client.execute(insert_query.format(self.stage_table, values))
        self.__process_events_now(start_event_id)

        # Create dynamic partitions
        def add_or_insert_into_partitions(table_name_tuple):
          tls.run_query("{} insert into {}.{} partition(j) select * from {}"
              .format(dyn_part_cfg, table_name_tuple[0], table_name_tuple[1],
                      self.stage_table))

        start_event_id = self.__pause_event_processing()
        pool.map_async(add_or_insert_into_partitions, tables).get()
        add_part_time = self.__process_events(start_event_id)

        # Insert into existing partitions
        start_event_id = self.__pause_event_processing()
        pool.map_async(add_or_insert_into_partitions, tables).get()
        insert_into_part_time = self.__process_events(start_event_id)
      else:
        repeat_insert_into_tables = []
        for i in range(self.insert_nonpartition_repeat_count):
          for table_name_tuple in tables:
            repeat_insert_into_tables.append(table_name_tuple)

        insert_query = "insert into {}.{} values {}"
        values = ",".join([("(" + str(item) + ")")
            for item in range(self.insert_nonpartition_values_count)])

        def insert_into_table(table_name_tuple):
          tls.run_query(insert_query
              .format(table_name_tuple[0], table_name_tuple[1], values))

        start_event_id = self.__pause_event_processing()
        pool.map_async(insert_into_table, repeat_insert_into_tables).get()
        insert_time = self.__process_events(start_event_id)

      # Refresh
      refresh_part_time = None
      refresh_table_time = None
      if not is_hive:
        # Refresh partitions
        if is_partitioned and not is_transactional:
          partitions = []
          for iter1 in range(self.table_count):
            table_name = self.table_prefix + str(iter1)
            for iter2 in range(self.db_count):
              for iter3 in range(self.partition_count):
                partitions.append((self.db_prefix + str(iter2), table_name, str(iter3)))

          def refresh_partition(table_name_tuple):
            tls.run_query(refresh_partition_format
                .format(table_name_tuple[0], table_name_tuple[1],
                        table_name_tuple[2]))

          start_event_id = self.__pause_event_processing()
          pool.map_async(refresh_partition, partitions).get()
          refresh_part_time = self.__process_events(start_event_id)

        # Refresh tables
        def refresh_table(table_name_tuple):
          tls.run_query(refresh_table_format
              .format(table_name_tuple[0], table_name_tuple[1]))

        start_event_id = self.__pause_event_processing()
        pool.map_async(refresh_table, tables).get()
        refresh_table_time = self.__process_events(start_event_id)

      # compute statistics
      def compute_stats(table_name_tuple):
        tls.run_query(stats.format(table_name_tuple[0], table_name_tuple[1]))

      start_event_id = self.__pause_event_processing()
      pool.map_async(compute_stats, tables).get()
      compute_stats_time = self.__process_events(start_event_id)

      total_time = None
      if self.process_events_together:
        total_time = self.__process_events_now(start_event_id)

      def drop_database(dbname):
        tls.run_query("drop database if exists {} cascade".format(dbname))

      start_event_id = self.__pause_event_processing()
      pool.map_async(drop_database, dbs).get()
      drop_db_table_time = self.__process_events_now(start_event_id)

      pool.terminate()

      LOG.info("[Performance] Create database and table: Event count: {}, Time taken: {} s"
          .format(create_db_table_time[0], create_db_table_time[1]))
      if add_part_time is not None:
        LOG.info("[Performance] Add partition: Event count: {}, Time taken: {} s"
            .format(add_part_time[0], add_part_time[1]))
        LOG.info("[Performance] Insert into partition: Event count: {}, Time taken: {} s"
            .format(insert_into_part_time[0], insert_into_part_time[1]))
      if insert_time is not None:
        LOG.info("[Performance] Insert: Event count: {}, Time taken: {} s"
            .format(insert_time[0], insert_time[1]))
      if refresh_part_time is not None:
        LOG.info("[Performance] Refresh partition: Event count: {}, Time taken: {} s"
            .format(refresh_part_time[0], refresh_part_time[1]))
      if refresh_table_time is not None:
        LOG.info("[Performance] Refresh table: Event count: {}, Time taken: {} s"
            .format(refresh_table_time[0], refresh_table_time[1]))
      if compute_stats_time is not None:
        LOG.info("[Performance] Compute statistics: Event count: {}, Time taken: {} s"
            .format(compute_stats_time[0], compute_stats_time[1]))
      if total_time is not None:
        LOG.info("[Performance] Processed together: Event count: {}, Time taken: {} s"
            .format(total_time[0], total_time[1]))
      LOG.info("[Performance] Drop table and database: Event count: {}, Time taken: {} s"
          .format(drop_db_table_time[0], drop_db_table_time[1]))

  def __drop_databases(self):
    for iter in range(self.db_count):
      self.client.execute("drop database if exists {} cascade"
          .format(self.db_prefix + str(iter)))

  def __process_events_now(self, start_event_id):
    end_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client)
    start_time = time.time()
    self.__ensure_events_processed()
    return end_event_id - start_event_id, time.time() - start_time

  def __process_events(self, start_event_id):
    if not self.process_events_together:
      return self.__process_events_now(start_event_id)

  def __ensure_events_processed(self):
    self.client.execute(":event_processor('start')")
    EventProcessorUtils.wait_for_event_processing(self, 100)

  def __pause_event_processing(self):
    output = self.client.execute(":event_processor('pause')").get_data()
    lastSyncedEventId = re.search(r"LastSyncedEventId:\s*(\d+)", output)
    if lastSyncedEventId:
      return int(lastSyncedEventId.group(1))
    else:
      return EventProcessorUtils.get_current_notification_id(self.hive_client)

  def __get_transactional_tblproperties(self, is_transactional):
    """Get the tblproperties for transactional tables"""
    return "tblproperties ('transactional'='true'," \
           "'transactional_properties'='insert_only')" if is_transactional else ""