impala/tests/custom_cluster/test_event_processing_perf.py
Venu Reddy 5db760662f IMPALA-12709: Add support for hierarchical metastore event processing
At present, the metastore event processor is single-threaded.
Notification events are processed sequentially, with a maximum of 1000
events fetched and processed in a single batch. Multiple locks are used
to address the concurrency issues that may arise when catalog DDL
operation processing and metastore event processing try to access or
update the same catalog objects concurrently. Waiting for a lock or for
the file metadata loading of a table can slow event processing and
delay the events that follow, even when those events do not depend on
the earlier one. Altogether it can take a very long time to synchronize
all the HMS events.

The existing metastore event processing is turned into multi-level
event processing, enabled with the enable_hierarchical_event_processing
flag. It is disabled by default. The idea is to segregate events based
on their dependencies, maintain the order of events as they occur
within a dependency, and process them independently as far as possible.
The following three main classes implement the three-level threaded
event processing (a simplified dispatch sketch follows this list):
1. EventExecutorService
   Provides the methods to initialize, start, clear and stop metastore
   event processing in hierarchical mode. It is instantiated from
   MetastoreEventsProcessor and its methods are invoked from
   MetastoreEventsProcessor. Upon receiving an event to process,
   EventExecutorService queues the event to the appropriate
   DbEventExecutor for processing.
2. DbEventExecutor
   An instance of this class has an execution thread and manages the
   events of multiple databases through DbProcessors. A DbProcessor
   instance is maintained to hold the context of each database within
   the DbEventExecutor. On each scheduled execution, the input events
   of a DbProcessor are segregated to the appropriate TableProcessors,
   and the database events that are eligible for processing are
   processed.
   Once a DbEventExecutor is assigned to a database, a DbProcessor is
   created, and subsequent events belonging to that database are
   queued to the same DbEventExecutor thread for further processing.
   Hence, linearizability is ensured in dealing with the events within
   a database. Each DbEventExecutor instance has a fixed list of
   TableEventExecutors.
3. TableEventExecutor
   An instance of this class has an execution thread and processes the
   events of multiple tables through TableProcessors. A TableProcessor
   instance is maintained to hold the context of each table within a
   TableEventExecutor. On each scheduled execution, events from the
   TableProcessors are processed.
   Once a TableEventExecutor is assigned to a table, a TableProcessor
   is created, and subsequent events of that table are processed by
   the same TableEventExecutor thread. Hence, linearizability is
   guaranteed in processing the events of a particular table.
   - All the events of a table are processed in the same order in
     which they occurred.
   - Events of different tables are processed in parallel when those
     tables are assigned to different TableEventExecutors.
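
To make the dispatch concrete, below is a minimal, illustrative sketch
in Python of how events could be routed across the two executor levels.
It is not the catalogd implementation (which is in Java); the
HierarchicalDispatcher name, the hash-based assignment and the queue
layout are hypothetical, but the routing property matches the
description above: all events of a database map to one db-level
executor and all events of a table map to one table-level executor, so
per-table order is preserved while different tables proceed in
parallel.

  # Illustrative model only, not the actual catalogd code.
  from concurrent.futures import ThreadPoolExecutor

  class HierarchicalDispatcher(object):
    def __init__(self, num_db_executors=2, num_table_executors_per_db=2):
      # One single-threaded executor per slot, so events submitted to the
      # same slot run strictly in arrival order.
      self.db_executors = [ThreadPoolExecutor(max_workers=1)
                           for _ in range(num_db_executors)]
      self.table_executors = [[ThreadPoolExecutor(max_workers=1)
                               for _ in range(num_table_executors_per_db)]
                              for _ in range(num_db_executors)]

    def dispatch(self, db, table, process_fn):
      db_idx = hash(db) % len(self.db_executors)
      if table is None:
        # Database-level event: handled on the database-level executor.
        return self.db_executors[db_idx].submit(process_fn)
      # Table-level event: the same table always maps to the same executor.
      tbl_execs = self.table_executors[db_idx]
      return tbl_execs[hash(table) % len(tbl_execs)].submit(process_fn)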

The following new events are added:
1. DbBarrierEvent
   This event wraps a database event. It is used to synchronize all
   the TableProcessors belonging to the database before processing the
   database event. It acts as a barrier that holds back the table
   events that occurred after the database event until the database
   event has been processed on the DbProcessor (see the sketch after
   this list).
2. RenameTableBarrierEvent
   This event wraps an alter table event for rename. It is used to
   synchronize the source and target TableProcessors to
   process the rename table event. It ensures the source
   TableProcessor removes the table first and then allows the target
   TableProcessor to create the renamed table.
3. PseudoCommitTxnEvent and PseudoAbortTxnEvent
   CommitTxnEvent and AbortTxnEvent can involve multiple tables in
   a transaction and processing these events modifies multiple table
   objects. Pseudo events are introduced such that a pseudo event is
   created for each table involved in the transaction and these
   pseudo events are processed independently at respective
   TableProcessors.
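
As a rough illustration of the barrier idea, the sketch below models a
DbBarrierEvent-like marker in Python: when a database event is queued,
a marker is placed on every table queue of that database, and table
events enqueued after the marker are not processed until the database
event itself has been processed. BarrierMarker and its method names
are hypothetical and only model the behaviour described above;
RenameTableBarrierEvent coordinates the source and target
TableProcessors with a similar hand-shake.

  # Illustrative model only, not the actual catalogd code.
  import threading

  class BarrierMarker(object):
    """Placed on each table queue of a database when a database-level
    event is queued behind the table events that preceded it."""

    def __init__(self):
      self._db_event_processed = threading.Event()

    def mark_db_event_processed(self):
      # Called by the database-level processor once the wrapped database
      # event has been fully processed.
      self._db_event_processed.set()

    def wait_for_db_event(self, timeout=None):
      # Called by a table-level processor before it continues with the
      # table events that were queued after the database event.
      return self._db_event_processed.wait(timeout)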

The following new flags are introduced (an example of enabling them in
a custom-cluster test follows this list):
1. enable_hierarchical_event_processing
   To enable the hierarchical event processing on catalogd.
2. num_db_event_executors
   To set the number of database level event executors.
3. num_table_event_executors_per_db_event_executor
   To set the number of table level event executors within a
   database event executor.
4. min_event_processor_idle_ms
   To set the minimum time to retain idle db processors and table
   processors on the database event executors and table event
   executors respectively, when they do not have events to process.
5. max_outstanding_events_on_executors
   To set the maximum number of outstanding events allowed on the
   event executors.
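
For reference, this is how the performance tests in this change turn
the feature on for catalogd in a custom-cluster test (taken from the
tests below; the executor-count and outstanding-event flags keep their
defaults there):

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true "
                    "--min_event_processor_idle_ms=600000 "
                    "--enable_hierarchical_event_processing=true")
  def test_perf_hive_external_non_part_table_hierarchical(self):
    self.__run_event_processing_tests(
        "test_perf_hive_external_non_part_table_hierarchical", True, False, False)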

Changed the hms_event_polling_interval_s flag type from int to double
to support millisecond-precision polling intervals (for example,
--hms_event_polling_interval_s=0.2 polls every 200 ms).

TODOs:
1. We need to redefine the lag metric in hierarchical processing mode.
2. Need a mechanism to capture the actual event processing time in
   hierarchical processing mode. Currently, with
   enable_hierarchical_event_processing set to true, lastSyncedEventId_
   and lastSyncedEventTimeSecs_ are updated when an event is dispatched
   to EventExecutorService for processing on the respective
   DbEventExecutor and/or TableEventExecutor, so lastSyncedEventId_ and
   lastSyncedEventTimeSecs_ do not actually mean the events have been
   processed.
3. Hierarchical processing mode currently has a mechanism to show the
   total number of outstanding events on all the db and table executors
   at a given moment. Observability needs to be enhanced further for
   this mode.
Filed a JIRA (IMPALA-13801) to fix them.

Testing:
 - Executed existing end-to-end tests.
 - Added FE and end-to-end tests with enable_hierarchical_event_processing.
 - Added event processing performance tests.
 - Executed the existing tests with hierarchical processing mode
   enabled. lastSyncedEventId_ is now also used in the new
   sync_hms_events_wait_time_s feature (IMPALA-12152). Some tests fail
   when hierarchical processing mode is enabled because
   lastSyncedEventId_ does not actually mean the event has been
   processed in this mode. This needs to be fixed/verified under the
   above JIRA (IMPALA-13801).

Change-Id: I76d8a739f9db6d40f01028bfd786a85d83f9e5d6
Reviewed-on: http://gerrit.cloudera.org:8080/21031
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-04-30 11:51:03 +00:00


# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
from builtins import range
import logging
import multiprocessing.pool
import pytest
import re
import threading
import time
import traceback
from tests.common.impala_connection import IMPALA_CONNECTION_EXCEPTION
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIfFS
from tests.common.test_vector import HS2
from tests.util.event_processor_utils import EventProcessorUtils

LOG = logging.getLogger(__name__)
LOG.setLevel(level=logging.INFO)


@SkipIfFS.hive
class TestEventProcessingPerf(CustomClusterTestSuite):
  """This class contains tests to measure the event processing time on catalogd.
  Measures performance for various operations with queries executed from hive and impala
  clients."""

  # Below parameters are set to lower values so that tests are run faster. Need to
  # increase values and run the tests manually to measure the performance.
  db_count = 3
  table_count = 3
  partition_count = 100
  insert_nonpartition_values_count = 100
  insert_nonpartition_repeat_count = 2

  # process_events_together flag indicates whether to process events of all operations
  # together or not. Except create/drop databases and tables, remaining operations are
  # processed together when set to true. If it is set to false, each type of operation
  # (such as add partitions, insert into partitions, insert into table, refresh
  # partitions, refresh tables, compute stats etc) are processed separately to get the
  # time taken to process each type of operation.
  process_events_together = False

  db_prefix = "perf_db"
  table_prefix = "perf_table"
  stage_table = "perf_stage_table"

  @classmethod
  def setup_class(cls):
    if cls.exploration_strategy() != 'exhaustive':
      pytest.skip('runs only in exhaustive')
    super(TestEventProcessingPerf, cls).setup_class()

  def setup_method(self, method):
    super(TestEventProcessingPerf, self).setup_method(method)
    self.__cleanup()

  def teardown_method(self, method):
    self.__cleanup()
    super(TestEventProcessingPerf, self).teardown_method(method)

  def __cleanup(self):
    self.__drop_databases()
    self.client.execute("drop table if exists {}".format(self.stage_table))
    self.__ensure_events_processed()

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true")
  def test_perf_hive_external_non_part_table(self):
    self.__run_event_processing_tests("test_perf_hive_external_non_part_table", True,
        False, False)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true "
                    "--min_event_processor_idle_ms=600000 "
                    "--enable_hierarchical_event_processing=true")
  def test_perf_hive_external_non_part_table_hierarchical(self):
    self.__run_event_processing_tests(
        "test_perf_hive_external_non_part_table_hierarchical", True, False, False)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true")
  def test_perf_hive_external_part_table(self):
    self.__run_event_processing_tests("test_perf_hive_external_part_table", True,
        False, True)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true "
                    "--min_event_processor_idle_ms=600000 "
                    "--enable_hierarchical_event_processing=true")
  def test_perf_hive_external_part_table_hierarchical(self):
    self.__run_event_processing_tests(
        "test_perf_hive_external_part_table_hierarchical", True, False, True)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true")
  def test_perf_hive_transact_non_part_table(self):
    self.__run_event_processing_tests("test_perf_hive_transact_non_part_table", True,
        True, False)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true "
                    "--min_event_processor_idle_ms=600000 "
                    "--enable_hierarchical_event_processing=true")
  def test_perf_hive_transact_non_part_table_hierarchical(self):
    self.__run_event_processing_tests(
        "test_perf_hive_transact_non_part_table_hierarchical", True, True, False)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true")
  def test_perf_hive_transact_part_table(self):
    self.__run_event_processing_tests("test_perf_hive_transact_part_table", True, True,
        True)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true "
                    "--min_event_processor_idle_ms=600000 "
                    "--enable_hierarchical_event_processing=true")
  def test_perf_hive_transact_part_table_hierarchical(self):
    self.__run_event_processing_tests("test_perf_hive_transact_part_table_hierarchical",
        True, True, True)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true")
  def test_perf_impala_external_non_part_table(self):
    self.__run_event_processing_tests("test_perf_impala_external_non_part_table", False,
        False, False)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true "
                    "--enable_hierarchical_event_processing=true")
  def test_perf_impala_external_non_part_table_hierarchical(self):
    self.__run_event_processing_tests(
        "test_perf_impala_external_non_part_table_hierarchical", False, False, False)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true")
  def test_perf_impala_external_part_table(self):
    self.__run_event_processing_tests("test_perf_impala_external_part_table", False,
        False, True)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true "
                    "--enable_hierarchical_event_processing=true")
  def test_perf_impala_external_part_table_hierarchical(self):
    self.__run_event_processing_tests("test_perf_impala_external_part_table_hierarchical",
        False, False, True)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true")
  def test_perf_impala_transact_non_part_table(self):
    self.__run_event_processing_tests("test_perf_impala_transact_non_part_table", False,
        True, False)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true "
                    "--enable_hierarchical_event_processing=true")
  def test_perf_impala_transact_non_part_table_hierarchical(self):
    self.__run_event_processing_tests(
        "test_perf_impala_transact_non_part_table_hierarchical", False, True, False)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true")
  def test_perf_impala_transact_part_table(self):
    self.__run_event_processing_tests("test_perf_impala_transact_part_table", False,
        True, True)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      cluster_size=1,
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
                    "--enable_reload_events=true "
                    "--enable_hierarchical_event_processing=true")
  def test_perf_impala_transact_part_table_hierarchical(self):
    self.__run_event_processing_tests(
        "test_perf_impala_transact_part_table_hierarchical", False, True, True)

  def __run_event_processing_tests(self, case, is_hive, is_transactional,
                                   is_partitioned):
    """Method to measure time taken to create databases, tables(external if
    is_transactional is false otherwise transactional), partitions(if is_partitioned is
    true), insert data into tables, refresh partitions and tables, compute table stats
    and drop tables and databases in the end. is_hive is used to control all the queries
    to be executed on hive or impala."""
    LOG.info("Test: %s::%s" % (type(self).__name__, case))
    refresh_table_format = "refresh {}.{}"
    refresh_partition_format = refresh_table_format + " partition (j='{}')"
    stats = "analyze table {}.{} compute statistics for columns"
    dyn_part_cfg = "set hive.exec.dynamic.partition.mode=nonstrict;"
    dyn_part_cfg += "set hive.exec.max.dynamic.partitions={};" \
        .format(self.partition_count)
    dyn_part_cfg += "set hive.exec.max.dynamic.partitions.pernode={};" \
        .format(self.partition_count)
    if not is_hive:
      stats = "compute stats {}.{}"
      dyn_part_cfg = ""
    create_table_query = " ".join(["create", "external" if not is_transactional else '',
        "table `{}`.`{}` (i int)",
        " partitioned by (j string) " if is_partitioned else '',
        self.__get_transactional_tblproperties(is_transactional)])
    test_self = self

    class ThreadLocalClient(threading.local):
      def __init__(self, is_hive):
        # called for main thread and each thread in the pool
        self.is_hive = is_hive
        self.name = threading.currentThread().name
        LOG.info("Initializing for thread %s", self.name)

      def __del__(self):
        # Invoked only for main thread
        LOG.info("Deleting for thread %s", self.name)

      def __enter__(self):
        # Invoked only for main thread
        LOG.info("Entering for thread %s", self.name)
        return self

      def __exit__(self, exc_type, exc_val, exc_tb):
        # Invoked only for main thread
        LOG.info("Exiting for thread %s", self.name)
        if exc_type is not None:
          traceback.print_exception(exc_type, exc_val, exc_tb)

      def run_query(self, query, run_with_impala=None):
        # Creating and closing impala_client for each query. There is no good way to close
        # the impala_client connection for each ThreadLocalClient instance if we maintain
        # at instance level. Only __init__ is called for each ThreadLocalClient instance.
        # Establishing and closing connections are faster. So it should be ok to create
        # and close connection for each query.
        is_hive = self.is_hive
        if run_with_impala:
          # Overriding is_hive for some queries like describe formatted to load tables on
          # impala side
          is_hive = False
        if is_hive:
          with test_self.create_impala_client(host_port=pytest.config.option.hive_server2,
                                              protocol=HS2,
                                              is_hive=is_hive) as hive_client:
            for query in query.split(';'):
              hive_client.execute(query)
        else:
          with test_self.create_impala_client() as impala_client:
            try:
              handle = impala_client.execute_async(query)
              is_finished = impala_client.wait_for_finished_timeout(handle, timeout=60)
              assert is_finished, "Query timeout(60s): " + query
              impala_client.close_query(handle)
            except IMPALA_CONNECTION_EXCEPTION as e:
              LOG.debug(e)

    pool = multiprocessing.pool.ThreadPool(processes=8)
    with ThreadLocalClient(is_hive) as tls:
      dbs = []
      for iter in range(self.db_count):
        dbs.append(self.db_prefix + str(iter))
      tables = []
      for iter1 in range(self.table_count):
        table_name = self.table_prefix + str(iter1)
        for iter2 in range(self.db_count):
          tables.append((self.db_prefix + str(iter2), table_name))

      def create_database(dbname):
        tls.run_query("create database {}".format(dbname))

      def create_table(table_name_tuple):
        tls.run_query(create_table_query.format(table_name_tuple[0], table_name_tuple[1]))

      start_event_id = self.__pause_event_processing()
      pool.map_async(create_database, dbs).get()
      pool.map_async(create_table, tables).get()
      create_db_table_time = self.__process_events_now(start_event_id)

      def load_table(table_name_tuple):
        tls.run_query("describe formatted {}.{}"
                      .format(table_name_tuple[0], table_name_tuple[1]), True)

      pool.map_async(load_table, tables).get()
      add_part_time = None
      insert_into_part_time = None
      insert_time = None
      if is_partitioned:
        # Stage table to create dynamic partitions
        self.client.execute("create table {} (i int, j string)".format(self.stage_table))
        insert_query = "insert into {} values {}"
        values = ",".join([("(" + str(item) + ",'" + str(item) + "')")
                           for item in range(self.partition_count)])
        self.client.execute(insert_query.format(self.stage_table, values))
        self.__process_events_now(start_event_id)

        # Create dynamic partitions
        def add_or_insert_into_partitions(table_name_tuple):
          tls.run_query("{} insert into {}.{} partition(j) select * from {}"
                        .format(dyn_part_cfg, table_name_tuple[0], table_name_tuple[1],
                                self.stage_table))

        start_event_id = self.__pause_event_processing()
        pool.map_async(add_or_insert_into_partitions, tables).get()
        add_part_time = self.__process_events(start_event_id)
        # Insert into existing partitions
        start_event_id = self.__pause_event_processing()
        pool.map_async(add_or_insert_into_partitions, tables).get()
        insert_into_part_time = self.__process_events(start_event_id)
      else:
        repeat_insert_into_tables = []
        for i in range(self.insert_nonpartition_repeat_count):
          for table_name_tuple in tables:
            repeat_insert_into_tables.append(table_name_tuple)
        insert_query = "insert into {}.{} values {}"
        values = ",".join([("(" + str(item) + ")")
                           for item in range(self.insert_nonpartition_values_count)])

        def insert_into_table(table_name_tuple):
          tls.run_query(insert_query
                        .format(table_name_tuple[0], table_name_tuple[1], values))

        start_event_id = self.__pause_event_processing()
        pool.map_async(insert_into_table, repeat_insert_into_tables).get()
        insert_time = self.__process_events(start_event_id)

      # Refresh
      refresh_part_time = None
      refresh_table_time = None
      if not is_hive:
        # Refresh partitions
        if is_partitioned and not is_transactional:
          partitions = []
          for iter1 in range(self.table_count):
            table_name = self.table_prefix + str(iter1)
            for iter2 in range(self.db_count):
              for iter3 in range(self.partition_count):
                partitions.append((self.db_prefix + str(iter2), table_name, str(iter3)))

          def refresh_partition(table_name_tuple):
            tls.run_query(refresh_partition_format
                          .format(table_name_tuple[0], table_name_tuple[1],
                                  table_name_tuple[2]))

          start_event_id = self.__pause_event_processing()
          pool.map_async(refresh_partition, partitions).get()
          refresh_part_time = self.__process_events(start_event_id)

        # Refresh tables
        def refresh_table(table_name_tuple):
          tls.run_query(refresh_table_format
                        .format(table_name_tuple[0], table_name_tuple[1]))

        start_event_id = self.__pause_event_processing()
        pool.map_async(refresh_table, tables).get()
        refresh_table_time = self.__process_events(start_event_id)

      # compute statistics
      def compute_stats(table_name_tuple):
        tls.run_query(stats.format(table_name_tuple[0], table_name_tuple[1]))

      start_event_id = self.__pause_event_processing()
      pool.map_async(compute_stats, tables).get()
      compute_stats_time = self.__process_events(start_event_id)
      total_time = None
      if self.process_events_together:
        total_time = self.__process_events_now(start_event_id)

      def drop_database(dbname):
        tls.run_query("drop database if exists {} cascade".format(dbname))

      start_event_id = self.__pause_event_processing()
      pool.map_async(drop_database, dbs).get()
      drop_db_table_time = self.__process_events_now(start_event_id)
    pool.terminate()
LOG.info("[Performance] Create database and table: Event count: {}, Time taken: {} s"
.format(create_db_table_time[0], create_db_table_time[1]))
if add_part_time is not None:
LOG.info("[Performance] Add partition: Event count: {}, Time taken: {} s"
.format(add_part_time[0], add_part_time[1]))
LOG.info("[Performance] Insert into partition: Event count: {}, Time taken: {} s"
.format(insert_into_part_time[0], insert_into_part_time[1]))
if insert_time is not None:
LOG.info("[Performance] Insert: Event count: {}, Time taken: {} s"
.format(insert_time[0], insert_time[1]))
if refresh_part_time is not None:
LOG.info("[Performance] Refresh partition: Event count: {}, Time taken: {} s"
.format(refresh_part_time[0], refresh_part_time[1]))
if refresh_table_time is not None:
LOG.info("[Performance] Refresh table: Event count: {}, Time taken: {} s"
.format(refresh_table_time[0], refresh_table_time[1]))
if compute_stats_time is not None:
LOG.info("[Performance] Compute statistics: Event count: {}, Time taken: {} s"
.format(compute_stats_time[0], compute_stats_time[1]))
if total_time is not None:
LOG.info("[Performance] Processed together: Event count: {}, Time taken: {} s"
.format(total_time[0], total_time[1]))
LOG.info("[Performance] Drop table and database: Event count: {}, Time taken: {} s"
.format(drop_db_table_time[0], drop_db_table_time[1]))

  def __drop_databases(self):
    for iter in range(self.db_count):
      self.client.execute("drop database if exists {} cascade"
                          .format(self.db_prefix + str(iter)))

  def __process_events_now(self, start_event_id):
    end_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client)
    start_time = time.time()
    self.__ensure_events_processed()
    return end_event_id - start_event_id, time.time() - start_time

  def __process_events(self, start_event_id):
    if not self.process_events_together:
      return self.__process_events_now(start_event_id)

  def __ensure_events_processed(self):
    self.client.execute(":event_processor('start')")
    EventProcessorUtils.wait_for_event_processing(self, 100)

  def __pause_event_processing(self):
    output = self.client.execute(":event_processor('pause')").get_data()
    lastSyncedEventId = re.search(r"LastSyncedEventId:\s*(\d+)", output)
    if lastSyncedEventId:
      return int(lastSyncedEventId.group(1))
    else:
      return EventProcessorUtils.get_current_notification_id(self.hive_client)

  def __get_transactional_tblproperties(self, is_transactional):
    """Get the tblproperties for transactional tables"""
    return "tblproperties ('transactional'='true'," \
           "'transactional_properties'='insert_only')" if is_transactional else ""